Files
weixin-holiday-message/app.py

250 lines
6.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
Flask后端API - 联系人管理
"""
import os
import sqlite3
from contextlib import closing

from flask import Flask, jsonify, request, send_from_directory
# Flask application object; its static folder is 'static', exposed under the /static URL path.
app = Flask(__name__, static_folder='static', static_url_path='/static')
# Path of the SQLite database file, located in the same directory as this module.
DB_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'contacts.db')
# Front-end page route
@app.route('/')
def index():
    """Serve contacts_manager.html from the 'static' directory at the site root."""
    static_dir = 'static'
    page_name = 'contacts_manager.html'
    return send_from_directory(static_dir, page_name)
def get_db():
    """Open and return a new SQLite connection to DB_PATH.

    The connection's row factory is set to ``sqlite3.Row``. The connection
    is not closed here; the caller has to close it.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
def row_to_dict(row):
    """Convert a sqlite Row into a plain dict; a falsy *row* yields None."""
    if not row:
        return None
    return dict(row)
def _escape_like(text):
    """Escape LIKE metacharacters so *text* matches literally (escape char: backslash)."""
    # Backslash must be escaped first, otherwise the escapes added below get doubled.
    return text.replace('\\', '\\\\').replace('%', '\\%').replace('_', '\\_')


def _contact_filter(category, search):
    """Build the WHERE clause shared by the count query and the page query.

    Args:
        category: Tag to filter on, or a falsy value for no tag filter.
        search: Substring to look for in name/search_name, or falsy for none.

    Returns:
        A ``(where_sql, params)`` pair. ``where_sql`` is either an empty
        string or starts with ``" WHERE "``; ``params`` is the matching list
        of bound values.
    """
    clauses = []
    params = []
    if category:
        # The category column may hold several comma-separated tags, so match
        # the tag when it is the whole value, the first, the last, or a
        # middle element of the list.
        tag = _escape_like(category)
        clauses.append(
            "(category = ? OR category LIKE ? ESCAPE '\\'"
            " OR category LIKE ? ESCAPE '\\' OR category LIKE ? ESCAPE '\\')"
        )
        params.extend([category, f"{tag},%", f"%,{tag}", f"%,{tag},%"])
    if search:
        pattern = f"%{_escape_like(search)}%"
        clauses.append("(name LIKE ? ESCAPE '\\' OR search_name LIKE ? ESCAPE '\\')")
        params.extend([pattern, pattern])
    where_sql = (" WHERE " + " AND ".join(clauses)) if clauses else ""
    return where_sql, params


@app.route('/api/contacts', methods=['GET'])
def get_contacts():
    """Return one page of contacts, optionally filtered.

    Query parameters:
        category: Keep only contacts whose comma-separated category list
            contains this tag.
        search: Keep only contacts whose name or search_name contains this
            text (wildcard characters in the text are matched literally).
        page: 1-based page number; defaults to 1. Unparsable values fall
            back to the default and values below 1 are raised to 1.
        page_size: Rows per page; defaults to 20. Unparsable values fall
            back to the default and values below 1 are raised to 1.

    Returns:
        JSON with ``contacts``, ``total``, ``page``, ``page_size`` and
        ``total_pages``.
    """
    category = request.args.get('category')
    search = request.args.get('search')
    # type=int makes a non-numeric value yield the default instead of raising;
    # the lower bound of 1 avoids a negative OFFSET/LIMIT and division by zero.
    page = max(request.args.get('page', 1, type=int), 1)
    page_size = max(request.args.get('page_size', 20, type=int), 1)

    where_sql, filter_params = _contact_filter(category, search)

    # closing() guarantees the connection is released even if a query raises.
    with closing(get_db()) as conn:
        total = conn.execute(
            "SELECT COUNT(*) FROM contacts" + where_sql,
            filter_params,
        ).fetchone()[0]
        rows = conn.execute(
            "SELECT * FROM contacts" + where_sql + " ORDER BY name LIMIT ? OFFSET ?",
            filter_params + [page_size, (page - 1) * page_size],
        ).fetchall()

    contacts = []
    for row in rows:
        contact = row_to_dict(row)
        # The selected column is stored as an integer; expose it as a JSON boolean.
        contact['selected'] = bool(contact.get('selected', 0))
        contacts.append(contact)

    return jsonify({
        'contacts': contacts,
        'total': total,
        'page': page,
        'page_size': page_size,
        # Ceiling division without floats.
        'total_pages': (total + page_size - 1) // page_size
    })
@app.route('/api/contacts/<int:contact_id>', methods=['GET'])
def get_contact(contact_id):
    """Return a single contact as JSON.

    Args:
        contact_id: Value of the ``id`` column, taken from the URL.

    Returns:
        The contact's columns as a JSON object with ``selected`` converted to
        a boolean, or ``{'error': 'Not found'}`` with status 404 when no row
        has that id.
    """
    # closing() releases the connection even if the query raises.
    with closing(get_db()) as conn:
        row = conn.execute(
            "SELECT * FROM contacts WHERE id = ?", (contact_id,)
        ).fetchone()

    if row is None:
        return jsonify({'error': 'Not found'}), 404

    contact = row_to_dict(row)
    contact['selected'] = bool(contact.get('selected', 0))
    return jsonify(contact)
