Files
tophux_scrape/product/web_sqlite_viewer.py

1138 lines
41 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
现代化Web SQLite数据库查看器
基于Flask框架提供自然的内容显示和筛选功能
"""
from flask import Flask, render_template, jsonify, request, send_from_directory
import sqlite3
import os
import json
from datetime import datetime
from loguru import logger
import threading
import time
import requests
# Flask application object; the route decorators in this module register on it.
app = Flask(__name__)
# Path to the SQLite database file, located next to this module.
DB_PATH = os.path.join(os.path.dirname(__file__), 'products.db')
# In-memory store of analysis task state, keyed by task id.
analysis_tasks = {}
class SQLiteWebViewer:
    """Read helper that exposes the tables of one SQLite database file.

    Every method opens its own short-lived connection to ``db_path`` and
    closes it again, even when a query fails. Errors are logged and turned
    into an empty result instead of being raised.
    """

    # Declared column types that are treated as numeric when filtering.
    _NUMERIC_TYPES = ('INTEGER', 'REAL', 'FLOAT', 'NUMERIC')
    # Substrings that mark a column as date-like for default ordering.
    _DATE_KEYWORDS = ('created_at', 'updated_at', 'date', 'publish_date', 'release_date')

    def __init__(self, db_path):
        """Remember the path of the database file to read from."""
        self.db_path = db_path
        logger.info(f"初始化Web SQLite查看器数据库路径: {db_path}")

    @staticmethod
    def _quote_identifier(name):
        """Return *name* as a double-quoted SQL identifier.

        Identifiers cannot be bound as ``?`` parameters, so table and column
        names are quoted (embedded quotes doubled) before being placed in SQL.
        """
        return '"' + str(name).replace('"', '""') + '"'

    def get_tables(self):
        """Return the names of all user tables, or ``[]`` on error."""
        conn = None
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'")
            tables = [row[0] for row in cursor.fetchall()]
            logger.info(f"获取到 {len(tables)} 个表")
            return tables
        except Exception as e:
            logger.error(f"获取表列表失败: {e}")
            return []
        finally:
            if conn is not None:
                conn.close()

    def get_table_structure(self, table_name):
        """Return the columns of *table_name* as a list of dicts.

        Each dict has the keys ``cid``, ``name``, ``type``, ``notnull``,
        ``default`` and ``pk``, taken from ``PRAGMA table_info``. Returns
        ``[]`` when the table has no columns or the query fails.
        """
        conn = None
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            cursor.execute(f"PRAGMA table_info({self._quote_identifier(table_name)})")
            structure = [
                {
                    'cid': col[0],
                    'name': col[1],
                    'type': col[2],
                    'notnull': col[3],
                    'default': col[4],
                    'pk': col[5],
                }
                for col in cursor.fetchall()
            ]
            logger.info(f"获取表 {table_name} 结构,共 {len(structure)} 个字段")
            return structure
        except Exception as e:
            logger.error(f"获取表结构失败: {e}")
            return []
        finally:
            if conn is not None:
                conn.close()

    def get_table_data(self, table_name, page=1, per_page=50, search_field=None, search_value=None):
        """Return one page of rows from *table_name*, optionally filtered.

        Args:
            table_name: Name of the table to read.
            page: 1-based page number; values below 1 are treated as 1.
            per_page: Rows per page; values below 1 are treated as 1.
            search_field: A column name, a list of column names, or ``"all"``
                (every column whose declared type is not numeric). Names that
                are not columns of the table are ignored.
            search_value: Filter text. ``<``, ``<=``, ``>``, ``>=`` or ``=``
                followed by a number is applied as a numeric comparison on
                numeric columns; anything else is a ``LIKE '%value%'`` match.
                Conditions on several columns are combined with ``OR``.

        Returns:
            A dict with ``columns``, ``rows``, ``total_count``, ``page``,
            ``per_page`` and ``total_pages``. Each row is a list of cell dicts
            with ``value`` and ``type`` (``empty``, ``multiline`` or
            ``normal``). On any error an empty result is returned.
        """
        import re  # local import: `re` is not among the module-level imports

        conn = None
        try:
            # A negative LIMIT means "no limit" in SQLite and per_page == 0
            # would divide by zero below, so clamp both to sane minimums.
            page = max(1, int(page))
            per_page = max(1, int(per_page))
            table_sql = self._quote_identifier(table_name)

            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()

            # Column metadata: (cid, name, type, notnull, default, pk).
            cursor.execute(f"PRAGMA table_info({table_sql})")
            columns_info = cursor.fetchall()
            columns = [col_info[1] for col_info in columns_info]
            field_types = {col_info[1]: col_info[2] for col_info in columns_info}
            text_fields = [
                name for name, declared in field_types.items()
                if declared.upper() not in self._NUMERIC_TYPES
            ]

            # Build the WHERE clause.
            query_params = []
            where_clause = ""
            if search_value:
                if isinstance(search_field, list):
                    search_fields = [f for f in search_field if f in field_types]
                elif search_field == "all":
                    search_fields = text_fields
                elif search_field and search_field in field_types:
                    search_fields = [search_field]
                else:
                    search_fields = []

                if search_fields:
                    # The search value is the same for every field: match once.
                    match = re.match(r'^(<=?|>=?|=)(\d+(\.\d+)?)$', search_value)
                    conditions = []
                    for field in search_fields:
                        field_sql = self._quote_identifier(field)
                        is_numeric_field = field_types.get(field, '').upper() in self._NUMERIC_TYPES
                        if match and is_numeric_field:
                            operator = match.group(1)
                            value = match.group(2)
                            conditions.append(f"{field_sql} {operator} ?")
                            query_params.append(float(value) if '.' in value else int(value))
                            logger.info(f"应用数值比较筛选: {field} {operator} {value}")
                        else:
                            conditions.append(f"{field_sql} LIKE ?")
                            query_params.append(f'%{search_value}%')
                            logger.info(f"应用文本模糊匹配: {field} LIKE '%{search_value}%'")
                    if conditions:
                        where_clause = " WHERE " + " OR ".join(conditions)

            # Total number of matching rows.
            cursor.execute(f"SELECT COUNT(*) FROM {table_sql}{where_clause}", query_params)
            total_count = cursor.fetchone()[0]

            # Newest-first ordering when the table has a date-like column:
            # updated_at wins, then created_at, then the first one found.
            date_columns = [
                name for name in columns
                if any(keyword in name.lower() for keyword in self._DATE_KEYWORDS)
            ]
            order_by_clause = ""
            if date_columns:
                if 'updated_at' in date_columns:
                    sort_column = 'updated_at'
                elif 'created_at' in date_columns:
                    sort_column = 'created_at'
                else:
                    sort_column = date_columns[0]
                order_by_clause = f" ORDER BY {self._quote_identifier(sort_column)} DESC"
                logger.info(f"应用日期排序: {sort_column} DESC")

            # Fetch the requested page.
            offset = (page - 1) * per_page
            data_query = f"SELECT * FROM {table_sql}{where_clause}{order_by_clause} LIMIT ? OFFSET ?"
            cursor.execute(data_query, query_params + [per_page, offset])
            rows = cursor.fetchall()

            # Wrap every cell so the front end knows how to render it.
            processed_rows = []
            for row in rows:
                processed_row = []
                for col_name, cell in zip(columns, row):
                    if col_name == "difficulty_score" and cell is None:
                        # A missing difficulty score gets an explicit label.
                        processed_row.append({'value': '未评分', 'type': 'empty'})
                    elif cell is None:
                        processed_row.append({'value': '', 'type': 'empty'})
                    elif isinstance(cell, str) and ('\n' in cell or len(cell) > 100):
                        # Multi-line or long text.
                        processed_row.append({
                            'value': cell,
                            'type': 'multiline',
                            'lines': cell.count('\n') + 1,
                            'length': len(cell),
                        })
                    else:
                        processed_row.append({'value': str(cell), 'type': 'normal'})
                processed_rows.append(processed_row)

            logger.info(f"获取表 {table_name} 数据,第 {page} 页,共 {len(processed_rows)} 条记录")
            return {
                'columns': columns,
                'rows': processed_rows,
                'total_count': total_count,
                'page': page,
                'per_page': per_page,
                'total_pages': (total_count + per_page - 1) // per_page,
            }
        except Exception as e:
            logger.error(f"获取表数据失败: {e}")
            return {'columns': [], 'rows': [], 'total_count': 0, 'page': 1, 'per_page': 50, 'total_pages': 0}
        finally:
            if conn is not None:
                conn.close()
# Module-level viewer instance used by the API routes; reads from DB_PATH.
viewer = SQLiteWebViewer(DB_PATH)
@app.route('/')
def index():
    """Render the viewer's main page from the ``index.html`` template."""
    logger.info("访问主页")
    page_html = render_template('index.html')
    return page_html
