it编程 > 前端脚本 > Python

Python项目重构SQLite数据库的实战记录

0人参与 2026-03-19 Python

背景:一团乱麻的数据层

上个月,我接手维护一个公司内部用的数据采集与分析工具。这个工具用python写成,核心功能是把从不同api抓取到的设备状态数据存起来,供一个简单的web面板查询。数据量不大,每天也就几千条记录,所以当初的开发者选用了sqlite,这本身没问题。问题出在实现上。

我刚拿到代码时,差点没背过气去。整个项目就一个database.py文件,里面密密麻麻全是这样的代码:

def insert_device_data(device_id, status, timestamp):
    conn = sqlite3.connect('data.db')
    cursor = conn.cursor()
    # 注意看这里,字符串直接拼接!
    sql = f"insert into device_log (device_id, status, log_time) values ('{device_id}', '{status}', '{timestamp}')"
    cursor.execute(sql)
    conn.commit()
    conn.close()

这简直是“教科书式”的错误示范:手动管理连接、字符串拼接sql(sql注入的邀请函)、没有错误处理、表结构变更靠人肉在db browser里点点点。更头疼的是,由于没有数据模型定义,我根本不知道device_log表到底有哪些字段,类型是什么。项目运行半年,已经出现了几次因为数据格式问题导致的插入失败,以及一次轻微的数据混乱。

我的任务很明确:在不影响现有数据的前提下,重构这个数据层,让它变得可维护、可扩展、更安全。核心诉求就三点:1. 用orm替代裸sql;2. 引入数据库迁移工具;3. 保持向后兼容,平滑过渡。

问题分析:orm选型与迁移策略

我的第一反应是上sqlalchemy,这是python生态里最强大的orm,没有之一。但紧接着就面临两个具体问题:

我最初的思路是分两步走:先用sqlite3命令行或工具把现有表结构导出来,手动写成sqlalchemy模型。然后,把这个初始模型当作基准,让alembic生成一个“初始迁移”脚本,但这个脚本应该是空的(因为表已经存在)。我查了alembic文档,发现它确实支持这种场景,通过--autogenerate检测模型与数据库的差异,如果模型定义与当前数据库状态一致,生成的迁移脚本就是空的。然后,我未来的所有变更都可以从这个一致的状态开始,通过alembic来管理。

排查现有数据库时,我又发现一个坑:原表里有些字段是text类型,但存储的其实是json字符串。在旧代码里,每次查询出来都要用json.loads()解析。这应该在模型层就处理好。sqlalchemy提供了json类型(配合sqlite3json1扩展)或者用自定义类型,这给了我优化数据结构的机会。

核心实现第一步:定义数据模型

我决定不完全依赖automap,而是结合数据库现状和业务逻辑,手动定义模型。这样虽然初期工作量稍大,但模型更清晰、更可控。

首先,我使用sqlite3命令行查看了表结构:

.schema device_log

输出大概是:

create table device_log (
    id integer primary key,
    device_id text,
    status text,
    log_time text,
    raw_data text
);

我发现log_time存的是文本,raw_data里是json字符串。我新建了一个models.py文件来定义模型。

这里有个坑:sqlite默认不强制数据类型(type affinity),你甚至可以把字符串存到声明为integer的列里。但sqlalchemy的模型定义是强类型的,为了兼容,我决定在定义模型时,严格按照当前实际存储的数据类型来定义,避免初始化时就出错。对于log_time,我暂时还是用string,后续再考虑转为datetime。对于raw_data,我决定使用sqlalchemy的json类型,这需要sqlite支持json1扩展(现代python版本基本都支持)。

# models.py
from sqlalchemy import column, integer, string, json, datetime
from sqlalchemy.ext.declarative import declarative_base
import json

base = declarative_base()

class devicelog(base):
    __tablename__ = 'device_log'

    id = column(integer, primary_key=true)
    device_id = column(string(64), nullable=false, index=true)  # 加了索引,因为常按设备查询
    status = column(string(32), nullable=false)
    log_time = column(string(32))  # 暂时保持string,兼容旧数据
    raw_data = column(json)  # 使用json类型,sqlalchemy会自动处理序列化/反序列化

    # 添加一个属性,方便访问解析后的raw_data
    @property
    def data(self):
        # 如果raw_data已经是dict(用了json类型后),直接返回,否则尝试解析
        if isinstance(self.raw_data, dict):
            return self.raw_data
        try:
            return json.loads(self.raw_data) if self.raw_data else {}
        except json.jsondecodeerror:
            return {}

