Files
diary-family/generate_import_csv.py

232 lines
7.1 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
CSV生成和Django ORM导入脚本
"""
import csv
import os
from pathlib import Path
from datetime import date
from faker import Faker
from loguru import logger
# 初始化faker
fake = Faker('zh_CN')
# 项目根目录
BASE_DIR = Path(__file__).resolve().parent
# 设置Django环境
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'diary_family.settings')
import django
django.setup()
# 导入Django模型
from core.models import ReadingType, InsightRecord, ReadingRecord
# 生成ReadingType CSV
@logger.catch
def generate_reading_type_csv():
"""生成阅读记录类型CSV文件"""
csv_path = BASE_DIR / 'reading_type.csv'
# 定义字段名,与数据库表字段一一对应
fields = ['name']
# 生成模拟数据
data = [
['书籍'],
['文章'],
['论文'],
['报告'],
['其他']
]
# 写入CSV文件
with open(csv_path, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerow(fields)
writer.writerows(data)
logger.info(f"生成ReadingType CSV文件成功: {csv_path}")
return csv_path
# 生成InsightRecord CSV
@logger.catch
def generate_insight_record_csv():
"""生成感悟记录CSV文件"""
csv_path = BASE_DIR / 'insight_record.csv'
# 定义字段名,与数据库表字段一一对应
fields = ['date', 'content']
# 生成模拟数据
data = []
for _ in range(10):
record_date = fake.date_between(start_date='-30d', end_date='today')
content = fake.paragraph(nb_sentences=3, variable_nb_sentences=True)
data.append([record_date, content])
# 写入CSV文件
with open(csv_path, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerow(fields)
writer.writerows(data)
logger.info(f"生成InsightRecord CSV文件成功: {csv_path}")
return csv_path
# 生成ReadingRecord CSV
@logger.catch
def generate_reading_record_csv():
"""生成阅读记录CSV文件"""
csv_path = BASE_DIR / 'reading_record.csv'
# 定义字段名,与数据库表字段一一对应
fields = ['date', 'type_id', 'title', 'source', 'progress', 'note']
# 生成模拟数据
data = []
for _ in range(15):
record_date = fake.date_between(start_date='-30d', end_date='today')
type_id = fake.random_int(min=1, max=5) # 对应ReadingType的id
title = fake.sentence(nb_words=5, variable_nb_words=True)
source = fake.company()
progress = fake.random_element(['已完成', '进行中', '未开始'])
note = fake.paragraph(nb_sentences=2, variable_nb_sentences=True)
data.append([record_date, type_id, title, source, progress, note])
# 写入CSV文件
with open(csv_path, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerow(fields)
writer.writerows(data)
logger.info(f"生成ReadingRecord CSV文件成功: {csv_path}")
return csv_path
# 导入ReadingType CSV到Django模型
@logger.catch
def import_reading_type_csv(csv_path):
"""将阅读记录类型CSV导入到Django模型"""
try:
logger.info(f"开始导入 {csv_path} 到ReadingType模型")
# 清空现有数据
ReadingType.objects.all().delete()
logger.info("已清空现有ReadingType数据")
# 读取CSV文件并导入数据
with open(csv_path, 'r', encoding='utf-8') as f:
# 跳过表头
next(f)
# 读取CSV数据并创建模型实例
reader = csv.reader(f)
reading_types = []
for row in reader:
name = row[0]
reading_types.append(ReadingType(name=name))
# 批量创建
ReadingType.objects.bulk_create(reading_types)
logger.info(f"导入成功,共插入 {len(reading_types)} 条记录")
except Exception as e:
logger.error(f"导入失败: {e}")
# 导入InsightRecord CSV到Django模型
@logger.catch
def import_insight_record_csv(csv_path):
"""将感悟记录CSV导入到Django模型"""
try:
logger.info(f"开始导入 {csv_path} 到InsightRecord模型")
# 读取CSV文件并导入数据
with open(csv_path, 'r', encoding='utf-8') as f:
# 跳过表头
next(f)
# 读取CSV数据并创建模型实例
reader = csv.reader(f)
insight_records = []
for row in reader:
record_date = row[0]
content = row[1]
insight_records.append(InsightRecord(date=record_date, content=content))
# 批量创建
InsightRecord.objects.bulk_create(insight_records)
logger.info(f"导入成功,共插入 {len(insight_records)} 条记录")
except Exception as e:
logger.error(f"导入失败: {e}")
# 导入ReadingRecord CSV到Django模型
@logger.catch
def import_reading_record_csv(csv_path):
"""将阅读记录CSV导入到Django模型"""
try:
logger.info(f"开始导入 {csv_path} 到ReadingRecord模型")
# 读取CSV文件并导入数据
with open(csv_path, 'r', encoding='utf-8') as f:
# 跳过表头
next(f)
# 读取CSV数据并创建模型实例
reader = csv.reader(f)
reading_records = []
for row in reader:
record_date = row[0]
type_id = int(row[1])
title = row[2]
source = row[3]
progress = row[4]
note = row[5]
# 确保type_id存在
if ReadingType.objects.filter(id=type_id).exists():
reading_records.append(ReadingRecord(
date=record_date,
type_id=type_id,
title=title,
source=source,
progress=progress,
note=note
))
else:
logger.warning(f"跳过记录: 类型ID {type_id} 不存在")
# 批量创建
ReadingRecord.objects.bulk_create(reading_records)
logger.info(f"导入成功,共插入 {len(reading_records)} 条记录")
except Exception as e:
logger.error(f"导入失败: {e}")
# 主函数
@logger.catch
def main():
"""主函数"""
logger.info("开始执行CSV生成和导入脚本")
# 生成CSV文件
reading_type_csv = generate_reading_type_csv()
insight_record_csv = generate_insight_record_csv()
reading_record_csv = generate_reading_record_csv()
# 导入CSV到Django模型
import_reading_type_csv(reading_type_csv)
import_insight_record_csv(insight_record_csv)
import_reading_record_csv(reading_record_csv)
logger.info("脚本执行完成")
if __name__ == "__main__":
main()