@app.route('/api/tables')
def get_tables():
    """Return the table names as JSON under the ``tables`` key."""
    return jsonify({'tables': viewer.get_tables()})
@app.route('/api/table/<table_name>/structure')
def get_table_structure(table_name):
    """Return the column structure of *table_name* as JSON under ``structure``."""
    payload = {'structure': viewer.get_table_structure(table_name)}
    return jsonify(payload)
@app.route('/api/table/<table_name>/data')
def get_table_data(table_name):
    """Return one page of rows from *table_name* as JSON.

    Query parameters:
        page: 1-based page number (default 1).
        per_page: Rows per page (default 50).
        search_field: May be repeated; column name(s) to filter on, or "all".
        search_value: Text (or numeric comparison) to filter by.

    Malformed or non-positive ``page`` / ``per_page`` values fall back to a
    valid value instead of producing an HTTP 500.
    """
    # With type=int a value that cannot be converted yields the default
    # rather than raising ValueError inside the view.
    page = max(1, request.args.get('page', 1, type=int))
    per_page = max(1, request.args.get('per_page', 50, type=int))
    # search_field may be given several times; a lone value is passed on as a
    # plain string so that "all" and single-column filters are recognised.
    search_field = request.args.getlist('search_field')
    if len(search_field) == 1:
        search_field = search_field[0]
    search_value = request.args.get('search_value')
    data = viewer.get_table_data(table_name, page, per_page, search_field, search_value)
    return jsonify(data)
