# -*- coding: utf-8 -*-
"""Flask backend API for contact management.

Serves the front-end page and exposes CRUD and statistics endpoints that
read and write the ``contacts`` table of a local SQLite database.
"""
from contextlib import closing
import os
import sqlite3

from flask import Flask, jsonify, request, send_from_directory

app = Flask(__name__, static_folder='static', static_url_path='/static')

# SQLite database file, located next to this module.
DB_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'contacts.db')

# Page size used by GET /api/contacts when the client does not send one.
DEFAULT_PAGE_SIZE = 20

# Text columns a client may change through PUT; values are stored as given.
# Column names are interpolated into SQL, so they must only come from here.
_UPDATABLE_TEXT_COLUMNS = (
    'name',
    'search_name',
    'category',
    'custom_content',
    'blessing',
)


# Front-end page route
@app.route('/')
def index():
    """Serve the contact manager front-end page."""
    return send_from_directory('static', 'contacts_manager.html')


def get_db():
    """Open a new SQLite connection whose rows support access by column name.

    The caller is responsible for closing the returned connection.
    """
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn


def row_to_dict(row):
    """Convert a ``sqlite3.Row`` to a dict; return None for a missing row."""
    return dict(row) if row else None


def _contact_from_row(row):
    """Convert a contact row to a dict with ``selected`` coerced to bool."""
    contact = row_to_dict(row)
    contact['selected'] = bool(contact.get('selected', 0))
    return contact


def _positive_int_arg(name, default):
    """Read query parameter *name* as an integer that is at least 1.

    A missing or non-numeric value falls back to *default*; zero and
    negative values are raised to 1 so that the OFFSET/LIMIT arithmetic and
    the total-pages division stay valid.
    """
    value = request.args.get(name, default, type=int)
    return max(value, 1)


def _build_contact_filter(category, search):
    """Build the optional WHERE fragments shared by the count and page queries.

    Returns a ``(sql_fragment, params)`` pair. The fragment is meant to be
    appended after ``WHERE 1=1`` and contains only ``?`` placeholders; all
    user input travels through ``params``.
    """
    clauses = []
    params = []
    if category:
        # The category column holds comma-separated tags, so match the tag
        # in every position it can occupy.
        clauses.append(
            " AND (category = ? OR category LIKE ?"
            " OR category LIKE ? OR category LIKE ?)"
        )
        params.extend([
            category,            # exact match: "同事"
            f"{category},%",     # first tag:   "同事,好友"
            f"%,{category}",     # last tag:    "好友,同事"
            f"%,{category},%",   # middle tag:  "好友,同事,亲戚"
        ])
    if search:
        clauses.append(" AND (name LIKE ? OR search_name LIKE ?)")
        params.extend([f"%{search}%", f"%{search}%"])
    return "".join(clauses), params


def _json_object_or_none():
    """Return the request body as a dict, or None if it is not a JSON object."""
    data = request.get_json(silent=True)
    return data if isinstance(data, dict) else None


@app.route('/api/contacts', methods=['GET'])
def get_contacts():
    """Return one page of contacts, optionally filtered.

    Query parameters:
        category: keep contacts whose comma-separated category contains it.
        search: substring matched against ``name`` and ``search_name``.
        page: 1-based page number (default 1).
        page_size: rows per page (default ``DEFAULT_PAGE_SIZE``).
    """
    category = request.args.get('category')
    search = request.args.get('search')
    page = _positive_int_arg('page', 1)
    page_size = _positive_int_arg('page_size', DEFAULT_PAGE_SIZE)

    where_sql, filter_params = _build_contact_filter(category, search)

    with closing(get_db()) as conn:
        # Total number of matching rows, needed for the page count.
        total = conn.execute(
            "SELECT COUNT(*) FROM contacts WHERE 1=1" + where_sql,
            filter_params,
        ).fetchone()[0]

        # The requested page itself.
        rows = conn.execute(
            "SELECT * FROM contacts WHERE 1=1" + where_sql
            + " ORDER BY name LIMIT ? OFFSET ?",
            filter_params + [page_size, (page - 1) * page_size],
        ).fetchall()

    return jsonify({
        'contacts': [_contact_from_row(row) for row in rows],
        'total': total,
        'page': page,
        'page_size': page_size,
        # Ceiling division without floats.
        'total_pages': (total + page_size - 1) // page_size
    })


