39人参与 • 2025-11-20 • MsSqlserver
在开发过程中,我们经常会遇到需要批量写入大量数据到 postgresql 数据库的场景。当使用传统的参数化插入语句时,可能会遇到如下错误:
pq: got 86575 parameters but postgresql only supports 65535 parameters
这是因为 postgresql 对单个查询的参数数量有限制(通常为 65535)。传统的解决方案是进行数据分片,分批写入数据库。但这种方法存在以下问题:
postgresql 的 copy 协议是专门为高效批量数据操作设计的二进制协议,具有以下优势:
copy 协议使用 postgresql 的前后端协议进行数据传输,其工作流程如下:
copy from stdin 命令二进制格式避免了文本解析的开销,直接使用网络字节序传输数据,大大提高了传输效率。
import (
"github.com/lib/pq"
"gorm.io/gorm"
)
// batchcreate 批量创建消息接收者记录 - 使用 copy 协议
func (r *receiverrepo) batchcreate(ctx context.context, db *gorm.db, data []*define.wecommsgreceiver) (rowsaffected int64, err error) {
db = r.withtrace(ctx, db)
db = db.table(r.tablename())
if len(data) == 0 {
return 0, nil
}
// 过滤掉 nil 的数据
validdata := make([]*define.wecommsgreceiver, 0, len(data))
for _, item := range data {
if item != nil {
validdata = append(validdata, item)
}
}
if len(validdata) == 0 {
return 0, nil
}
// 获取底层 sql.db
sqldb := db.db()
// 开始事务
tx, err := sqldb.begintx(ctx, nil)
if err != nil {
return 0, fmt.errorf("开始事务失败:%+v", err)
}
defer func() {
if err != nil {
tx.rollback()
}
}()
// 创建 copy writer
stmt, err := tx.prepare(pq.copyin(r.tablename(), "send_log_id", "user_id", "status", "created_at", "updated_at"))
if err != nil {
return 0, fmt.errorf("准备 copy 语句失败:%+v", err)
}
defer stmt.close()
// 批量写入数据
for _, item := range validdata {
_, err = stmt.exec(item.sendlogid, item.userid, item.status, item.createdat, item.updatedat)
if err != nil {
return 0, fmt.errorf("写入数据失败:%+v", err)
}
}
// 执行 copy
_, err := stmt.exec()
if err != nil {
return 0, fmt.errorf("执行 copy 失败:%+v", err)
}
// 提交事务
if err = tx.commit(); err != nil {
return 0, fmt.errorf("提交事务失败:%+v", err)
}
rowsaffected = int64(len(validdata))
return rowsaffected, nil
}
pq.copyin 准备 copy 语句,指定表名和列名exec,但此时数据还在客户端缓冲区stmt.exec() 真正将数据发送到服务器// 设置测试数据库
func setuptestdb() (*gorm.db, error) {
ctx := context.background()
postgres, err := infrastructure.dialpostgres(ctx, infrastructure.postgresconfig{
host: "host",
port: 5432,
username: "postgres",
password: "xxxxx",
database: "xxxxx",
})
if err != nil {
return nil, err
}
return postgres, nil
}
func setuplogger() factory.logfactory {
logger, _ := factory.newjsonfactory(factory.newlevel("info"), factory.newzapoption(factory.addcallerskip(0)))
return logger
}
func testreceiverrepo_batchcreate(t *testing.t) {
db, err := setuptestdb()
require.noerror(t, err)
defer db.close()
// 创建日志工厂
logger := setuplogger()
// 创建 repository 实例
repo := newreceiverrepository(db, logger)
// 准备测试数据 - 20000 条记录,使用负的 send_log_id 避免污染数据
testdata := make([]*define.wecommsgreceiver, 0, 20000)
now := time.now()
negativesendlogid := int64(-100000) // 使用负的 send_log_id
for i := 0; i < 20000; i++ {
testdata = append(testdata, &define.wecommsgreceiver{
sendlogid: negativesendlogid,
userid: "test_user_" + fmt.sprint(i),
status: 1,
createdat: now,
updatedat: now,
})
}
ctx := context.background()
// 执行批量插入
rowsaffected, err := repo.batchcreate(ctx, db, testdata)
// 验证结果
assert.noerror(t, err)
assert.equal(t, int64(20000), rowsaffected)
// 验证数据是否正确插入
var count int64
query := "select count(*) from wecom_msg_receiver where send_log_id < 0 and send_log_id >= ?"
err = db.raw(query, negativesendlogid).count(&count).error
assert.noerror(t, err)
assert.equal(t, int64(20000), count)
// 清理测试数据
deletequery := "delete from wecom_msg_receiver where send_log_id < 0 and send_log_id >= ?"
result := db.exec(deletequery, negativesendlogid)
assert.noerror(t, result.error)
assert.equal(t, int64(20000), result.rowsaffected)
// 验证清理是否成功
err = db.raw(query, negativesendlogid).count(&count).error
assert.noerror(t, err)
assert.equal(t, int64(0), count)
}
在实际测试中,copy 协议相比传统分批插入有显著性能提升:
| 方案 | 20000 条数据耗时 | 内存占用 | 网络请求次数 |
|---|---|---|---|
| 传统分批插入 | ~15 秒 | 高 | 多次 |
| copy 协议 | ~2 秒 | 低 | 1 次 |
通过使用 postgresql 的 copy 协议,我们成功解决了批量写入时的参数数量限制问题,同时大幅提升了性能。这种方法特别适合数据迁移、日志批量处理等需要高效写入大量数据的场景。
copy协议结合事务管理,既保证了数据一致性,又能提供了接近原生的写入性能,是postgresql批量数据操作的优选方案。
以上就是postgresql使用copy协议高效批量数据写入的实战指南的详细内容,更多关于postgresql copy批量数据写入的资料请关注代码网其它相关文章!
您想发表意见!!点此发布评论
版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。
发表评论