#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 现代化Web SQLite数据库查看器 基于Flask框架,提供自然的内容显示和筛选功能 """ from flask import Flask, render_template, jsonify, request, send_from_directory import sqlite3 import os import json from datetime import datetime from loguru import logger app = Flask(__name__) # 数据库路径 DB_PATH = os.path.join(os.path.dirname(__file__), 'product.db') class SQLiteWebViewer: def __init__(self, db_path): self.db_path = db_path logger.info(f"初始化Web SQLite查看器,数据库路径: {db_path}") def get_tables(self): """获取所有表名""" try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'") tables = [row[0] for row in cursor.fetchall()] conn.close() logger.info(f"获取到 {len(tables)} 个表") return tables except Exception as e: logger.error(f"获取表列表失败: {e}") return [] def get_table_structure(self, table_name): """获取表结构""" try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(f"PRAGMA table_info({table_name})") columns = cursor.fetchall() conn.close() structure = [] for col in columns: structure.append({ 'cid': col[0], 'name': col[1], 'type': col[2], 'notnull': col[3], 'default': col[4], 'pk': col[5] }) logger.info(f"获取表 {table_name} 结构,共 {len(structure)} 个字段") return structure except Exception as e: logger.error(f"获取表结构失败: {e}") return [] def get_table_data(self, table_name, page=1, per_page=50, search_field=None, search_value=None): """获取表数据,支持分页和搜索""" try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # 获取总记录数 if search_field and search_value: count_query = f"SELECT COUNT(*) FROM {table_name} WHERE {search_field} LIKE ?" cursor.execute(count_query, (f'%{search_value}%',)) else: count_query = f"SELECT COUNT(*) FROM {table_name}" cursor.execute(count_query) total_count = cursor.fetchone()[0] # 获取分页数据 offset = (page - 1) * per_page if search_field and search_value: data_query = f"SELECT * FROM {table_name} WHERE {search_field} LIKE ? LIMIT ? OFFSET ?" cursor.execute(data_query, (f'%{search_value}%', per_page, offset)) else: data_query = f"SELECT * FROM {table_name} LIMIT ? OFFSET ?" cursor.execute(data_query, (per_page, offset)) rows = cursor.fetchall() # 获取列名 cursor.execute(f"PRAGMA table_info({table_name})") columns = [col[1] for col in cursor.fetchall()] conn.close() # 处理数据,检测多行文本 processed_rows = [] for row in rows: processed_row = [] for cell in row: if cell is None: processed_row.append({'value': '', 'type': 'empty'}) elif isinstance(cell, str) and ('\n' in cell or len(cell) > 100): # 多行文本或长文本 lines = cell.count('\n') + 1 processed_row.append({ 'value': cell, 'type': 'multiline', 'lines': lines, 'length': len(cell) }) else: processed_row.append({'value': str(cell), 'type': 'normal'}) processed_rows.append(processed_row) logger.info(f"获取表 {table_name} 数据,第 {page} 页,共 {len(processed_rows)} 条记录") return { 'columns': columns, 'rows': processed_rows, 'total_count': total_count, 'page': page, 'per_page': per_page, 'total_pages': (total_count + per_page - 1) // per_page } except Exception as e: logger.error(f"获取表数据失败: {e}") return {'columns': [], 'rows': [], 'total_count': 0, 'page': 1, 'per_page': 50, 'total_pages': 0} # 初始化查看器 viewer = SQLiteWebViewer(DB_PATH) @app.route('/') def index(): """主页""" logger.info("访问主页") return render_template('index.html') @app.route('/api/tables') def get_tables(): """获取所有表""" tables = viewer.get_tables() return jsonify({'tables': tables}) @app.route('/api/table//structure') def get_table_structure(table_name): """获取表结构""" structure = viewer.get_table_structure(table_name) return jsonify({'structure': structure}) @app.route('/api/table//data') def get_table_data(table_name): """获取表数据""" page = int(request.args.get('page', 1)) per_page = int(request.args.get('per_page', 50)) search_field = request.args.get('search_field') search_value = request.args.get('search_value') data = viewer.get_table_data(table_name, page, per_page, search_field, search_value) return jsonify(data) @app.route('/static/') def static_files(filename): """静态文件""" return send_from_directory('static', filename) def create_html_template(): """创建HTML模板""" template_dir = 'templates' if not os.path.exists(template_dir): os.makedirs(template_dir) html_content = ''' SQLite数据库查看器

🗄️ SQLite数据库查看器

请选择数据表

记录数: 0 第 0 页,共 0 页
请选择数据表以查看内容
''' with open(os.path.join(template_dir, 'index.html'), 'w', encoding='utf-8') as f: f.write(html_content) logger.info("HTML模板创建完成") if __name__ == '__main__': logger.info("启动Web SQLite查看器") # 创建HTML模板 create_html_template() # 检查数据库文件 if not os.path.exists(DB_PATH): logger.warning(f"数据库文件不存在: {DB_PATH}") logger.info("将创建一个空的数据库用于演示") # 创建示例数据库 conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() # 创建示例表 cursor.execute(''' CREATE TABLE IF NOT EXISTS products ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, description TEXT, features TEXT, price REAL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # 插入示例数据 sample_data = [ ('产品A', '这是一个非常优秀的产品\n具有多种实用功能\n用户反馈很好', '高性能\n易用性\n稳定性', 99.99), ('产品B', '创新设计\n简洁界面\n强大功能', '创新\n美观\n实用', 149.99), ('产品C', '专业级解决方案\n适用于企业环境\n支持大规模部署', '企业级\n可扩展\n安全', 299.99) ] cursor.executemany('INSERT INTO products (name, description, features, price) VALUES (?, ?, ?, ?)', sample_data) conn.commit() conn.close() logger.info("示例数据库创建完成") logger.info(f"Web服务器启动,访问地址: http://localhost:5000") logger.info("按 Ctrl+C 停止服务器") app.run(debug=True, host='0.0.0.0', port=5000)