@app.route('/api/contacts', methods=['POST'])
def create_contact():
    """Insert a new contact from the JSON request body.

    Recognised body keys: ``name``, ``search_name`` (defaults to ``name``),
    ``category``, ``custom_content`` (defaults to ``category`` when missing
    or empty), ``blessing`` (defaults to a stock greeting) and ``selected``.

    Returns:
        JSON with the new row ``id`` and a success ``message``, or a JSON
        ``error`` with status 400 when the body is not a JSON object.
    """
    # silent=True returns None for a missing/invalid JSON body instead of
    # raising, so malformed requests get a clean 400 rather than a 500.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    # When custom_content is absent or empty, fall back to the category value.
    custom_content = data.get('custom_content') or data.get('category', '')

    # closing() releases the connection even if the insert raises.
    with closing(get_db()) as conn:
        cursor = conn.execute('''
            INSERT INTO contacts (name, search_name, category, custom_content, blessing, selected)
            VALUES (?, ?, ?, ?, ?, ?)
        ''', (
            data.get('name', ''),
            data.get('search_name', data.get('name', '')),
            data.get('category', ''),
            custom_content,
            data.get('blessing', '马年新春快乐!愿您在新的一年里,事业腾飞,马到成功!'),
            1 if data.get('selected') else 0
        ))
        contact_id = cursor.lastrowid
        conn.commit()

    return jsonify({'id': contact_id, 'message': '创建成功'})
@app.route('/api/contacts/<int:contact_id>', methods=['PUT'])
def update_contact(contact_id):
    """Update the given fields of an existing contact.

    Only the keys ``name``, ``search_name``, ``category``, ``custom_content``,
    ``blessing`` and ``selected`` of the JSON body are applied; other keys
    are ignored. When at least one field is updated, ``updated_at`` is set to
    the current timestamp as well.

    Args:
        contact_id: Value of the ``id`` column, taken from the URL.

    Returns:
        JSON success ``message``; a JSON ``error`` with status 400 when the
        body is not a JSON object; ``{'error': 'Not found'}`` with status 404
        when no row has that id.
    """
    # silent=True returns None for a missing/invalid JSON body instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    assignments = []
    params = []
    # Column names come from this fixed tuple, never from the request, so
    # interpolating them into the SQL text below is safe.
    for field in ('name', 'search_name', 'category', 'custom_content', 'blessing'):
        if field in data:
            assignments.append(f"{field} = ?")
            params.append(data[field])
    if 'selected' in data:
        # Stored as an integer flag.
        assignments.append("selected = ?")
        params.append(1 if data['selected'] else 0)

    # closing() releases the connection even if a statement raises.
    with closing(get_db()) as conn:
        if assignments:
            assignments.append("updated_at = CURRENT_TIMESTAMP")
            params.append(contact_id)
            sql = f"UPDATE contacts SET {', '.join(assignments)} WHERE id = ?"
            cursor = conn.execute(sql, params)
            conn.commit()
            found = cursor.rowcount > 0
        else:
            # Nothing to change; still report whether the contact exists.
            found = conn.execute(
                "SELECT 1 FROM contacts WHERE id = ?", (contact_id,)
            ).fetchone() is not None

    if not found:
        return jsonify({'error': 'Not found'}), 404
    return jsonify({'message': '更新成功'})
@app.route('/api/contacts/<int:contact_id>', methods=['DELETE'])
def delete_contact(contact_id):
    """Delete a contact by id.

    Args:
        contact_id: Value of the ``id`` column, taken from the URL.

    Returns:
        JSON success ``message``, or ``{'error': 'Not found'}`` with status
        404 when no row was deleted.
    """
    # closing() releases the connection even if the statement raises.
    with closing(get_db()) as conn:
        cursor = conn.execute("DELETE FROM contacts WHERE id = ?", (contact_id,))
        conn.commit()
        deleted = cursor.rowcount

    if deleted == 0:
        return jsonify({'error': 'Not found'}), 404
    return jsonify({'message': '删除成功'})
@app.route('/api/stats', methods=['GET'])
def get_stats():
    """Return summary counts for the contacts table.

    Returns:
        JSON with ``total`` (all rows), ``selected`` (rows where
        ``selected = 1``) and ``categories`` (a list of
        ``{'category', 'count'}`` objects ordered by descending count, where
        an empty or NULL category is reported as '未分类').
    """
    # closing() releases the connection even if a query raises.
    with closing(get_db()) as conn:
        total = conn.execute("SELECT COUNT(*) FROM contacts").fetchone()[0]
        selected = conn.execute(
            "SELECT COUNT(*) FROM contacts WHERE selected = 1"
        ).fetchone()[0]
        # NOTE(review): this groups by the raw category string, while
        # get_contacts treats the column as a comma-separated tag list, so a
        # value like "a,b" is counted as its own category here - confirm
        # that this is what the front end expects.
        category_rows = conn.execute("""
            SELECT category, COUNT(*) as count
            FROM contacts
            GROUP BY category
            ORDER BY count DESC
        """).fetchall()

    categories = [
        {'category': row[0] or '未分类', 'count': row[1]}
        for row in category_rows
    ]
    return jsonify({
        'total': total,
        'selected': selected,
        'categories': categories
    })
if __name__ == '__main__':
    # Debug mode turns on the Werkzeug interactive debugger, which allows
    # arbitrary code execution from the browser. Because the server binds to
    # all interfaces (0.0.0.0), debug must be opted into explicitly via the
    # FLASK_DEBUG environment variable instead of being always on.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    banner = "=" * 50
    print(banner)
    print("联系人管理API服务")
    print(banner)
    print("启动服务: http://localhost:5000")
    print("前端页面: http://localhost:5000/static/contacts_manager.html")
    print(banner)
    app.run(debug=debug_enabled, host='0.0.0.0', port=5000)