注意,我特意为device_id添加了index=true。因为在业务查询中,where device_id = ?是非常频繁的操作,原表没有索引,这在数据增长后会是性能瓶颈。这个索引的添加,我会通过alembic迁移来完成,而不是手动去数据库创建。

核心实现第二步:初始化alembic与空迁移

接下来是解决“先有数据库,后有模型”的迁移初始化问题。我按照alembic官方文档操作:

关键步骤来了:生成初始迁移脚本。我运行:

alembic revision --autogenerate -m "initial migration based on existing schema"

alembic会比较我的模型定义(base.metadata)和当前数据库(data.db)的差异。因为我刚才定义模型时,刻意保持了与现有表结构的一致(除了raw_data用了json类型,但sqlite底层存储还是text,alembic的检测可能忽略这种同质变化),所以生成的迁移脚本upgrade函数很可能是空的。这正是我想要的——一个代表当前状态的基准点。

我打开生成的迁移文件(在alembic/versions/下),确认upgrade()downgrade()函数确实为空,或者只包含一些无关紧要的操作。然后,我必须执行这个迁移,以在alembic的特殊表alembic_version中记录这个版本,这样alembic才知道当前数据库处于这个“初始”状态。

alembic upgrade head

执行后,用数据库工具查看,会发现多了一个alembic_version表,里面有一条记录,版本号对应刚才生成的迁移脚本。

核心实现第三步:通过迁移添加索引与优化

现在,alembic已经接管了数据库版本。我可以开始改进模型,并通过迁移来应用这些改进了。

首先,我实现之前计划的优化:为device_id添加索引,以及将log_timestring改为datetime(这需要数据转换)。

修改模型 (models.py):

class devicelog(base):
    __tablename__ = 'device_log'
    # ... 其他字段不变
    log_time = column(datetime)  # 改为datetime类型
    # device_id的索引已经在定义中,无需修改

注意,这里直接改了log_time的类型。alembic会发现这个变化。

生成并检查迁移脚本

alembic revision --autogenerate -m "add index on device_id and change log_time to datetime"

打开新生成的迁移文件,我看到了类似以下的内容:

def upgrade():
    # ### commands auto generated by alembic - please adjust! ###
    op.create_index(op.f('ix_device_log_device_id'), 'device_log', ['device_id'], unique=false)
    # 注意这里!alembic检测到类型变化,生成了alter_column操作
    op.alter_column('device_log', 'log_time',
                   existing_type=sa.text(),
                   type_=sa.datetime(),
                   existing_nullable=true)
    # ### end alembic commands ###

这里有个大坑! op.alter_column对于sqlite是有限支持的。sqlite本身不支持直接修改列类型(alter table ... alter column)。alembic会通过一个复杂的过程(创建新表、复制数据、删除旧表、重命名)来模拟这个操作。这对于有数据的表是危险的,尤其是类型转换(text -> datetime)可能失败。

手动编辑迁移脚本,安全处理数据转换。我不能完全依赖自动生成的脚本。我需要修改这个迁移,确保数据转换是安全可控的。

import sqlalchemy as sa
from alembic import op
import sqlite3
from datetime import datetime

# revision identifiers, used by alembic.
revision = 'xxxxxx'
down_revision = 'yyyyyy'

