#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 现代化Web SQLite数据库查看器 基于Flask框架,提供自然的内容显示和筛选功能 """ from flask import Flask, render_template, jsonify, request, send_from_directory import sqlite3 import os import json from datetime import datetime from loguru import logger import threading import time import requests app = Flask(__name__) # 数据库路径 DB_PATH = os.path.join(os.path.dirname(__file__), 'products.db') # 任务状态存储 analysis_tasks = {} class SQLiteWebViewer: def __init__(self, db_path): self.db_path = db_path logger.info(f"初始化Web SQLite查看器,数据库路径: {db_path}") def get_tables(self): """获取所有表名""" try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'") tables = [row[0] for row in cursor.fetchall()] conn.close() logger.info(f"获取到 {len(tables)} 个表") return tables except Exception as e: logger.error(f"获取表列表失败: {e}") return [] def get_table_structure(self, table_name): """获取表结构""" try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(f"PRAGMA table_info({table_name})") columns = cursor.fetchall() conn.close() structure = [] for col in columns: structure.append({ 'cid': col[0], 'name': col[1], 'type': col[2], 'notnull': col[3], 'default': col[4], 'pk': col[5] }) logger.info(f"获取表 {table_name} 结构,共 {len(structure)} 个字段") return structure except Exception as e: logger.error(f"获取表结构失败: {e}") return [] def get_table_data(self, table_name, page=1, per_page=50, search_field=None, search_value=None): """获取表数据,支持分页和搜索""" try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # 获取字段类型信息 field_types = {} text_fields = [] cursor.execute(f"PRAGMA table_info({table_name});") columns_info = cursor.fetchall() for col_info in columns_info: field_name = col_info[1] field_type = col_info[2] field_types[field_name] = field_type # 字段名 -> 字段类型 # 收集文本类型字段 if field_type.upper() not in ['INTEGER', 'REAL', 'FLOAT', 'NUMERIC']: text_fields.append(field_name) # 解析搜索条件 query_params = [] where_clause = "" if search_value: # 获取要搜索的字段列表 search_fields = [] if isinstance(search_field, list): # 如果是列表,使用所有提供的字段 search_fields = [f for f in search_field if f in field_types] elif search_field == "all": # 如果是"all",使用所有文本字段 search_fields = text_fields elif search_field and search_field in field_types: # 如果是单个字段,直接使用 search_fields = [search_field] if search_fields: conditions = [] for field in search_fields: # 检查是否为数值比较操作符 import re numeric_op_pattern = re.compile(r'^(<=?|>=?|=)(\d+(\.\d+)?)$') match = numeric_op_pattern.match(search_value) # 检查字段是否为数值类型 is_numeric_field = field_types.get(field, '').upper() in ['INTEGER', 'REAL', 'FLOAT', 'NUMERIC'] if match and is_numeric_field: # 数值比较操作 operator = match.group(1) value = match.group(2) conditions.append(f"{field} {operator} ?") query_params.append(float(value) if '.' in value else int(value)) logger.info(f"应用数值比较筛选: {field} {operator} {value}") else: # 默认文本模糊匹配 conditions.append(f"{field} LIKE ?") query_params.append(f'%{search_value}%') logger.info(f"应用文本模糊匹配: {field} LIKE '%{search_value}%'") if conditions: where_clause = " WHERE " + " OR ".join(conditions) # 获取总记录数 count_query = f"SELECT COUNT(*) FROM {table_name}{where_clause}" cursor.execute(count_query, query_params) total_count = cursor.fetchone()[0] # 检查是否有日期相关字段,用于排序 date_columns = [] # 常见的日期字段名称 date_field_names = ['created_at', 'updated_at', 'date', 'publish_date', 'release_date'] for col_info in columns_info: field_name = col_info[1] # 检查字段名是否包含日期相关关键词 if any(keyword in field_name.lower() for keyword in date_field_names): date_columns.append(field_name) # 如果找到日期字段,按最新日期排序 order_by_clause = "" if date_columns: # 优先使用updated_at,如果没有则使用created_at,否则使用第一个找到的日期字段 if 'updated_at' in date_columns: sort_column = 'updated_at' elif 'created_at' in date_columns: sort_column = 'created_at' else: sort_column = date_columns[0] order_by_clause = f" ORDER BY {sort_column} DESC" logger.info(f"应用日期排序: {sort_column} DESC") # 获取分页数据 offset = (page - 1) * per_page query_params.extend([per_page, offset]) data_query = f"SELECT * FROM {table_name}{where_clause}{order_by_clause} LIMIT ? OFFSET ?" cursor.execute(data_query, query_params) rows = cursor.fetchall() # 获取列名 cursor.execute(f"PRAGMA table_info({table_name})") columns = [col[1] for col in cursor.fetchall()] conn.close() # 处理数据,检测多行文本 processed_rows = [] for row in rows: processed_row = [] for i, cell in enumerate(row): col_name = columns[i] # 处理difficulty_score字段的缺失值 if col_name == "difficulty_score" and cell is None: processed_row.append({'value': '未评分', 'type': 'empty'}) elif cell is None: processed_row.append({'value': '', 'type': 'empty'}) elif isinstance(cell, str) and ('\n' in cell or len(cell) > 100): # 多行文本或长文本 lines = cell.count('\n') + 1 processed_row.append({ 'value': cell, 'type': 'multiline', 'lines': lines, 'length': len(cell) }) else: processed_row.append({'value': str(cell), 'type': 'normal'}) processed_rows.append(processed_row) logger.info(f"获取表 {table_name} 数据,第 {page} 页,共 {len(processed_rows)} 条记录") return { 'columns': columns, 'rows': processed_rows, 'total_count': total_count, 'page': page, 'per_page': per_page, 'total_pages': (total_count + per_page - 1) // per_page } except Exception as e: logger.error(f"获取表数据失败: {e}") return {'columns': [], 'rows': [], 'total_count': 0, 'page': 1, 'per_page': 50, 'total_pages': 0} # 初始化查看器 viewer = SQLiteWebViewer(DB_PATH) @app.route('/') def index(): """主页""" logger.info("访问主页") return render_template('index.html') @app.route('/api/tables') def get_tables(): """获取所有表""" tables = viewer.get_tables() return jsonify({'tables': tables}) @app.route('/api/table//structure') def get_table_structure(table_name): """获取表结构""" structure = viewer.get_table_structure(table_name) return jsonify({'structure': structure}) @app.route('/api/table//data') def get_table_data(table_name): """获取表数据""" page = int(request.args.get('page', 1)) per_page = int(request.args.get('per_page', 50)) # 获取所有search_field参数(可能是多个) search_field = request.args.getlist('search_field') # 如果只有一个且为空,使用单个值 if len(search_field) == 1: search_field = search_field[0] search_value = request.args.get('search_value') data = viewer.get_table_data(table_name, page, per_page, search_field, search_value) return jsonify(data) @app.route('/api/analyze_missing_scores') def analyze_missing_scores(): """触发缺失分数分析任务""" task_id = str(int(time.time())) analysis_tasks[task_id] = { 'status': 'running', 'progress': 0, 'total': 0, 'completed': 0, 'error': None, 'start_time': datetime.now().isoformat() } # 在后台线程中执行分析 def run_analysis(): try: conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() # 检查product_analysis表是否存在difficulty_score字段 cursor.execute("PRAGMA table_info(product_analysis)") columns = [col[1] for col in cursor.fetchall()] if 'difficulty_score' not in columns: # 如果不存在,添加该字段 cursor.execute("ALTER TABLE product_analysis ADD COLUMN difficulty_score REAL") conn.commit() logger.info("添加了difficulty_score字段") # 查询缺失分数的产品 cursor.execute(""" SELECT pa.id, pa.original_name, pa.product_intro FROM product_analysis pa WHERE pa.difficulty_score IS NULL OR pa.difficulty_score = '' """) missing_scores = cursor.fetchall() total = len(missing_scores) analysis_tasks[task_id]['total'] = total logger.info(f"找到 {total} 个缺失分数的产品") for i, (analysis_id, product_name, introduction) in enumerate(missing_scores): try: # 调用Ollama API分析难度分数 score = analyze_product_difficulty(product_name, introduction) # 更新数据库 cursor.execute( "UPDATE product_analysis SET difficulty_score = ? WHERE id = ?", (score, analysis_id) ) conn.commit() analysis_tasks[task_id]['completed'] = i + 1 analysis_tasks[task_id]['progress'] = int((i + 1) / total * 100) logger.info(f"已分析产品 {i+1}/{total}: {product_name}, 分数: {score}") # 避免频繁调用API time.sleep(1) except Exception as e: logger.error(f"分析产品 {product_name} 失败: {e}") # 继续处理下一个产品 continue analysis_tasks[task_id]['status'] = 'completed' logger.info("所有缺失分数分析完成") except Exception as e: logger.error(f"分析任务失败: {e}") analysis_tasks[task_id]['status'] = 'failed' analysis_tasks[task_id]['error'] = str(e) finally: conn.close() analysis_tasks[task_id]['end_time'] = datetime.now().isoformat() threading.Thread(target=run_analysis).start() return jsonify({ 'task_id': task_id, 'status': 'started', 'message': '分析任务已启动,请通过task_id查询进度' }) @app.route('/api/update_task_status/') def update_task_status(task_id): """获取任务状态""" if task_id in analysis_tasks: return jsonify(analysis_tasks[task_id]) else: return jsonify({ 'error': 'Task not found', 'message': '找不到指定的任务ID' }), 404 def analyze_product_difficulty(product_name, introduction): """调用Ollama API分析产品难度分数""" try: # 构建提示词 prompt = f"""请基于以下产品信息,分析其技术实现的难度分数(1-100分): 产品名称:{product_name} 产品简介:{introduction} 请只返回一个整数分数,不需要其他解释。分数越高表示技术实现难度越大。 """ # 调用Ollama API response = requests.post( 'http://localhost:11434/api/generate', json={ 'model': 'llama3', 'prompt': prompt, 'format': 'json', 'stream': False }, timeout=30 ) if response.status_code == 200: data = response.json() # 提取分数 score_text = data.get('response', '50').strip() # 尝试从文本中提取数字 import re numbers = re.findall(r'\d+', score_text) if numbers: score = int(numbers[0]) # 确保分数在1-100范围内 return max(1, min(100, score)) # 如果API调用失败或无法提取分数,返回默认值50 logger.warning(f"无法从Ollama API获取有效分数,返回默认值") return 50 except Exception as e: logger.error(f"调用Ollama API失败: {e}") # 出错时返回默认值 return 50 @app.route('/static/') def static_files(filename): """静态文件""" return send_from_directory('static', filename) def create_html_template(): """创建HTML模板""" template_dir = 'templates' if not os.path.exists(template_dir): os.makedirs(template_dir) html_content = ''' SQLite数据库查看器

