"""Lightweight connectors for SQL databases, HTTP APIs and local data files.

Every public operation in this module is best-effort: failures are logged
through ``loguru`` and reported to the caller with a sentinel return value
(``False``, ``None`` or ``-1``) instead of a raised exception.
"""

from pathlib import Path
from typing import Any, Dict, List, Optional

import pandas as pd
from loguru import logger


class SQLConnector:
    """Thin wrapper around a DB-API connection (SQLite or MySQL/MariaDB).

    A ``'clickhouse'`` connection type is also accepted, but it only stores
    the merged configuration in a dict; no real ClickHouse connection is made.
    """

    def __init__(self, config: Dict[str, Any] = None):
        """Store the default connection settings.

        Args:
            config: Optional defaults (``type``, ``db_path``, ``host``,
                ``port``, ``user``, ``password``, ``database``). A falsy
                value is replaced by an empty dict.
        """
        self.config = config or {}
        self._connection = None

    def _option(self, overrides: Dict[str, Any], key: str, default: Any = None) -> Any:
        """Resolve a setting: per-call override, then ``self.config``, then *default*."""
        return overrides.get(key, self.config.get(key, default))

    def connect(self, conn_type: str = None, **kwargs) -> bool:
        """Open a connection of the requested type.

        Args:
            conn_type: ``'sqlite'``, ``'mysql'``, ``'mariadb'`` or
                ``'clickhouse'``. Falls back to ``config['type']`` and then
                to ``'sqlite'``.
            **kwargs: Per-call overrides of the values held in ``self.config``.

        Returns:
            ``True`` when a connection (or the ClickHouse placeholder) was
            stored; ``False`` when the type is not supported or connecting
            raised an exception.
        """
        conn_type = conn_type or self.config.get('type', 'sqlite')
        try:
            if conn_type == 'sqlite':
                import sqlite3

                db_path = self._option(kwargs, 'db_path', ':memory:')
                self._connection = sqlite3.connect(db_path)
                logger.info(f"已连接到 SQLite: {db_path}")
            elif conn_type in ('mysql', 'mariadb'):
                try:
                    import pymysql

                    self._connection = pymysql.connect(
                        host=self._option(kwargs, 'host', 'localhost'),
                        port=self._option(kwargs, 'port', 3306),
                        user=self._option(kwargs, 'user', 'root'),
                        password=self._option(kwargs, 'password', ''),
                        database=self._option(kwargs, 'database'),
                        charset='utf8mb4',
                    )
                    logger.info("已连接到 MySQL")
                except ImportError:
                    # Deliberate best-effort: without the driver, fall back
                    # to an in-memory SQLite database instead of failing.
                    logger.warning("pymysql 未安装,尝试使用 sqlite")
                    import sqlite3

                    self._connection = sqlite3.connect(':memory:')
            elif conn_type == 'clickhouse':
                # Placeholder only: the merged settings are kept in a dict.
                self._connection = {
                    'type': 'clickhouse',
                    'config': {**self.config, **kwargs},
                }
                logger.info("ClickHouse 连接配置已保存")
            else:
                # Previously an unknown type fell through and reported
                # success even though nothing had been connected.
                logger.error(f"不支持的数据库类型: {conn_type}")
                return False
            return True
        except Exception as e:
            logger.exception(f"SQL连接失败: {e}")
            return False

    def query(self, sql: str, params: tuple = None) -> Optional[pd.DataFrame]:
        """Run a SELECT-style statement and return the rows as a DataFrame.

        Args:
            sql: Statement text, using the driver's placeholder syntax.
            params: Values bound to the placeholders; ``None`` means none.

        Returns:
            The result set, or ``None`` when there is no connection or the
            query failed. With the ClickHouse placeholder a fixed three-row
            mock frame is returned and *sql* is not executed.
        """
        if self._connection is None:
            logger.error("未建立数据库连接")
            return None
        try:
            if isinstance(self._connection, dict) and self._connection.get('type') == 'clickhouse':
                logger.warning("ClickHouse 需要 clickhouse_driver 库,这里使用模拟")
                return pd.DataFrame({'column1': [1, 2, 3], 'column2': ['a', 'b', 'c']})
            df = pd.read_sql_query(sql, self._connection, params=params or ())
            logger.success(f"SQL查询成功,返回 {len(df)} 行")
            return df
        except Exception as e:
            logger.exception(f"SQL查询失败: {e}")
            return None

    def execute(self, sql: str, params: tuple = None) -> int:
        """Run a data-modifying statement and commit it.

        Args:
            sql: Statement text, using the driver's placeholder syntax.
            params: Values bound to the placeholders; ``None`` means none.

        Returns:
            The cursor's ``rowcount`` on success, ``-1`` when there is no
            usable connection or the statement failed. On failure the
            transaction is rolled back when the connection supports it.
        """
        from contextlib import closing

        if self._connection is None:
            logger.error("未建立数据库连接")
            return -1
        if isinstance(self._connection, dict):
            # The ClickHouse placeholder has no cursor to execute against.
            logger.error("ClickHouse 模拟连接不支持 execute")
            return -1
        try:
            # closing() releases the cursor on both the success and error path.
            with closing(self._connection.cursor()) as cursor:
                cursor.execute(sql, params or ())
                self._connection.commit()
                return cursor.rowcount
        except Exception as e:
            logger.exception(f"SQL执行失败: {e}")
            self._rollback_quietly()
            return -1

    def _rollback_quietly(self) -> None:
        """Roll back the open transaction, logging instead of raising on failure."""
        rollback = getattr(self._connection, 'rollback', None)
        if rollback is None:
            return
        try:
            rollback()
        except Exception as e:
            logger.warning(f"事务回滚失败: {e}")

    def close(self):
        """Close the current connection, if any, and forget it.

        The stored connection is always cleared, so later ``query`` or
        ``execute`` calls report "not connected" instead of running against
        a closed handle.
        """
        connection, self._connection = self._connection, None
        if connection is not None and hasattr(connection, 'close'):
            connection.close()
            logger.info("数据库连接已关闭")


