#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ CSV生成和Django ORM导入脚本 """ import csv import os from pathlib import Path from datetime import date from faker import Faker from loguru import logger # 初始化faker fake = Faker('zh_CN') # 项目根目录 BASE_DIR = Path(__file__).resolve().parent # 设置Django环境 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'diary_family.settings') import django django.setup() # 导入Django模型 from core.models import ReadingType, InsightRecord, ReadingRecord # 生成ReadingType CSV @logger.catch def generate_reading_type_csv(): """生成阅读记录类型CSV文件""" csv_path = BASE_DIR / 'reading_type.csv' # 定义字段名,与数据库表字段一一对应 fields = ['name'] # 生成模拟数据 data = [ ['书籍'], ['文章'], ['论文'], ['报告'], ['其他'] ] # 写入CSV文件 with open(csv_path, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) writer.writerow(fields) writer.writerows(data) logger.info(f"生成ReadingType CSV文件成功: {csv_path}") return csv_path # 生成InsightRecord CSV @logger.catch def generate_insight_record_csv(): """生成感悟记录CSV文件""" csv_path = BASE_DIR / 'insight_record.csv' # 定义字段名,与数据库表字段一一对应 fields = ['date', 'content'] # 生成模拟数据 data = [] for _ in range(10): record_date = fake.date_between(start_date='-30d', end_date='today') content = fake.paragraph(nb_sentences=3, variable_nb_sentences=True) data.append([record_date, content]) # 写入CSV文件 with open(csv_path, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) writer.writerow(fields) writer.writerows(data) logger.info(f"生成InsightRecord CSV文件成功: {csv_path}") return csv_path # 生成ReadingRecord CSV @logger.catch def generate_reading_record_csv(): """生成阅读记录CSV文件""" csv_path = BASE_DIR / 'reading_record.csv' # 定义字段名,与数据库表字段一一对应 fields = ['date', 'type_id', 'title', 'source', 'progress', 'note'] # 生成模拟数据 data = [] for _ in range(15): record_date = fake.date_between(start_date='-30d', end_date='today') type_id = fake.random_int(min=1, max=5) # 对应ReadingType的id title = fake.sentence(nb_words=5, variable_nb_words=True) source = fake.company() progress = fake.random_element(['已完成', '进行中', '未开始']) note = fake.paragraph(nb_sentences=2, variable_nb_sentences=True) data.append([record_date, type_id, title, source, progress, note]) # 写入CSV文件 with open(csv_path, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) writer.writerow(fields) writer.writerows(data) logger.info(f"生成ReadingRecord CSV文件成功: {csv_path}") return csv_path # 导入ReadingType CSV到Django模型 @logger.catch def import_reading_type_csv(csv_path): """将阅读记录类型CSV导入到Django模型""" try: logger.info(f"开始导入 {csv_path} 到ReadingType模型") # 清空现有数据 ReadingType.objects.all().delete() logger.info("已清空现有ReadingType数据") # 读取CSV文件并导入数据 with open(csv_path, 'r', encoding='utf-8') as f: # 跳过表头 next(f) # 读取CSV数据并创建模型实例 reader = csv.reader(f) reading_types = [] for row in reader: name = row[0] reading_types.append(ReadingType(name=name)) # 批量创建 ReadingType.objects.bulk_create(reading_types) logger.info(f"导入成功,共插入 {len(reading_types)} 条记录") except Exception as e: logger.error(f"导入失败: {e}") # 导入InsightRecord CSV到Django模型 @logger.catch def import_insight_record_csv(csv_path): """将感悟记录CSV导入到Django模型""" try: logger.info(f"开始导入 {csv_path} 到InsightRecord模型") # 读取CSV文件并导入数据 with open(csv_path, 'r', encoding='utf-8') as f: # 跳过表头 next(f) # 读取CSV数据并创建模型实例 reader = csv.reader(f) insight_records = [] for row in reader: record_date = row[0] content = row[1] insight_records.append(InsightRecord(date=record_date, content=content)) # 批量创建 InsightRecord.objects.bulk_create(insight_records) logger.info(f"导入成功,共插入 {len(insight_records)} 条记录") except Exception as e: logger.error(f"导入失败: {e}") # 导入ReadingRecord CSV到Django模型 @logger.catch def import_reading_record_csv(csv_path): """将阅读记录CSV导入到Django模型""" try: logger.info(f"开始导入 {csv_path} 到ReadingRecord模型") # 读取CSV文件并导入数据 with open(csv_path, 'r', encoding='utf-8') as f: # 跳过表头 next(f) # 读取CSV数据并创建模型实例 reader = csv.reader(f) reading_records = [] for row in reader: record_date = row[0] type_id = int(row[1]) title = row[2] source = row[3] progress = row[4] note = row[5] # 确保type_id存在 if ReadingType.objects.filter(id=type_id).exists(): reading_records.append(ReadingRecord( date=record_date, type_id=type_id, title=title, source=source, progress=progress, note=note )) else: logger.warning(f"跳过记录: 类型ID {type_id} 不存在") # 批量创建 ReadingRecord.objects.bulk_create(reading_records) logger.info(f"导入成功,共插入 {len(reading_records)} 条记录") except Exception as e: logger.error(f"导入失败: {e}") # 主函数 @logger.catch def main(): """主函数""" logger.info("开始执行CSV生成和导入脚本") # 生成CSV文件 reading_type_csv = generate_reading_type_csv() insight_record_csv = generate_insight_record_csv() reading_record_csv = generate_reading_record_csv() # 导入CSV到Django模型 import_reading_type_csv(reading_type_csv) import_insight_record_csv(insight_record_csv) import_reading_record_csv(reading_record_csv) logger.info("脚本执行完成") if __name__ == "__main__": main()