| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240 |
- # -*- coding: utf-8 -*-
- import os
- from unicodedata import category
- import mysql.connector
- from mysql.connector import Error
- # 配置数据库连接
- DB_CONFIG = {
- 'host': '60.204.139.57',
- 'user': 'root',
- 'password': '1',
- 'port': 6003,
- 'database': 'files' # 替换为数据库名称
- }
- # 初始化数据库
- def initialize_files_database():
- try:
- # 连接 MySQL 服务器(不指定数据库)
- connection = mysql.connector.connect(
- host=DB_CONFIG['host'],
- user=DB_CONFIG['user'],
- password=DB_CONFIG['password'],
- port=DB_CONFIG['port']
- )
- if connection.is_connected():
- print("成功连接到 MySQL 服务器")
- cursor = connection.cursor()
- # 检查数据库是否存在
- cursor.execute(f"SHOW DATABASES LIKE '{DB_CONFIG['database']}';")
- result = cursor.fetchone()
- if result:
- print(f"数据库 {DB_CONFIG['database']} 已存在。")
- else:
- print(f"数据库 {DB_CONFIG['database']} 不存在,正在创建...")
- cursor.execute(f"CREATE DATABASE {DB_CONFIG['database']};")
- print(f"数据库 {DB_CONFIG['database']} 创建成功。")
- except Error as err:
- print(f"连接 MySQL 时发生错误:{err}")
- finally:
- if connection.is_connected():
- cursor.close()
- connection.close()
- print("MySQL 服务器连接已关闭")
- def initialize_files_table(table_name = "category"):
- try:
- connection = mysql.connector.connect(**DB_CONFIG)
- if connection.is_connected():
- cursor = connection.cursor()
- # 动态创建表,指定字符集为 utf8mb4
- create_table_query = f'''
- CREATE TABLE IF NOT EXISTS `{table_name}` (
- id INT AUTO_INCREMENT PRIMARY KEY,
- name VARCHAR(255) UNIQUE,
- url VARCHAR(255) UNIQUE
- ) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci
- '''
- cursor.execute(create_table_query)
- print(f"表 {table_name} 初始化成功。")
- except Error as err:
- print(f"初始化表时发生错误:{err}")
- finally:
- if connection.is_connected():
- cursor.close()
- connection.close()
- print("MySQL 连接已关闭")
- # 插入数据
- def insert_url(name, url, table_name="category"):
- try:
- connection = mysql.connector.connect(**DB_CONFIG)
- if connection.is_connected():
- cursor = connection.cursor()
- # 确保目标文件表存在
- initialize_files_table(table_name)
- # 插入文件数据到目标表
- insert_query = f"INSERT INTO `{table_name}` (name, url) VALUES (%s, %s)"
- cursor.execute(insert_query, (name, url))
- connection.commit()
- # 判断插入是否成功
- if cursor.rowcount > 0:
- print(f"插入文件数据成功:({name}, {url}) 到表 {table_name}")
- file_result = {"status": "success"}
- else:
- print(f"插入文件数据失败:({name}, {url}) 到表 {table_name}")
- file_result = {"status": "failed"}
- # 确保 category 表存在
- initialize_files_table("category")
- # 查询是否已经存在该表名
- select_query = "SELECT * FROM `category` WHERE `name` = %s"
- cursor.execute(select_query, (table_name,))
- existing_category = cursor.fetchone()
- if existing_category:
- print(f"表名 {table_name} 已经存在,不插入!")
- category_result = {"status": "exists"}
- else:
- # 执行插入 category
- insert_query = "INSERT INTO `category` (name) VALUES (%s)"
- cursor.execute(insert_query, (table_name,))
- connection.commit()
- print(f"插入表名成功:({table_name}) 到表 category")
- category_result = {"status": "success"}
- return {"status": "success", "file_result": file_result, "category_result": category_result}
- except mysql.connector.Error as err:
- if err.errno == 1062: # Duplicate entry error
- print(f"文件 {name} 已存在,不重复插入。")
- return {"status": "duplicate", "message": f"文件 {name} 已存在"}
- print(f"插入数据时发生错误:{err}")
- return {"status": "error", "message": str(err)}
- finally:
- if connection.is_connected():
- cursor.close()
- connection.close()
- print("MySQL 连接已关闭")
- def delete_file(table, rowid, file_path='../uploads'):
- # 检查 table 和 rowid 是否为空
- if not table or not rowid:
- print("表名或名称不能为空!")
- return
- try:
- # 建立数据库连接
- connection = mysql.connector.connect(**DB_CONFIG)
- if connection.is_connected():
- cursor = connection.cursor()
- # 确保 table 仅包含合法的表名(为了安全,避免 SQL 注入)
- valid_tables = ['image_table', 'another_table'] # 将所有有效表名列出
- if table not in valid_tables:
- print(f"无效的表名:{table}")
- return
- # 查询 category 表,检查是否存在该表名
- cursor.execute("SELECT * FROM category WHERE name = %s", (table,))
- result = cursor.fetchone()
- if not result:
- print(f"无效的表名:{table}(没有找到对应的记录)")
- return
- # 执行查询数据库记录的操作
- cursor.execute(f"SELECT * FROM {table} WHERE id = %s", (rowid,))
- result = cursor.fetchone()
- # 如果查询结果不为空,处理数据
- if result:
- print(f"成功查询到要删除的文件数据")
- # 将元组转换为字典,假设你的字段是 id, categoryName, url
- columns = [col[0] for col in cursor.description] # 获取列名
- result_dict = dict(zip(columns, result)) # 将结果转化为字典
- # 获取文件名
- file_name = result_dict.get('name') # 这里假设 'name' 是存储文件名的字段
- # 如果获取到文件路径,进行删除操作
- if file_name:
- # 获取当前工作目录
- current_path = os.getcwd() # 获取当前目录路径
- # 拼接相对路径
- file_path = os.path.join(current_path, '..', 'uploads', file_name)
- if os.path.exists(file_path):
- try:
- os.remove(file_path) # 删除文件
- print(f"文件 {file_path} 已被删除")
- except Exception as e:
- print(f"删除文件时发生错误:{e}")
- else:
- print(f"文件路径 {file_path} 无效或文件不存在")
- # 执行删除数据库记录的操作
- cursor.execute(f"DELETE FROM {table} WHERE id = %s", (rowid,))
- connection.commit()
- print(f"成功删除名称为 {rowid} 的记录")
- except Error as err:
- print(f"删除数据时发生错误:{err}")
- finally:
- # 确保连接关闭
- if connection.is_connected():
- cursor.close()
- connection.close()
- print("MySQL 连接已关闭")
- # 查询数据
- def fetch_data(table_name):
- try:
- connection = mysql.connector.connect(**DB_CONFIG)
- if connection.is_connected():
- cursor = connection.cursor()
- # 查询数据
- query = f"SELECT * FROM `{table_name}`"
- cursor.execute(query)
- rows = cursor.fetchall()
- print(f"--- 表 {table_name} 的查询结果 ---")
- for row in rows:
- print(row)
- return rows
- except Error as err:
- print(f"查询数据时发生错误:{err}")
- finally:
- if connection.is_connected():
- cursor.close()
- connection.close()
- print("MySQL 连接已关闭")
- # 主函数
- if __name__ == "__main__":
- # 初始化数据库
- initialize_files_database()
- # 示例:插入和查询数据
- table_name = "filePath"
- insert_url("example_file.mp4", table_name)
- fetch_data(table_name)
|