@app.route('/api/contacts/<int:contact_id>', methods=['GET'])
def get_contact(contact_id):
    """Return a single contact by id, or a 404 error if it does not exist."""
    with closing(get_db()) as conn:
        row = conn.execute(
            "SELECT * FROM contacts WHERE id = ?", (contact_id,)
        ).fetchone()

    if row:
        return jsonify(_contact_from_row(row))
    return jsonify({'error': 'Not found'}), 404


@app.route('/api/contacts', methods=['POST'])
def create_contact():
    """Create a contact from the JSON body and return its new id.

    Responds with 400 when the body is not a JSON object.
    """
    data = _json_object_or_none()
    if data is None:
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    # When custom_content is not supplied, fall back to the category value.
    custom_content = data.get('custom_content', '')
    if not custom_content:
        custom_content = data.get('category', '')

    with closing(get_db()) as conn:
        cursor = conn.execute('''
            INSERT INTO contacts (name, search_name, category, custom_content, blessing, selected)
            VALUES (?, ?, ?, ?, ?, ?)
        ''', (
            data.get('name', ''),
            data.get('search_name', data.get('name', '')),
            data.get('category', ''),
            custom_content,
            data.get('blessing', '马年新春快乐!愿您在新的一年里,事业腾飞,马到成功!'),
            1 if data.get('selected') else 0
        ))
        contact_id = cursor.lastrowid
        conn.commit()

    return jsonify({'id': contact_id, 'message': '创建成功'})


@app.route('/api/contacts/<int:contact_id>', methods=['PUT'])
def update_contact(contact_id):
    """Update the fields present in the JSON body for the given contact.

    Only whitelisted columns are written; keys that are absent from the body
    are left untouched. Responds with 400 when the body is not a JSON object.
    """
    data = _json_object_or_none()
    if data is None:
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    # Build the SET clause from the fields the client actually sent.
    updates = []
    params = []
    for column in _UPDATABLE_TEXT_COLUMNS:
        if column in data:
            updates.append(f"{column} = ?")
            params.append(data[column])
    if 'selected' in data:
        # Stored as an integer flag.
        updates.append("selected = ?")
        params.append(1 if data['selected'] else 0)

    if updates:
        updates.append("updated_at = CURRENT_TIMESTAMP")
        params.append(contact_id)
        # Column names come from the whitelist above; values are bound.
        sql = f"UPDATE contacts SET {', '.join(updates)} WHERE id = ?"
        with closing(get_db()) as conn:
            conn.execute(sql, params)
            conn.commit()

    return jsonify({'message': '更新成功'})


@app.route('/api/contacts/<int:contact_id>', methods=['DELETE'])
def delete_contact(contact_id):
    """Delete the contact with the given id."""
    with closing(get_db()) as conn:
        conn.execute("DELETE FROM contacts WHERE id = ?", (contact_id,))
        conn.commit()
    return jsonify({'message': '删除成功'})


@app.route('/api/stats', methods=['GET'])
def get_stats():
    """Return the total count, the selected count and per-category counts."""
    with closing(get_db()) as conn:
        # Total number of contacts
        total = conn.execute("SELECT COUNT(*) FROM contacts").fetchone()[0]

        # Number of selected contacts
        selected = conn.execute(
            "SELECT COUNT(*) FROM contacts WHERE selected = 1"
        ).fetchone()[0]

        # Per-category counts, largest first
        category_rows = conn.execute("""
            SELECT category, COUNT(*) as count
            FROM contacts
            GROUP BY category
            ORDER BY count DESC
        """).fetchall()

    categories = [
        {'category': row[0] or '未分类', 'count': row[1]}
        for row in category_rows
    ]

    return jsonify({
        'total': total,
        'selected': selected,
        'categories': categories
    })


if __name__ == '__main__':
    print("=" * 50)
    print("联系人管理API服务")
    print("=" * 50)
    print("启动服务: http://localhost:5000")
    print("前端页面: http://localhost:5000/static/contacts_manager.html")
    print("=" * 50)
    # NOTE(review): debug=True together with host='0.0.0.0' exposes the
    # Werkzeug interactive debugger to the whole network. Keep this for
    # local development only; disable debug or bind to 127.0.0.1 elsewhere.
    app.run(debug=True, host='0.0.0.0', port=5000)