def upgrade():
    # 1. 先添加索引,这个操作sqlite原生支持,是安全的
    op.create_index(op.f('ix_device_log_device_id'), 'device_log', ['device_id'], unique=false)

    # 2. 处理log_time列类型转换。我们需要自定义逻辑。
    conn = op.get_bind()

    # 首先,读取所有现有的id和log_time文本
    rows = conn.execute(sa.text("select id, log_time from device_log where log_time is not null")).fetchall()

    # 创建一个临时字典,存储解析后的时间。如果解析失败,则置为none
    updates = []
    for row_id, time_str in rows:
        dt = none
        # 尝试解析旧数据中可能的时间格式
        for fmt in ('%y-%m-%d %h:%m:%s', '%y/%m/%d %h:%m:%s', '%y-%m-%dt%h:%m:%s'):
            try:
                dt = datetime.strptime(time_str, fmt)
                break
            except valueerror:
                continue
        updates.append((row_id, dt))

    # sqlite的alter column不支持类型变更,alembic会模拟。我们让它执行。
    # 但为了数据正确,我们在alembic操作后,手动更新数据。
    # 注意:alembic的模拟操作会创建新表、复制数据,但复制时log_time列的数据还是文本。
    # 所以我们需要在alembic的alter_column操作之后,再执行我们的更新。

    # 实际上,更安全的做法是分步迁移,这里为了示例,我们采用一个策略:
    # 先让alembic执行它的模拟alter操作(结构变更),然后我们手动更新数据。
    # 但alembic的op.alter_column在sqlite环境下是一个大事务,我们不能在中间插入。
    # 因此,我选择另一种方法:禁用alembic的自动alter,完全手动处理。

    # 注释掉自动生成的alter_column行
    # op.alter_column('device_log', 'log_time',
    #                existing_type=sa.text(),
    #                type_=sa.datetime(),
    #                existing_nullable=true)

    # 手动执行sqlite的列变更流程(简化版,假设没有复杂约束)
    # 步骤略复杂,在实际项目中,我可能会选择保持text类型,在应用层进行解析。
    # 鉴于篇幅和示例的清晰性,我决定在本文中采取一个折中方案:
    # 保留log_time为string,但添加一个datetime的属性方法,并确保新数据按iso格式存储。
    # 这是一个重要的实战决策:当数据迁移风险过高时,在应用层兼容是更稳妥的选择。

    # 所以,我回滚了模型中对log_time的修改,仍然使用string。
    # 并在devicelog类中添加一个方法:
    # @property
    # def log_datetime(self):
    #     # ... 解析self.log_time字符串为datetime对象,提供none安全
    pass

def downgrade():
    op.drop_index(op.f('ix_device_log_device_id'), table_name='device_log')
    # 类型改回操作也相应移除

经过这番思考,我意识到在现有数据质量不明的情况下,强行改变底层列类型风险太大。我调整了方案:保持数据库列类型不变,在orm层提供类型化的访问接口。这是很多遗留系统重构的实用技巧。我修改了模型,放弃直接修改column类型,而是通过hybrid_property或普通属性方法来提供datetime对象。

最终,我重新生成了一个只包含创建索引的迁移,并执行它。

alembic upgrade head

这样,性能索引加上了,数据结构的风险也避免了。

完整代码示例

以下是核心模块的最终版本代码,可以直接运行(需要先安装sqlalchemyalembic,并有一个名为data.db的sqlite数据库,其中包含device_log表)。

models.py:

from sqlalchemy import column, integer, string, json, datetime, create_engine, index
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import json
from datetime import datetime

base = declarative_base()

class devicelog(base):
    __tablename__ = 'device_log'

    id = column(integer, primary_key=true)
    device_id = column(string(64), nullable=false)
    status = column(string(32), nullable=false)
    log_time = column(string(32))  # 保持文本存储,兼容性第一
    raw_data = column(json, default=dict)  # 使用json类型,默认空字典

    # 定义索引,注意这里只是元数据,实际创建需要alembic迁移
    __table_args__ = (index('ix_device_log_device_id', 'device_id'), )

    @property
    def data(self):
        """获取解析后的raw_data"""
        if isinstance(self.raw_data, dict):
            return self.raw_data
        try:
            return json.loads(self.raw_data) if self.raw_data else {}
        except json.jsondecodeerror:
            return {}

    @property
    def log_datetime(self):
        """将log_time字符串转换为datetime对象,安全版本"""
        if not self.log_time:
            return none
        # 尝试常见格式
        for fmt in ('%y-%m-%d %h:%m:%s', '%y/%m/%d %h:%m:%s', '%y-%m-%dt%h:%m:%s', '%y-%m-%d %h:%m:%s.%f'):
            try:
                return datetime.strptime(self.log_time, fmt)
            except valueerror:
                continue
        # 如果都无法解析,记录警告或返回none
        # import logging
        # logging.warning(f"无法解析时间字符串: {self.log_time}")
        return none

    @log_datetime.setter
    def log_datetime(self, dt: datetime):
        """设置datetime,统一存储为iso格式字符串"""
        if dt is none:
            self.log_time = none
        else:
            # 存储为iso格式,便于解析和排序
            self.log_time = dt.isoformat(sep=' ', timespec='seconds')  # 例如 '2023-10-27 14:30:00'