@app.route('/api/analyze_missing_scores')
def analyze_missing_scores():
    """Start a background task that fills in missing difficulty scores.

    A task record is stored in ``analysis_tasks`` and a worker thread is
    started. The worker adds the ``difficulty_score`` column to
    ``product_analysis`` if it is absent, then scores every row whose score is
    NULL or empty via ``analyze_product_difficulty``.

    Returns:
        JSON with the ``task_id`` to poll, ``status`` and a ``message``.
    """
    import uuid  # local import: uuid is not among the module-level imports

    # A random id: a second-resolution timestamp would let two requests in
    # the same second share (and overwrite) one task record.
    task_id = uuid.uuid4().hex
    task = {
        'status': 'running',
        'progress': 0,
        'total': 0,
        'completed': 0,
        'error': None,
        'start_time': datetime.now().isoformat()
    }
    analysis_tasks[task_id] = task

    def run_analysis():
        """Worker body: score each product and record progress in ``task``."""
        # Bound before the try so the finally clause is safe even when
        # sqlite3.connect() itself raises.
        conn = None
        try:
            conn = sqlite3.connect(DB_PATH)
            cursor = conn.cursor()
            # Make sure product_analysis has a difficulty_score column.
            cursor.execute("PRAGMA table_info(product_analysis)")
            columns = [col[1] for col in cursor.fetchall()]
            if 'difficulty_score' not in columns:
                cursor.execute("ALTER TABLE product_analysis ADD COLUMN difficulty_score REAL")
                conn.commit()
                logger.info("添加了difficulty_score字段")
            # Rows that still need a score.
            cursor.execute("""
SELECT pa.id, pa.original_name, pa.product_intro
FROM product_analysis pa
WHERE pa.difficulty_score IS NULL OR pa.difficulty_score = ''
""")
            missing_scores = cursor.fetchall()
            total = len(missing_scores)
            task['total'] = total
            logger.info(f"找到 {total} 个缺失分数的产品")
            for i, (analysis_id, product_name, introduction) in enumerate(missing_scores):
                try:
                    score = analyze_product_difficulty(product_name, introduction)
                    cursor.execute(
                        "UPDATE product_analysis SET difficulty_score = ? WHERE id = ?",
                        (score, analysis_id)
                    )
                    conn.commit()
                    logger.info(f"已分析产品 {i+1}/{total}: {product_name}, 分数: {score}")
                    # Throttle calls to the scoring API.
                    time.sleep(1)
                except Exception as e:
                    # One failing product must not abort the whole run.
                    logger.error(f"分析产品 {product_name} 失败: {e}")
                finally:
                    # Advance progress for failed items too, so the bar does
                    # not stall on them.
                    task['completed'] = i + 1
                    task['progress'] = int((i + 1) / total * 100)
            # Also covers total == 0, where the loop never runs.
            task['progress'] = 100
            task['status'] = 'completed'
            logger.info("所有缺失分数分析完成")
        except Exception as e:
            logger.error(f"分析任务失败: {e}")
            task['status'] = 'failed'
            task['error'] = str(e)
        finally:
            if conn is not None:
                conn.close()
            task['end_time'] = datetime.now().isoformat()

    threading.Thread(target=run_analysis).start()
    return jsonify({
        'task_id': task_id,
        'status': 'started',
        'message': '分析任务已启动请通过task_id查询进度'
    })
