# -*- coding: utf-8 -*- from unicodedata import category import mysql.connector from mysql.connector import Error # 配置数据库连接 DB_CONFIG = { 'host': '60.204.139.57', 'user': 'root', 'password': '1', 'port': 6003, 'database': 'files' # 替换为数据库名称 } # 初始化数据库 def initialize_files_database(): try: # 连接 MySQL 服务器(不指定数据库) connection = mysql.connector.connect( host=DB_CONFIG['host'], user=DB_CONFIG['user'], password=DB_CONFIG['password'], port=DB_CONFIG['port'] ) if connection.is_connected(): print("成功连接到 MySQL 服务器") cursor = connection.cursor() # 检查数据库是否存在 cursor.execute(f"SHOW DATABASES LIKE '{DB_CONFIG['database']}';") result = cursor.fetchone() if result: print(f"数据库 {DB_CONFIG['database']} 已存在。") else: print(f"数据库 {DB_CONFIG['database']} 不存在,正在创建...") cursor.execute(f"CREATE DATABASE {DB_CONFIG['database']};") print(f"数据库 {DB_CONFIG['database']} 创建成功。") except Error as err: print(f"连接 MySQL 时发生错误:{err}") finally: if connection.is_connected(): cursor.close() connection.close() print("MySQL 服务器连接已关闭") def initialize_files_table(table_name = "category"): try: connection = mysql.connector.connect(**DB_CONFIG) if connection.is_connected(): cursor = connection.cursor() # 动态创建表,指定字符集为 utf8mb4 create_table_query = f''' CREATE TABLE IF NOT EXISTS `{table_name}` ( id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255) UNIQUE, url VARCHAR(255) UNIQUE ) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci ''' cursor.execute(create_table_query) print(f"表 {table_name} 初始化成功。") except Error as err: print(f"初始化表时发生错误:{err}") finally: if connection.is_connected(): cursor.close() connection.close() print("MySQL 连接已关闭") # 插入数据 def insert_url(name, url, table_name="category"): try: connection = mysql.connector.connect(**DB_CONFIG) if connection.is_connected(): cursor = connection.cursor() # 确保目标文件表存在 initialize_files_table(table_name) # 插入文件数据到目标表 insert_query = f"INSERT INTO `{table_name}` (name, url) VALUES (%s, %s)" cursor.execute(insert_query, (name, url)) connection.commit() # 判断插入是否成功 if cursor.rowcount > 0: print(f"插入文件数据成功:({name}, {url}) 到表 {table_name}") file_result = {"status": "success"} else: print(f"插入文件数据失败:({name}, {url}) 到表 {table_name}") file_result = {"status": "failed"} # 确保 category 表存在 initialize_files_table("category") # 查询是否已经存在该表名 select_query = "SELECT * FROM `category` WHERE `name` = %s" cursor.execute(select_query, (table_name,)) existing_category = cursor.fetchone() if existing_category: print(f"表名 {table_name} 已经存在,不插入!") category_result = {"status": "exists"} else: # 执行插入 category insert_query = "INSERT INTO `category` (name) VALUES (%s)" cursor.execute(insert_query, (table_name,)) connection.commit() print(f"插入表名成功:({table_name}) 到表 category") category_result = {"status": "success"} return {"status": "success", "file_result": file_result, "category_result": category_result} except mysql.connector.Error as err: if err.errno == 1062: # Duplicate entry error print(f"文件 {name} 已存在,不重复插入。") return {"status": "duplicate", "message": f"文件 {name} 已存在"} print(f"插入数据时发生错误:{err}") return {"status": "error", "message": str(err)} finally: if connection.is_connected(): cursor.close() connection.close() print("MySQL 连接已关闭") # 查询数据 def fetch_data(table_name): try: connection = mysql.connector.connect(**DB_CONFIG) if connection.is_connected(): cursor = connection.cursor() # 查询数据 query = f"SELECT * FROM `{table_name}`" cursor.execute(query) rows = cursor.fetchall() print(f"--- 表 {table_name} 的查询结果 ---") for row in rows: print(row) return rows except Error as err: print(f"查询数据时发生错误:{err}") finally: if connection.is_connected(): cursor.close() connection.close() print("MySQL 连接已关闭") # 主函数 if __name__ == "__main__": # 初始化数据库 initialize_files_database() # 示例:插入和查询数据 table_name = "filePath" insert_url("example_file.mp4", table_name) fetch_data(table_name)