class APIConnector:
    """Minimal JSON-over-HTTP client built on ``requests``."""

    def __init__(self, base_url: str = None, headers: Dict[str, str] = None):
        """Store the base URL and the headers sent with every request.

        Args:
            base_url: Prefix joined to each endpoint; when falsy, endpoints
                are used as full URLs.
            headers: Default headers; a falsy value becomes an empty dict.
        """
        self.base_url = base_url
        self.headers = headers or {}

    def _build_url(self, endpoint: str) -> str:
        """Join *endpoint* onto ``base_url`` with exactly one slash between them."""
        if self.base_url:
            return f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}"
        return endpoint

    def _merged_headers(self, extra: Optional[Dict[str, str]]) -> Dict[str, str]:
        """Return the default headers overlaid with per-request *extra* headers."""
        return {**self.headers, **(extra or {})}

    def get(self, endpoint: str, params: Dict[str, Any] = None, **kwargs) -> Optional[Dict]:
        """Send a GET request and decode the JSON response.

        Args:
            endpoint: Path relative to ``base_url`` (or a full URL).
            params: Query-string parameters.
            **kwargs: ``headers`` (merged over the defaults, ``None``
                allowed), ``timeout`` (seconds, default 30); anything else
                is forwarded to ``requests.get``.

        Returns:
            The decoded JSON body, or ``None`` if the request, the status
            check or the decoding failed.
        """
        import requests

        try:
            url = self._build_url(endpoint)
            headers = self._merged_headers(kwargs.pop('headers', None))
            timeout = kwargs.pop('timeout', 30)
            response = requests.get(
                url, params=params, headers=headers, timeout=timeout, **kwargs
            )
            response.raise_for_status()
            logger.info(f"API GET成功: {endpoint}")
            return response.json()
        except Exception as e:
            logger.exception(f"API GET失败: {e}")
            return None

    def post(self, endpoint: str, json: Dict = None, **kwargs) -> Optional[Dict]:
        """Send a POST request with a JSON body and decode the JSON response.

        Args:
            endpoint: Path relative to ``base_url`` (or a full URL).
            json: Payload serialized as the request body.
            **kwargs: ``headers`` (merged over the defaults, ``None``
                allowed), ``timeout`` (seconds, default 30); anything else
                is forwarded to ``requests.post``.

        Returns:
            The decoded JSON body, or ``None`` if the request, the status
            check or the decoding failed.
        """
        import requests

        try:
            url = self._build_url(endpoint)
            # Same per-request header handling as get(); previously every
            # keyword argument passed to post() was silently ignored.
            headers = self._merged_headers(kwargs.pop('headers', None))
            timeout = kwargs.pop('timeout', 30)
            response = requests.post(
                url, json=json, headers=headers, timeout=timeout, **kwargs
            )
            response.raise_for_status()
            logger.info(f"API POST成功: {endpoint}")
            return response.json()
        except Exception as e:
            logger.exception(f"API POST失败: {e}")
            return None


class FileConnector:
    """Static helpers that load local CSV, Excel and JSON files."""

    @staticmethod
    def read_csv(file_path: str, **kwargs) -> Optional[pd.DataFrame]:
        """Read a CSV file with ``pandas.read_csv``.

        Args:
            file_path: Location of the file.
            **kwargs: Forwarded to ``pandas.read_csv``.

        Returns:
            The loaded frame, or ``None`` if reading failed.
        """
        try:
            df = pd.read_csv(file_path, **kwargs)
            logger.info(f"读取CSV: {file_path}, {len(df)}行")
            return df
        except Exception as e:
            logger.exception(f"读取CSV失败: {e}")
            return None

    @staticmethod
    def read_excel(file_path: str, sheet_name: str = None, **kwargs) -> Optional[pd.DataFrame]:
        """Read one sheet of an Excel workbook with ``pandas.read_excel``.

        Args:
            file_path: Location of the workbook.
            sheet_name: Sheet to load; a falsy value selects the first sheet
                (index 0).
            **kwargs: Forwarded to ``pandas.read_excel``.

        Returns:
            The loaded frame, or ``None`` if reading failed.
        """
        try:
            df = pd.read_excel(file_path, sheet_name=sheet_name or 0, **kwargs)
            logger.info(f"读取Excel: {file_path}, sheet={sheet_name}")
            return df
        except Exception as e:
            logger.exception(f"读取Excel失败: {e}")
            return None

    @staticmethod
    def read_json(file_path: str, **kwargs) -> Optional[Dict]:
        """Parse a JSON file.

        Args:
            file_path: Location of the file.
            **kwargs: ``encoding`` selects the text encoding (default
                ``'utf-8'``); anything else is forwarded to ``json.load``
                (for example ``object_hook``).

        Returns:
            The parsed top-level JSON value, or ``None`` if reading or
            parsing failed. A document whose content is ``null`` also
            yields ``None``.
        """
        import json

        try:
            encoding = kwargs.pop('encoding', 'utf-8')
            with open(file_path, 'r', encoding=encoding) as f:
                data = json.load(f, **kwargs)
            logger.info(f"读取JSON: {file_path}")
            return data
        except Exception as e:
            logger.exception(f"读取JSON失败: {e}")
            return None