@app.route('/api/update_task_status/<task_id>')
def update_task_status(task_id):
    """Return the stored state of an analysis task, or a 404 JSON error."""
    if task_id not in analysis_tasks:
        not_found = {
            'error': 'Task not found',
            'message': '找不到指定的任务ID'
        }
        return jsonify(not_found), 404
    return jsonify(analysis_tasks[task_id])
def analyze_product_difficulty(product_name, introduction):
    """Ask the local Ollama server for a 1-100 implementation-difficulty score.

    Args:
        product_name: Product name inserted into the prompt.
        introduction: Product introduction inserted into the prompt.

    Returns:
        The first integer found in the model's reply, clamped to 1-100.
        Returns 50 when the request fails, the status is not 200, or the
        reply contains no digits.
    """
    import re  # local import: `re` is not among the module-level imports

    try:
        # Build the prompt.
        prompt = f"""请基于以下产品信息分析其技术实现的难度分数1-100分
产品名称:{product_name}
产品简介:{introduction}
请只返回一个整数分数,不需要其他解释。分数越高表示技术实现难度越大。
"""
        # The prompt asks for a bare integer, so JSON mode ('format': 'json')
        # is not requested: Ollama's API docs warn that JSON mode without a
        # JSON instruction in the prompt can yield large amounts of whitespace.
        response = requests.post(
            'http://localhost:11434/api/generate',
            json={
                'model': 'llama3',
                'prompt': prompt,
                'stream': False
            },
            timeout=30
        )
        if response.status_code == 200:
            data = response.json()
            # Coerce to str so a null or non-string "response" cannot raise.
            score_text = str(data.get('response') or '').strip()
            numbers = re.findall(r'\d+', score_text)
            if numbers:
                # Keep the score inside the 1-100 range.
                return max(1, min(100, int(numbers[0])))
        # Non-200 status or no number in the reply.
        logger.warning("无法从Ollama API获取有效分数返回默认值")
        return 50
    except Exception as e:
        logger.error(f"调用Ollama API失败: {e}")
        # Fall back to the default score on any failure.
        return 50
@app.route('/static/<path:filename>')
def static_files(filename):
    """Serve *filename* from the ``static`` directory."""
    static_dir = 'static'
    return send_from_directory(static_dir, filename)