# 数据库连接和会话工厂
engine = create_engine('sqlite:///data.db', echo=false)  # echo=true可查看sql日志
sessionlocal = sessionmaker(bind=engine)

def get_db():
    """依赖注入使用的会话获取函数"""
    db = sessionlocal()
    try:
        yield db
    finally:
        db.close()

使用示例 (example_usage.py):

from models import devicelog, get_db
from datetime import datetime

# 插入新数据
with next(get_db()) as db:
    new_log = devicelog(
        device_id="device_001",
        status="online",
        log_datetime=datetime.now(),  # 使用setter
        raw_data={"temperature": 23.5, "humidity": 60}  # 直接传字典
    )
    db.add(new_log)
    db.commit()
    print(f"插入记录id: {new_log.id}")

# 查询数据
with next(get_db()) as db:
    logs = db.query(devicelog).filter(devicelog.device_id == "device_001").order_by(devicelog.log_time).all()
    for log in logs:
        print(f"id: {log.id}, status: {log.status}")
        print(f"  时间对象: {log.log_datetime}")  # 使用property获取datetime
        print(f"  原始数据: {log.data}")  # 使用property获取解析后的dict
        print(f"  存储的时间文本: {log.log_time}")

alembic迁移目录 (alembic/) 是运行alembic init alembic命令生成的,需要根据项目配置env.pyalembic.ini

踩坑记录

1.alembic空迁移执行失败:第一次执行alembic upgrade head时,报错target database is not up to date。这是因为我没有理解alembic_version表的作用。在已有数据库上初始化时,必须先“标记”当前状态。解决方案就是生成一个与当前状态一致的模型,创建一个空迁移,并执行它,让alembic记录这个基准版本。

2.sqlite的alter column陷阱:如正文所述,alembic为sqlite生成的alter_column操作是模拟的,涉及表重建。对于有重要数据且列类型转换复杂的场景,这是高风险操作。我踩的坑是盲目信任自动生成的脚本,差点导致数据丢失。

解决方法:仔细审查针对sqlite生成的迁移脚本,对于复杂的数据转换,要么手动编写安全的数据迁移逻辑,要么像我一样,选择保持数据库列类型不变,在应用层进行适配。

3.json字段的默认值:在模型中将raw_data = column(json)后,插入新记录时如果没提供raw_data,默认会是none。但在业务逻辑中,我希望它默认是空字典{}。直接设置default={}会导致所有实例共享同一个字典引用(可变默认值陷阱)。

解决方法:使用default=dict,或者使用default=lambda: {},这样每次创建新实例时都会生成一个新的空字典。

4.连接未关闭导致数据库锁:在旧代码和我的新代码早期版本中,如果异常发生,连接可能没有正确关闭。在web应用或多线程环境下,这容易导致sqlite3.operationalerror: database is locked

解决方法:使用上下文管理器(with语句)或类似fastapi的depends(get_db)模式来确保会话在任何情况下都能正确关闭。我在get_db函数中使用了try...finally来保证。

小结

这次重构让我深刻体会到,即使是简单的sqlite,在项目规模增长后,也需要用orm和迁移工具这样的“重型装备”来管理。核心收获是:用orm定义清晰的数据模型,用迁移工具记录每一次结构变更,两者结合是维护数据库健康度的最佳实践

下一步,我可以探索sqlalchemy更高级的特性,如异步io(asyncpg/aiosqlite)、复杂查询优化,并将这套模式应用到更复杂的postgresql项目中。

以上就是python项目重构sqlite数据库的实战记录的详细内容,更多关于python重构sqlite数据库的资料请关注代码网其它相关文章!

(0)

您想发表意见!!点此发布评论

推荐阅读

Python利用OpenCV实现文档图像自动矫正的方法

03-19

Python编程之PDF转图片的三种实现方法详解

03-19

Python实现桌面端应用消息提醒功能

03-19

python在windows开启webserver的几种方法

03-19

Python + Skills 架构实现从理论到实践

03-19

Python基础课程之猜数字游戏完整代码示例

03-19

猜你喜欢

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论