#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 产品AI分析脚本 读取SQLite数据库中的产品信息,调用智谱AI API进行分析,并将结果存储到新表中 """ import sqlite3 import os import time from typing import List, Tuple, Optional from loguru import logger # 智谱AI API相关 import requests import json class ProductAIAnalyzer: """产品AI分析器""" def __init__(self, api_key: str, db_path: str = "products.db"): """ 初始化分析器 Args: api_key: 智谱AI API密钥 db_path: 数据库文件路径 """ self.api_key = api_key self.db_path = db_path self.api_url = "https://open.bigmodel.cn/api/paas/v4/chat/completions" # 检查数据库文件是否存在 if not os.path.exists(db_path): raise FileNotFoundError(f"数据库文件不存在: {db_path}") logger.info(f"初始化产品AI分析器,数据库: {db_path}") def connect_to_database(self) -> sqlite3.Connection: """连接到SQLite数据库""" try: conn = sqlite3.connect(self.db_path) logger.success(f"成功连接到数据库: {self.db_path}") return conn except Exception as e: logger.error(f"连接数据库失败: {e}") raise def get_product_data(self, conn: sqlite3.Connection) -> List[Tuple]: """ 从数据库获取产品数据 Args: conn: 数据库连接 Returns: 产品数据列表,每个元素为(id, name, introduction) """ try: cursor = conn.cursor() # 查询products表中的name和introduction字段 cursor.execute(""" SELECT id, name, introduction FROM products WHERE name IS NOT NULL AND introduction IS NOT NULL AND name != '' AND introduction != '' """) products = cursor.fetchall() logger.info(f"从数据库获取到 {len(products)} 个产品") # 显示前几个产品作为示例 for i, (id, name, intro) in enumerate(products[:3], 1): logger.info(f"示例产品{i}: ID={id}, 名称='{name}', 简介='{intro[:50]}...'") return products except Exception as e: logger.error(f"获取产品数据失败: {e}") raise def call_zhipu_ai_api(self, name: str, introduction: str) -> Optional[str]: """ 调用智谱AI API进行分析 Args: name: 产品名称 introduction: 产品简介 Returns: API响应内容,失败时返回None """ try: # 构建请求数据 messages = [ { "role": "system", "content": "你是一个有用的AI助手。" }, { "role": "user", "content": f"这个是【{name}】,简介内容是【{introduction}】。请把产品的简介翻译成中文,并返回假设一个人加上AI辅助能否开发这个产品,请详细回答。返回的内容是产品名称/产品简介/开发难度。返回的例子一:notion/这个是笔记产品等等/一个人开发难度较高" } ] data = { "model": "GLM-4.5-Flash", "messages": messages, "temperature": 0.6 } headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" } logger.info(f"调用智谱AI API分析产品: {name}") response = requests.post( self.api_url, headers=headers, data=json.dumps(data, ensure_ascii=False), timeout=60 ) if response.status_code == 200: result = response.json() content = result["choices"][0]["message"]["content"] logger.success(f"API调用成功: {name}") return content else: logger.error(f"API调用失败: {response.status_code}, {response.text}") return None except Exception as e: logger.error(f"调用智谱AI API时出错: {e}") return None def parse_ai_response(self, response: str) -> Tuple[str, str, str]: """ 解析AI响应内容 Args: response: AI响应内容 Returns: (产品名称, 产品简介, 开发难度) """ try: # 使用/分割响应内容 parts = response.split('/') if len(parts) >= 3: product_name = parts[0].strip() product_intro = parts[1].strip() difficulty = parts[2].strip() logger.info(f"解析结果: 名称='{product_name}', 简介='{product_intro[:30]}...', 难度='{difficulty}'") return product_name, product_intro, difficulty else: logger.warning(f"响应格式不符合预期: {response}") # 如果格式不符合,返回原始内容 return "", response, "" except Exception as e: logger.error(f"解析AI响应失败: {e}") return "", response, "" def create_analysis_table(self, conn: sqlite3.Connection): """创建分析结果表""" try: cursor = conn.cursor() # 创建分析结果表 cursor.execute(""" CREATE TABLE IF NOT EXISTS product_analysis ( id INTEGER PRIMARY KEY AUTOINCREMENT, original_id INTEGER, original_name TEXT, product_name TEXT, product_intro TEXT, development_difficulty TEXT, ai_response TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (original_id) REFERENCES products (id) ) """) conn.commit() logger.success("创建分析结果表成功") except Exception as e: logger.error(f"创建分析结果表失败: {e}") raise def save_analysis_result(self, conn: sqlite3.Connection, original_id: int, original_name: str, product_name: str, product_intro: str, difficulty: str, ai_response: str): """保存分析结果到数据库""" try: cursor = conn.cursor() cursor.execute(""" INSERT INTO product_analysis (original_id, original_name, product_name, product_intro, development_difficulty, ai_response) VALUES (?, ?, ?, ?, ?, ?) """, (original_id, original_name, product_name, product_intro, difficulty, ai_response)) conn.commit() logger.success(f"保存分析结果成功: {product_name}") except Exception as e: logger.error(f"保存分析结果失败: {e}") raise def check_product_exists(self, conn: sqlite3.Connection, original_name: str) -> bool: """ 检查产品是否已存在于分析结果表中 Args: conn: 数据库连接 original_name: 原始产品名称 Returns: 如果产品已存在返回True,否则返回False """ try: cursor = conn.cursor() cursor.execute(""" SELECT COUNT(*) FROM product_analysis WHERE original_name = ? """, (original_name,)) count = cursor.fetchone()[0] exists = count > 0 if exists: logger.info(f"产品 '{original_name}' 已存在,跳过分析") return exists except Exception as e: logger.error(f"检查产品存在性失败: {e}") return False def analyze_products(self, max_products: int = None): """ 分析产品数据 Args: max_products: 最大分析产品数量,None表示分析所有产品 """ if max_products is None: logger.info("开始分析所有产品数据") else: logger.info(f"开始分析产品数据,最大数量: {max_products}") conn = None try: # 连接数据库 conn = self.connect_to_database() # 创建分析结果表 self.create_analysis_table(conn) # 获取产品数据 products = self.get_product_data(conn) if not products: logger.warning("没有找到可分析的产品数据") return # 限制分析数量 if max_products is not None: products_to_analyze = products[:max_products] else: products_to_analyze = products logger.info(f"准备分析 {len(products_to_analyze)} 个产品") # 逐个分析产品 success_count = 0 skip_count = 0 for i, (original_id, name, introduction) in enumerate(products_to_analyze, 1): logger.info(f"\n分析进度: {i}/{len(products_to_analyze)} - {name}") # 检查产品是否已存在 if self.check_product_exists(conn, name): skip_count += 1 logger.info(f"跳过已存在产品,当前进度: {i}/{len(products_to_analyze)}") continue # 显示API调用状态 logger.info(f"正在提交API请求... 进度: {i}/{len(products_to_analyze)}") # 调用AI API ai_response = self.call_zhipu_ai_api(name, introduction) if ai_response: # 显示数据处理状态 logger.info(f"API调用成功,正在处理数据...") # 解析响应 product_name, product_intro, difficulty = self.parse_ai_response(ai_response) # 保存结果 self.save_analysis_result(conn, original_id, name, product_name, product_intro, difficulty, ai_response) success_count += 1 # 显示完成状态 logger.success(f"产品 '{name}' 分析完成,进度: {i}/{len(products_to_analyze)}") else: logger.error(f"分析失败: {name}") # 处理完数据后延时2秒 logger.info("数据处理完成,等待2秒后继续...") time.sleep(2) logger.success(f"分析完成! 成功分析 {success_count} 个产品,跳过 {skip_count} 个已存在产品") except Exception as e: logger.error(f"分析过程中出错: {e}") finally: if conn: conn.close() logger.info("数据库连接已关闭") def main(): """主函数""" # 配置日志 logger.add("product_ai_analysis.log", rotation="10 MB", level="INFO") # 智谱AI API密钥(请替换为您的实际密钥) api_key = "fad3d9f9a45f4d939f0e7a7133fa07bf.X4bOO053GAIPKLE5" # 创建分析器 analyzer = ProductAIAnalyzer(api_key) # 开始分析(默认分析所有产品) analyzer.analyze_products(max_products=None) if __name__ == "__main__": main()