def create_html_template():
    """Write the viewer's single-page UI to ``templates/index.html``.

    The ``templates`` directory is created next to this module, which is where
    Flask looks for templates for an app built with ``Flask(__name__)``, so
    the page is found regardless of the current working directory. An
    existing ``index.html`` is overwritten.

    Differences in the embedded page compared with the previous version:
    column names and error messages are HTML-escaped before being inserted,
    the "analyze" label points at the real button id, and changing the filter
    field or filter text returns to the first page.
    """
    module_dir = os.path.dirname(os.path.abspath(__file__))
    template_dir = os.path.join(module_dir, 'templates')
    # exist_ok avoids the check-then-create race of an explicit exists() test.
    os.makedirs(template_dir, exist_ok=True)
    html_content = '''<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>SQLite数据库查看器</title>
<style>
* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
body {
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
min-height: 100vh;
color: #333;
}
.container {
width: 100%;
margin: 0 auto;
padding: 20px;
}
.header {
background: rgba(255, 255, 255, 0.95);
backdrop-filter: blur(10px);
border-radius: 15px;
padding: 25px;
margin-bottom: 25px;
box-shadow: 0 8px 32px rgba(0, 0, 0, 0.1);
}
.header h1 {
color: #2c3e50;
font-size: 2.5em;
margin-bottom: 10px;
text-align: center;
}
.controls {
display: flex;
gap: 20px;
align-items: center;
flex-wrap: wrap;
margin-top: 20px;
}
.control-group {
display: flex;
flex-direction: column;
gap: 8px;
}
.control-group label {
font-weight: 600;
color: #34495e;
font-size: 0.9em;
}
select, input {
padding: 12px 15px;
border: 2px solid #e0e6ed;
border-radius: 8px;
font-size: 14px;
transition: all 0.3s ease;
background: white;
}
select:focus, input:focus {
outline: none;
border-color: #667eea;
box-shadow: 0 0 0 3px rgba(102, 126, 234, 0.1);
}
.btn {
padding: 12px 24px;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
border: none;
border-radius: 8px;
cursor: pointer;
font-weight: 600;
transition: all 0.3s ease;
text-decoration: none;
display: inline-block;
}
.btn:hover {
transform: translateY(-2px);
box-shadow: 0 5px 15px rgba(0, 0, 0, 0.2);
}
.data-container {
background: rgba(255, 255, 255, 0.95);
backdrop-filter: blur(10px);
border-radius: 15px;
padding: 25px;
box-shadow: 0 8px 32px rgba(0, 0, 0, 0.1);
}
.table-info {
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 20px;
padding: 15px;
background: #f8f9fa;
border-radius: 10px;
}
.table-info h2 {
color: #2c3e50;
font-size: 1.5em;
}
.stats {
display: flex;
gap: 20px;
font-size: 0.9em;
color: #7f8c8d;
}
.table-wrapper {
overflow-x: auto;
border-radius: 10px;
box-shadow: 0 4px 16px rgba(0, 0, 0, 0.1);
}
table {
width: 100%;
border-collapse: collapse;
background: white;
font-size: 14px;
}
th {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
padding: 15px 12px;
text-align: left;
font-weight: 600;
position: sticky;
top: 0;
z-index: 10;
}
td {
padding: 12px;
border-bottom: 1px solid #ecf0f1;
vertical-align: top;
}
tr:nth-child(even) {
background-color: #f8f9fa;
}
tr:hover {
background-color: #e3f2fd;
transition: background-color 0.3s ease;
}
.multiline-cell {
white-space: pre-wrap;
line-height: 1.6;
max-height: 200px;
overflow-y: auto;
padding: 8px;
background: #fff3cd;
border-radius: 6px;
border-left: 4px solid #ffc107;
}
.normal-cell {
white-space: nowrap;
overflow: hidden;
text-overflow: ellipsis;
max-width: 300px;
}
.empty-cell {
color: #95a5a6;
font-style: italic;
}
.pagination {
display: flex;
justify-content: center;
align-items: center;
gap: 10px;
margin-top: 25px;
flex-wrap: wrap;
}
.page-info {
color: #7f8c8d;
font-weight: 600;
}
.page-btn {
padding: 8px 12px;
border: 2px solid #e0e6ed;
background: white;
border-radius: 6px;
cursor: pointer;
transition: all 0.3s ease;
min-width: 40px;
text-align: center;
}
.page-btn:hover {
border-color: #667eea;
background: #667eea;
color: white;
}
.page-btn.active {
background: #667eea;
color: white;
border-color: #667eea;
}
.page-btn:disabled {
opacity: 0.5;
cursor: not-allowed;
}
.loading {
text-align: center;
padding: 40px;
color: #7f8c8d;
font-size: 1.1em;
}
.error {
background: #f8d7da;
color: #721c24;
padding: 15px;
border-radius: 8px;
border: 1px solid #f5c6cb;
margin: 20px 0;
}
.no-data {
text-align: center;
padding: 40px;
color: #7f8c8d;
font-size: 1.1em;
}
.analyze-btn {
background: linear-gradient(135deg, #28a745, #20c997);
color: white;
border: none;
padding: 8px 16px;
border-radius: 6px;
font-size: 1em;
cursor: pointer;
transition: all 0.3s ease;
}
.analyze-btn:hover {
transform: translateY(-1px);
box-shadow: 0 4px 8px rgba(40, 167, 69, 0.3);
}
.analyze-btn:disabled {
background: #6c757d;
cursor: not-allowed;
transform: none;
box-shadow: none;
}
.progress-container {
margin-top: 20px;
padding: 15px;
background: rgba(255, 255, 255, 0.95);
backdrop-filter: blur(10px);
border-radius: 12px;
box-shadow: 0 4px 20px rgba(0, 0, 0, 0.1);
display: none;
}
.progress-bar {
width: 100%;
height: 20px;
background: #e9ecef;
border-radius: 10px;
overflow: hidden;
margin: 10px 0;
}
.progress-fill {
height: 100%;
background: linear-gradient(90deg, #667eea, #764ba2);
transition: width 0.3s ease;
display: flex;
align-items: center;
justify-content: center;
color: white;
font-size: 0.8em;
font-weight: 600;
}
@media (max-width: 768px) {
.controls {
flex-direction: column;
align-items: stretch;
}
.table-info {
flex-direction: column;
gap: 15px;
text-align: center;
}
.stats {
justify-content: center;
}
}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>🗄️ SQLite数据库查看器</h1>
<div class="controls">
<div class="control-group">
<label for="tableSelect">选择数据表:</label>
<select id="tableSelect">
<option value="">加载中...</option>
</select>
</div>
<div class="control-group">
<label for="analyzeScoresBtn">分析:</label>
<button id="analyzeScoresBtn" class="analyze-btn">📊 分析缺失分数</button>
</div>
<div class="control-group">
<label for="searchField">筛选字段:</label>
<select id="searchField" multiple disabled style="min-height: 80px;">
<option value="">所有文本字段</option>
</select>
</div>
<div class="control-group">
<label for="searchValue">筛选内容:</label>
<input type="text" id="searchValue" placeholder="输入筛选内容..." disabled>
</div>
<button class="btn" onclick="loadData()">刷新数据</button>
</div>
</div>
</div>
<div class="data-container">
<div class="table-info">
<h2 id="tableName">请选择数据表</h2>
<div class="stats">
<span id="recordCount">记录数: 0</span>
<span id="pageInfo">第 0 页,共 0 页</span>
</div>
</div>
<div id="dataContainer">
<div class="no-data">请选择数据表以查看内容</div>
</div>
<div id="pagination" class="pagination" style="display: none;">
<button class="page-btn" onclick="changePage('prev')" id="prevBtn">上一页</button>
<span class="page-info" id="pageInfoDetail"></span>
<button class="page-btn" onclick="changePage('next')" id="nextBtn">下一页</button>
</div>
</div>
</div>
<div id="progressSection" class="progress-container">
<h3>📊 分数分析进度</h3>
<div class="progress-bar">
<div id="progressFill" class="progress-fill" style="width: 0%;">0%</div>
</div>
<p id="progressText">等待分析开始...</p>
</div>
</div>
<script>
let currentTable = '';
let currentPage = 1;
let perPage = 50;
let totalPages = 1;
let currentData = null;
// 绑定事件
document.addEventListener('DOMContentLoaded', function() {
loadTables();
// 绑定事件
document.getElementById('tableSelect').addEventListener('change', function() {
currentTable = this.value;
currentPage = 1;
if (currentTable) {
loadTableStructure();
loadData();
}
});
// 分析缺失分数按钮事件
document.getElementById('analyzeScoresBtn').addEventListener('click', analyzeMissingScores);
});
// 分析缺失分数
async function analyzeMissingScores() {
const analyzeBtn = document.getElementById('analyzeScoresBtn');
const progressSection = document.getElementById('progressSection');
const progressFill = document.getElementById('progressFill');
const progressText = document.getElementById('progressText');
try {
// 禁用按钮
analyzeBtn.disabled = true;
analyzeBtn.textContent = '分析中...';
// 显示进度条
progressSection.style.display = 'block';
progressFill.style.width = '0%';
progressFill.textContent = '0%';
progressText.textContent = '正在启动分析任务...';
// 启动分析任务
const response = await fetch('/api/analyze_missing_scores');
const data = await response.json();
if (data.task_id) {
// 定期查询任务状态
const interval = setInterval(async () => {
try {
const statusResponse = await fetch(`/api/update_task_status/${data.task_id}`);
const statusData = await statusResponse.json();
// 更新进度
progressFill.style.width = `${statusData.progress}%`;
progressFill.textContent = `${statusData.progress}%`;
if (statusData.status === 'running') {
progressText.textContent = `正在分析: ${statusData.completed}/${statusData.total} 个产品`;
} else if (statusData.status === 'completed') {
progressText.textContent = '🎉 所有缺失分数分析完成!';
clearInterval(interval);
analyzeBtn.disabled = false;
analyzeBtn.textContent = '📊 分析缺失分数';
// 如果当前正在查看product_analysis表自动刷新
if (currentTable === 'product_analysis') {
loadData();
}
} else if (statusData.status === 'failed') {
progressText.textContent = `❌ 分析失败: ${statusData.error}`;
clearInterval(interval);
analyzeBtn.disabled = false;
analyzeBtn.textContent = '📊 分析缺失分数';
}
} catch (error) {
console.error('查询任务状态失败:', error);
progressText.textContent = '查询任务状态失败';
}
}, 2000);
}
} catch (error) {
console.error('启动分析任务失败:', error);
progressText.textContent = `启动分析失败: ${error.message}`;
analyzeBtn.disabled = false;
analyzeBtn.textContent = '📊 分析缺失分数';
}
}
// 筛选条件变化时回到第一页
function reloadFromFirstPage() {
currentPage = 1;
loadData();
}
document.getElementById('searchField').addEventListener('change', reloadFromFirstPage);
document.getElementById('searchValue').addEventListener('input', debounce(reloadFromFirstPage, 500));
// 防抖函数
function debounce(func, wait) {
let timeout;
return function executedFunction(...args) {
const later = () => {
clearTimeout(timeout);
func(...args);
};
clearTimeout(timeout);
timeout = setTimeout(later, wait);
};
}
// 加载表列表
async function loadTables() {
try {
const response = await fetch('/api/tables');
const data = await response.json();
const select = document.getElementById('tableSelect');
select.innerHTML = '<option value="">选择数据表...</option>';
data.tables.forEach(table => {
const option = document.createElement('option');
option.value = table;
option.textContent = table;
select.appendChild(option);
});
} catch (error) {
console.error('加载表列表失败:', error);
showError('加载表列表失败: ' + error.message);
}
}
// 加载表结构
async function loadTableStructure() {
if (!currentTable) return;
try {
const response = await fetch(`/api/table/${currentTable}/structure`);
const data = await response.json();
const searchField = document.getElementById('searchField');
searchField.innerHTML = '<option value="">所有文本字段</option>';
data.structure.forEach(field => {
const option = document.createElement('option');
option.value = field.name;
option.textContent = field.name;
searchField.appendChild(option);
});
searchField.disabled = false;
document.getElementById('searchValue').disabled = false;
} catch (error) {
console.error('加载表结构失败:', error);
}
}
// 加载数据
async function loadData() {
if (!currentTable) return;
const container = document.getElementById('dataContainer');
container.innerHTML = '<div class="loading">📊 数据加载中...</div>';
const searchFieldSelect = document.getElementById('searchField');
const searchValue = document.getElementById('searchValue').value;
try {
let url = `/api/table/${currentTable}/data?page=${currentPage}&per_page=${perPage}`;
if (searchValue) {
// 获取所有选中的字段
const selectedFields = Array.from(searchFieldSelect.selectedOptions)
.map(option => option.value)
.filter(value => value !== '');
if (selectedFields.length > 0) {
// 如果选择了特定字段,传递所有选中的字段
selectedFields.forEach(field => {
url += `&search_field=${encodeURIComponent(field)}`;
});
} else {
// 否则使用"all"表示所有文本字段
url += '&search_field=all';
}
url += `&search_value=${encodeURIComponent(searchValue)}`;
}
const response = await fetch(url);
currentData = await response.json();
displayData(currentData);
updatePagination();
} catch (error) {
console.error('加载数据失败:', error);
showError('加载数据失败: ' + error.message);
}
}
// 显示数据
function displayData(data) {
const container = document.getElementById('dataContainer');
if (!data.rows || data.rows.length === 0) {
container.innerHTML = '<div class="no-data">📭 没有找到数据</div>';
return;
}
let html = '<div class="table-wrapper"><table><thead><tr>';
// 表头
data.columns.forEach(col => {
html += `<th>${escapeHtml(col)}</th>`;
});
html += '</tr></thead><tbody>';
// 数据行
data.rows.forEach(row => {
html += '<tr>';
row.forEach((cell, index) => {
const colName = data.columns[index];
if (cell.type === 'multiline') {
html += `<td><div class="multiline-cell">${escapeHtml(cell.value)}</div></td>`;
} else if (cell.type === 'empty') {
html += '<td><div class="empty-cell">空</div></td>';
} else if (colName === 'product_link' && cell.value) {
// 渲染为链接
html += `<td><div class="normal-cell"><a href="${escapeHtml(cell.value)}" target="_blank" rel="noopener noreferrer">${escapeHtml(cell.value)}</a></div></td>`;
} else {
html += `<td><div class="normal-cell">${escapeHtml(cell.value)}</div></td>`;
}
});
html += '</tr>';
});
html += '</tbody></table></div>';
container.innerHTML = html;
// 更新统计信息
document.getElementById('tableName').textContent = `📋 ${currentTable}`;
document.getElementById('recordCount').textContent = `记录数: ${data.total_count}`;
document.getElementById('pageInfo').textContent = `第 ${currentPage} 页,共 ${data.total_pages} 页`;
}
// 更新分页
function updatePagination() {
if (!currentData) return;
totalPages = currentData.total_pages;
const pagination = document.getElementById('pagination');
const prevBtn = document.getElementById('prevBtn');
const nextBtn = document.getElementById('nextBtn');
const pageInfo = document.getElementById('pageInfoDetail');
if (totalPages <= 1) {
pagination.style.display = 'none';
return;
}
pagination.style.display = 'flex';
prevBtn.disabled = currentPage <= 1;
nextBtn.disabled = currentPage >= totalPages;
pageInfo.textContent = `${currentPage} / ${totalPages}`;
}
// 翻页
function changePage(direction) {
if (direction === 'prev' && currentPage > 1) {
currentPage--;
loadData();
} else if (direction === 'next' && currentPage < totalPages) {
currentPage++;
loadData();
}
}
// HTML转义
function escapeHtml(text) {
const div = document.createElement('div');
div.textContent = text;
return div.innerHTML;
}
// 显示错误
function showError(message) {
const container = document.getElementById('dataContainer');
container.innerHTML = `<div class="error">❌ ${escapeHtml(message)}</div>`;
}
</script>
</body>
</html>'''
    with open(os.path.join(template_dir, 'index.html'), 'w', encoding='utf-8') as f:
        f.write(html_content)
    logger.info("HTML模板创建完成")
