# -*- coding: utf-8 -*- import os from unicodedata import category import mysql.connector from mysql.connector import Error # 配置数据库连接 DB_CONFIG = { 'host': '60.204.139.57', 'user': 'root', 'password': '1', 'port': 6003, 'database': 'files' # 替换为数据库名称 } # 初始化数据库 def initialize_files_database(): try: # 连接 MySQL 服务器(不指定数据库) connection = mysql.connector.connect( host=DB_CONFIG['host'], user=DB_CONFIG['user'], password=DB_CONFIG['password'], port=DB_CONFIG['port'] ) if connection.is_connected(): print("成功连接到 MySQL 服务器") cursor = connection.cursor() # 检查数据库是否存在 cursor.execute(f"SHOW DATABASES LIKE '{DB_CONFIG['database']}';") result = cursor.fetchone() if result: print(f"数据库 {DB_CONFIG['database']} 已存在。") else: print(f"数据库 {DB_CONFIG['database']} 不存在,正在创建...") cursor.execute(f"CREATE DATABASE {DB_CONFIG['database']};") print(f"数据库 {DB_CONFIG['database']} 创建成功。") except Error as err: print(f"连接 MySQL 时发生错误:{err}") finally: if connection.is_connected(): cursor.close() connection.close() print("MySQL 服务器连接已关闭") def initialize_files_table(table_name = "category"): try: connection = mysql.connector.connect(**DB_CONFIG) if connection.is_connected(): cursor = connection.cursor() # 动态创建表,指定字符集为 utf8mb4 create_table_query = f''' CREATE TABLE IF NOT EXISTS `{table_name}` ( id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255) UNIQUE, url VARCHAR(255) UNIQUE ) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci ''' cursor.execute(create_table_query) print(f"表 {table_name} 初始化成功。") except Error as err: print(f"初始化表时发生错误:{err}") finally: if connection.is_connected(): cursor.close() connection.close() print("MySQL 连接已关闭") # 插入数据 def insert_url(name, url, table_name="category"): try: connection = mysql.connector.connect(**DB_CONFIG) if connection.is_connected(): cursor = connection.cursor() # 确保目标文件表存在 initialize_files_table(table_name) # 插入文件数据到目标表 insert_query = f"INSERT INTO `{table_name}` (name, url) VALUES (%s, %s)" cursor.execute(insert_query, (name, url)) connection.commit() # 判断插入是否成功 if cursor.rowcount > 0: print(f"插入文件数据成功:({name}, {url}) 到表 {table_name}") file_result = {"status": "success"} else: print(f"插入文件数据失败:({name}, {url}) 到表 {table_name}") file_result = {"status": "failed"} # 确保 category 表存在 initialize_files_table("category") # 查询是否已经存在该表名 select_query = "SELECT * FROM `category` WHERE `name` = %s" cursor.execute(select_query, (table_name,)) existing_category = cursor.fetchone() if existing_category: print(f"表名 {table_name} 已经存在,不插入!") category_result = {"status": "exists"} else: # 执行插入 category insert_query = "INSERT INTO `category` (name) VALUES (%s)" cursor.execute(insert_query, (table_name,)) connection.commit() print(f"插入表名成功:({table_name}) 到表 category") category_result = {"status": "success"} return {"status": "success", "file_result": file_result, "category_result": category_result} except mysql.connector.Error as err: if err.errno == 1062: # Duplicate entry error print(f"文件 {name} 已存在,不重复插入。") return {"status": "duplicate", "message": f"文件 {name} 已存在"} print(f"插入数据时发生错误:{err}") return {"status": "error", "message": str(err)} finally: if connection.is_connected(): cursor.close() connection.close() print("MySQL 连接已关闭") def delete_file(table, rowid, file_path='../uploads'): # 检查 table 和 rowid 是否为空 if not table or not rowid: print("表名或名称不能为空!") return try: # 建立数据库连接 connection = mysql.connector.connect(**DB_CONFIG) if connection.is_connected(): cursor = connection.cursor() # 确保 table 仅包含合法的表名(为了安全,避免 SQL 注入) valid_tables = ['image_table', 'another_table'] # 将所有有效表名列出 if table not in valid_tables: print(f"无效的表名:{table}") return # 查询 category 表,检查是否存在该表名 cursor.execute("SELECT * FROM category WHERE name = %s", (table,)) result = cursor.fetchone() if not result: print(f"无效的表名:{table}(没有找到对应的记录)") return # 执行查询数据库记录的操作 cursor.execute(f"SELECT * FROM {table} WHERE id = %s", (rowid,)) result = cursor.fetchone() # 如果查询结果不为空,处理数据 if result: print(f"成功查询到要删除的文件数据") # 将元组转换为字典,假设你的字段是 id, categoryName, url columns = [col[0] for col in cursor.description] # 获取列名 result_dict = dict(zip(columns, result)) # 将结果转化为字典 # 获取文件名 file_name = result_dict.get('name') # 这里假设 'name' 是存储文件名的字段 # 如果获取到文件路径,进行删除操作 if file_name: # 获取当前工作目录 current_path = os.getcwd() # 获取当前目录路径 # 拼接相对路径 file_path = os.path.join(current_path, '..', 'uploads', file_name) if os.path.exists(file_path): try: os.remove(file_path) # 删除文件 print(f"文件 {file_path} 已被删除") except Exception as e: print(f"删除文件时发生错误:{e}") else: print(f"文件路径 {file_path} 无效或文件不存在") # 执行删除数据库记录的操作 cursor.execute(f"DELETE FROM {table} WHERE id = %s", (rowid,)) connection.commit() print(f"成功删除名称为 {rowid} 的记录") except Error as err: print(f"删除数据时发生错误:{err}") finally: # 确保连接关闭 if connection.is_connected(): cursor.close() connection.close() print("MySQL 连接已关闭") # 查询数据 def fetch_data(table_name): try: connection = mysql.connector.connect(**DB_CONFIG) if connection.is_connected(): cursor = connection.cursor() # 查询数据 query = f"SELECT * FROM `{table_name}`" cursor.execute(query) rows = cursor.fetchall() print(f"--- 表 {table_name} 的查询结果 ---") for row in rows: print(row) return rows except Error as err: print(f"查询数据时发生错误:{err}") finally: if connection.is_connected(): cursor.close() connection.close() print("MySQL 连接已关闭") # 主函数 if __name__ == "__main__": # 初始化数据库 initialize_files_database() # 示例:插入和查询数据 table_name = "filePath" insert_url("example_file.mp4", table_name) fetch_data(table_name)