🗄️ SQLite数据库查看器

请选择数据表

记录数: 0 第 0 页,共 0 页
请选择数据表以查看内容

📊 分数分析进度

0%

等待分析开始...

''' with open(os.path.join(template_dir, 'index.html'), 'w', encoding='utf-8') as f: f.write(html_content) logger.info("HTML模板创建完成") if __name__ == '__main__': logger.info("启动Web SQLite查看器") # 创建HTML模板 create_html_template() # 检查数据库文件 if not os.path.exists(DB_PATH): logger.warning(f"数据库文件不存在: {DB_PATH}") logger.info("将创建一个空的数据库用于演示") # 创建示例数据库 conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() # 创建示例表 cursor.execute(''' CREATE TABLE IF NOT EXISTS products ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, description TEXT, features TEXT, price REAL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # 创建product_analysis表 cursor.execute(''' CREATE TABLE IF NOT EXISTS product_analysis ( id INTEGER PRIMARY KEY AUTOINCREMENT, product_id INTEGER, analysis TEXT, difficulty_score REAL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (product_id) REFERENCES products(id) ) ''') # 插入示例数据 sample_data = [ ('产品A', '这是一个非常优秀的产品\n具有多种实用功能\n用户反馈很好', '高性能\n易用性\n稳定性', 99.99), ('产品B', '创新设计\n简洁界面\n强大功能', '创新\n美观\n实用', 149.99), ('产品C', '专业级解决方案\n适用于企业环境\n支持大规模部署', '企业级\n可扩展\n安全', 299.99) ] cursor.executemany('INSERT INTO products (name, description, features, price) VALUES (?, ?, ?, ?)', sample_data) # 插入一些分析数据 cursor.execute('INSERT INTO product_analysis (product_id, analysis, difficulty_score) VALUES (1, "产品分析示例", 75)') cursor.execute('INSERT INTO product_analysis (product_id, analysis) VALUES (2, "产品分析示例,无分数")') conn.commit() conn.close() logger.info("示例数据库创建完成") logger.info(f"Web服务器启动,访问地址: http://localhost:5000") logger.info("按 Ctrl+C 停止服务器") app.run(debug=True, host='0.0.0.0', port=5000)