if __name__ == '__main__':
    # Script entry point: write the HTML template, create a small demo
    # database when none exists, then start the Flask server.
    logger.info("启动Web SQLite查看器")
    create_html_template()
    if not os.path.exists(DB_PATH):
        logger.warning(f"数据库文件不存在: {DB_PATH}")
        logger.info("将创建一个空的数据库用于演示")
        conn = sqlite3.connect(DB_PATH)
        try:
            cursor = conn.cursor()
            # Demo products table.
            cursor.execute('''
CREATE TABLE IF NOT EXISTS products (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
description TEXT,
features TEXT,
price REAL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
            # Demo product_analysis table. original_name and product_intro
            # are included because the missing-score analysis query in this
            # module selects them; without them that endpoint would fail on
            # the demo database with "no such column".
            cursor.execute('''
CREATE TABLE IF NOT EXISTS product_analysis (
id INTEGER PRIMARY KEY AUTOINCREMENT,
product_id INTEGER,
original_name TEXT,
product_intro TEXT,
analysis TEXT,
difficulty_score REAL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (product_id) REFERENCES products(id)
)
''')
            # Sample products.
            sample_data = [
                ('产品A', '这是一个非常优秀的产品\n具有多种实用功能\n用户反馈很好', '高性能\n易用性\n稳定性', 99.99),
                ('产品B', '创新设计\n简洁界面\n强大功能', '创新\n美观\n实用', 149.99),
                ('产品C', '专业级解决方案\n适用于企业环境\n支持大规模部署', '企业级\n可扩展\n安全', 299.99)
            ]
            cursor.executemany('INSERT INTO products (name, description, features, price) VALUES (?, ?, ?, ?)', sample_data)
            # Sample analysis rows: one scored, one without a score. Values
            # are bound as parameters because SQLite treats double-quoted
            # text as an identifier first and only falls back to a string
            # literal as a legacy quirk.
            analysis_rows = [
                (1, '产品A', '这是一个非常优秀的产品', '产品分析示例', 75),
                (2, '产品B', '创新设计', '产品分析示例,无分数', None)
            ]
            cursor.executemany(
                'INSERT INTO product_analysis (product_id, original_name, product_intro, analysis, difficulty_score) VALUES (?, ?, ?, ?, ?)',
                analysis_rows
            )
            conn.commit()
        finally:
            conn.close()
        logger.info("示例数据库创建完成")
    logger.info("Web服务器启动访问地址: http://localhost:5000")
    logger.info("按 Ctrl+C 停止服务器")
    # The server listens on every interface, so the interactive Werkzeug
    # debugger (which can execute arbitrary code) must not be on by default.
    # Opt in for local development with FLASK_DEBUG=1.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(debug=debug_enabled, host='0.0.0.0', port=5000)