154人参与 • 2024-08-06 • PostgreSQL
本文描述了 deepflow server 用到的 mysql 数据库改成 postgresql 数据库的改造思路和实现细节。
在进入正题之前,我们需要了解 mysql 数据库在 deepflow 里的数据流向细节。
deepflow server 使用 mysql 数据库存储 agent 收集的 kubernetes apiserver 拉取的全量资源数据和 watch 的资源变更数据,以供后续 server 端进行观测数据的关联分析和资源查询展示。由于 kubernetes 的 watch 机制,资源数据变更频发,这就需要频繁增删改数据,这类操作像 clickhouse 这种非关系数据库的架构并不适合,只有关系型数据库才能有效支撑。deepflow 在入库观测数据时用到 smartencoding 机制,为所有观测数据自动注入资源、服务和业务标签,首先 agent 获取到字符串格式的标签并汇总到 server 上,接下来 server 会对所有的标签进行编码。对观测数据的 smartencoding 过程包含三个阶段:
deepflow 数据流向图
1.采集阶段
deepflow agent 从 k8s apiserver 的 list/watch 接口获取资源数据,同时 agent 从 ebpf 获取网络观测数据,agent 为每一条观测数据自动注入 vpc(integer)、ip、pid 标签。
2.存储阶段
deepflow agent 上报资源和观测数据给 deepflow server 后,server 根据 agent 标记的 vpc、ip、pid 标签,为观测数据自动注入少量的、int 编码的元标签(meta tag),包括 ip 和 pid 所对应的云资源属性、k8s 资源属性、进程属性,并存储到 clickhouse 数据库中。
3.查询阶段
server 自动计算所有自定义标签和元标签之间的关联关系,用户可直接通过 sql/promql 在所有观测数据上查询(过滤、分组)所有的标签。
以笔者搭建一套 deepflow 的 demo 环境中的数据举例:
mysql库中存储了 k8s 资源数据,包括:
类型 | 表名 | 变更类表名(ch 开头) |
---|---|---|
集群 | pod_cluster、kubernetes_cluster | ch_pod_cluster |
节点 | pod_node | ch_pod_node、ch_pod_node_port |
命名空间 | pod_namespace | ch_pod_ns、 |
容器服务 | pod_service 、pod_service_port | ch_pod_service_k8s_annotation、ch_pod_service_k8s_label |
工作负载 | pod_group、pod_group_port | ch_pod_group、ch_pod_group_port |
pod | pod | ch_pod、ch_pod_port、ch_pod_k8s_annotation、ch_pod_k8s_env、ch_pod_k8s_label |
replicaset / inplaceset | pod_rs | |
ingress | pod_ingress、pod_ingress_rule、pod_ingress_rule_backend | ch_pod_ingress |
区域、可用区、主机、vpc、子网 | az、region、vtap、ip_resource、vinterface、vinterface_ip、vl2、vl2_net、vm | ch_az、ch_region、ch_vtap、ch_vtap_port、ch_subnet、ch_device、ch_ip_relation |
顾名思义,ch 开头的表会不断的更新,而稳定最新的数据存在上面中间列的表中,中间列的表的共同特点是 id+名称+详细描述字段的组合。
同时,clickhouse 中的 flow_log、flow_metrics 库中分别存储了网络链路原始数据和网络性能指标数据,库里每张表都存储了资源数据 id 关联数据。
如下图 clickhouse 的 flow_log.l7_flow_log 表中 pod_ns_id_0 (源端所属 namespace)和 pod_ns_id_1 (对端所属 namespace) 就对应 flow_tag.pod_ns_map 表中的 id。flow_tag.pod_ns_map 表数据来自 mysql 数据库。
clickhouse 中的 flow_tag 库使用了 mysql 数据库表引擎远程关联 mysql 库中相关表,以下是 flow_tag.pod_ns_map 表 sql:
create dictionary flow_tag.pod_ns_map
(
`id` uint64,
`name` string,
`icon_id` int64
)
primary key id
source(mysql(port 30130 user 'root' password 'deepflow' replica (host 'deepflow-mysql' priority 1) db deepflow table ch_pod_ns invalidate_query 'select(select updated_at from ch_pod_ns order by updated_at desc limit 1) as updated_at'))
lifetime(min 0 max 60)
layout(flat());
对应的 mysql 数据库表内容中可以看到 namespace 的 id 为5的是 deepflow,和 clickhouse 的 flow_log.l7_flow_log 表中 pod_ns_id_0 和 pod_ns_id_1 字段相呼应:
由于改造内容很多,本地断点调试和测试尤为重要。要想在本地调试 server 代码而且只调试数据库部分,过程简单总结如下:
在和外网通畅的环境先尝试编译原版代码,保证代码没有问题
#go 版本要1.20以上
$ go version
go version go1.20.7 linux/amd64
#获取稳定版工程代码,该版本带有 cpu profling 观测能力
$ git clone https://github.com/deepflowio/deepflow.git
$ git checkout v6.3.9
$ git branch --show-current
v6.3.9
#进入 server 端代码目录
$ cd server
$ ls ./server
bin cmd common controller dockerfile go.mod go.sum ingester libs makefile patch querier readme.md server.yaml vendor
$ make clean
$ make vendor
$ cgo_enabled=0 goos=linux goarch=amd64 make server -e binary_suffix=.amd64
$ cgo_enabled=0 goos=linux goarch=arm64 make server -e binary_suffix=.arm64
#编译成功,在 bin 目录下出现两个二进制包
$ cd bin
$ ls
deepflow-server.amd64 deepflow-server.arm64
#编译成功,在 vendor 目录下有下好的包
$ cd vendor
$ ls
github.com go4.org golang.org google.golang.org go.opentelemetry.io gopkg.in gorm.io go.uber.org inet.af k8s.io modules.txt sigs.k8s.io skywalking.apache.org
#编译好后,打包镜像
$ docker build --build-arg targetarch=amd64 -t deepflow-server:amd-20230936 .
$ docker build --build-arg targetarch=arm64 -t deepflow-server:arm-20231015 .
$ docker save deepflow-server:amd-20230936 > deepflow-server-amd.20230936.tar
$ docker save deepflow-server:arm-20231015 > deepflow-server-arm.20231015.tar
镜像拿到后,替换原始镜像,和 deepflow agent 等模块调试测试下,没有问题后,进行下一步。
笔者用的是 goland 进行开发,下载上述 server 代码到本地,用 goland 打开,开始尝试在本地连接 mysql 数据库,跑起来。期间对代码进行删减注释,聚焦在数据库部分。
关闭 election 多 server 主从选举逻辑
关闭 ingester 调用链关联逻辑
去掉不支持 windows 环境的 syscall.sigstop
去掉不支持 windows 环境的 netlinks
从前面编译好的 vendor 目录里获取后期 go generator 生成的 go 文件填充到工程中,包括:
其他,还包括覆盖 clickhouse 驱动,涉及 conn_batch.go 文件等等。
修改 server.yaml 配置,连本地数据库
# mysql 相关配置
mysql:
enabled: true
database: deepflow
user-name: root
user-password: deepflow
host: localhost
port: 31234
。。。
上述操作做完,执行 main 函数,server 成功运行,mysql 数据库开始初始化,clickhouse 数据库开始初始化,观察数据正常生成后,可以开始 pg 改造了。
gorm.io/driver/mysql v1.3.4
//add
gorm.io/driver/postgres v1.3.4
gorm.io/driver/sqlite v1.3.4
gorm.io/gorm v1.23.5
# mysql 相关配置
mysql:
enabled: false
database: deepflow
user-name: root
user-password: deepflow
host: localhost
port: 31234
timeout: 30
# whether drop database when init failed
drop-database-enabled: false
auto_increment_increment: 1
# limit the total number of process queried at a time
result_set_max: 100000
# postgresql 相关配置
postgres:
enabled: true
database: postgres
user-name: postgres
user-password:
host: localhost
port: 5432
timeout: 30
# whether drop database when init failed
drop-database-enabled: false
auto_increment_increment: 1
# limit the total number of process queried at a time
result_set_max: 100000
/controller/controller.go
代码中添加数据源判断 //判断可用数据源的必要逻辑
if cfg.mysqlcfg.enabled && cfg.postgrescfg.enabled {
log.fatalf("postgres and mysql must have only one enabled. ")
} else if !cfg.mysqlcfg.enabled && !cfg.postgrescfg.enabled {
log.fatalf("postgres and mysql must have one enabled. ")
} else if cfg.mysqlcfg.enabled {
dbname = "mysql"
} else if cfg.postgrescfg.enabled {
dbname = "postgres"
}
//数据源选择后,初始化数据源
if cfg.mysqlcfg.enabled {
err := mysql.initmysql(cfg.mysqlcfg)
if err != nil {
log.errorf("init "+dbname+" failed: %s", err.error())
time.sleep(time.second)
os.exit(0)
}
} else if cfg.postgrescfg.enabled {
err := mysql.initpostgresql(cfg.postgrescfg)
if err != nil {
log.errorf("init "+dbname+" failed: %s", err.error())
time.sleep(time.second)
os.exit(0)
}
}
/controller/db/mysql/gorm.go
代码中://初始化 mysql
func initmysql(cfg mysqlconfig) error {
dbconfig = cfg
db = gorm(cfg)
if db == nil {
return errors.new("connect mysql failed")
}
var version string
err := db.raw(fmt.sprintf("select version from db_version")).scan(&version).error
if err != nil {
return errors.new("get current db version failed")
}
if version != migration.db_version_expected {
return errors.new(fmt.sprintf("current db version: %s != expected db version: %s", version, migration.db_version_expected))
}
return nil
}
//初始化 postgresql
func initpostgresql(cfg postgresqlconfig) error {
dbpsqlconfig = cfg
db = postgresqlgorm(cfg)
if db == nil {
return errors.new("connect postgresql failed")
}
var version string
err := db.raw(fmt.sprintf("select version from db_version")).scan(&version).error
if err != nil {
return errors.new("get current db version failed")
}
if version != migration.db_version_expected {
return errors.new(fmt.sprintf("current db version: %s != expected db version: %s", version, migration.db_version_expected))
}
return nil
}
//初始化 mysql 连接参数
func gorm(cfg mysqlconfig) *gorm.db {
dsn := getdsn(cfg, cfg.database, cfg.timeout, false)
return getgormdb(dsn)
}
//初始化 postgresql 连接参数
func postgresqlgorm(cfg postgresqlconfig) *gorm.db {
dsn := getpsqldsn(cfg, cfg.database, cfg.timeout, false)
return getpsqlgormdb(dsn)
}
/controller/db
文件夹中,需要注意的是,之前有尝试把 db/mysql 改成 db/dbsql
,但是发现涉及包引用路径的改动特别多,后来放弃改动,不影响使用。5.还有需要一个注意的点是由于 pg 数据库需要初始化 function、trigger 等需要用到管理员权限,另外考虑生产部署需要,postgresql 的初始化 sql 是手动执行的,你只要在建 db_version 表是直接插入当前版本号,server 端程序就不会自动执行初始化 sql。
create table if not exists db_version (
version varchar(64) primary key,
created_at timestamp not null default current_timestamp,
updated_at timestamp not null default current_timestamp
);
truncate table db_version;
insert into db_version (version) values('6.3.1.51');
datetime 转 timestamp
engine=innodb default charset=utf8 表声明需去掉
``上引号不能用
postgresql 数据库的 char 类型填充数据会保留空格,改成 varchar 则不会保留。
另外一个思路,在代码中加上 trim 函数去除空格,这次改造选择将类型改成 varchar 解决,后续可以优化。
create table if not exists db_version (
version varchar(64) primary key,
created_at timestamp not null default current_timestamp,
updated_at timestamp not null default current_timestamp
);
truncate table db_version;
longblob 改成 blob
longtext 改成 text
unsigned 绝对值不支持,需替换成 check 函数
integer unsigned -> integer check(netns_id>=0)
int(11) unsigned -> 删除
tinyint(1) unsigned -> check(create_method>=0)
int(10) unsigned -> check(policy_id>=0)
tinyint(3) unsigned -> check(app_label_column_index>=0)
id int(11) unsigned not null auto_increment, #自增序列,建议删除
tinyint(1) -> tinyint
tinyint(3) -> tinyint
primary key (id,domain),
和 id integer not null auto_increment,
冲突报错如下: error: incorrect table definition, auto_increment column must be defined as a key
解决办法:创建 sequence,id 字段添加 nextval 方法,以 vnet 表改造举例,
mysql 表语法:
create table if not exists vnet(
id integer not null auto_increment,
。。。
primary key (id,domain),
index state_server_index(state, gw_launch_server)
)engine=innodb default charset=utf8 auto_increment=256 /* reset in init_auto_increment */;
delete from vnet;
postgresql 表语法:
create sequence seq_vnet start 256; --创建 sequence
create table if not exists vnet(
id integer not null default nextval('seq_vnet'), --id 字段添加 nextval 方法
。。。
primary key (id,domain)
) /* reset in init_default nextval('seq_plugin') */;
create index vnet_index on vnet(state, gw_launch_server);
delete from vnet;
comment on column vnet.state is '0.temp 1.creating 2.created 3.exception 4.modifing 5.destroying 6.to run 7.running 8.to stop 9.stopped';
"1m" -> '1m'
epc " 1:route, 2:transparent" -> ' 1:route, 2:transparent'
int(10) -> integer
int(11) -> integer
int -> integer
double 要改成 number
float 要改成 number
enum 不支持,修改字段类型为 varchar,同时引入 check 方法
alarm_policy 表
--data_level enum('1s','1m')
data_level varchar(4) check (data_level in ('1s','1m'))
report_policy 表
--`interval` enum('1d','1h') not null default '1h',
interval varchar(4) check (interval in ('1d','1h')) not null default '1h',
mediumblob 转 blob
mail_server.user 和 go_genesis_process.user 是 pg 关键字 改成 user_name, go 代码的 model 也需要修改,同时 mysql 的 init sql 也需要修改
mysql 和 postgresql 同时修改字段名:
--user varchar(256) default '',
user_name varchar(256) default '',
go 代码改造
type genesisprocess struct {
id int `gorm:"primarykey;column:id;type:int;not null" json:"id"`
。。。
//user -> user_name
//update
user string `gorm:"column:user_name;type:varchar(256);default:null" json:"user_name"`
。。。
}
func (genesisprocess) tablename() string {
return "go_genesis_process"
}
mysql 和 postgresql 同时修改字段名:
--interval integer not null,
interval_time integer not null,
.\controller\db\mysql\model.go
代码改造
//interval int `gorm:"column:interval;type:int" json:"interval"`
//update
interval int `gorm:"column:interval_time;type:int" json:"interval_time"`
.\controller\http\service\data_source.go
代码改造
if name, ok := filter["name"]; ok {
interval := convertnametointerval(name.(string))
if interval != 0 {
//db = db.where("`interval` = ?", interval)
//lucheng update
db = db.where("interval_time = ?", interval)
}
}
on update current_timestamp
不支持,需在建表前添加 function 和 trigger。create or replace function "public"."cs_timestamp"()
returns "pg_catalog"."trigger" as $body$
begin
new.updated_at= current_timestamp;
return new;
end
$body$
language plpgsql volatile
cost 100
create trigger db_version_updated_at before update on db_version for each row execute procedure cs_timestamp();
create trigger plugin_updated_at before update on plugin for each row execute procedure cs_timestamp();
create trigger vtap_repo_updated_at before update on vtap_repo for each row execute procedure cs_timestamp();
。。。
如果 index 是复合索引,那么 references 外联了复合索引的一个字段,会报错:
primary key (id,domain)
references vl2(id),
将主键索引改成唯一索引
primary key (id,domain), -> create unique index vinterface_unique_index on vinterface(id,domain);
解决方案,默认插入语句直接写死 uuid,如下所示
insert into sys_configuration (param_name, value, comments, lcuuid) values ('cloud_sync_timer', '60', 'unit: s', 'ffffffff-ffff-ffff-ffff-fffffffffff1');
insert into sys_configuration (param_name, value, comments, lcuuid) values ('pcap_data_retention', '3', 'unit: day', 'ffffffff-ffff-ffff-ffff-fffffffffff2');
insert into sys_configuration (param_name, value, comments, lcuuid) values ('system_data_retention', '7', 'unit: day', 'ffffffff-ffff-ffff-ffff-fffffffffff3');
insert into sys_configuration (param_name, value, comments, lcuuid) values ('ntp_servers', '0.cn.pool.ntp.org', '', 'ffffffff-ffff-ffff-ffff-fffffffffff4');
go 代码插入的由 uuid 函数填充。
name varchar(256) character set utf8 collate utf8_bin not null default '',
报错:
type "pg_catalog.varchar_utf8" does not exist
删除即可
举例:.\controller\http\service\resource\domain.go
改造点:
//lucheng update
//err := mysql.db.clauses(clause.insert{modifier: "ignore"}).create(&domain).error
if strings.compare(mysql.db.name(), "postgres") == 0 {
err := mysql.db.debug().clauses(clause.onconflict{donothing: true}).create(&domain).error
if err != nil {
log.warningf("create domain failed: %s", err)
}
} else {
err := mysql.db.clauses(clause.insert{modifier: "ignore"}).create(&domain).error
if err != nil {
log.warningf("create domain failed: %s", err)
}
}
举例:.\controller\trisolaris\dbmgr\dbmgr.go
改造点:
举例:.\controller\trisolaris\vtap\vtap_discovery.go
改造点:
举例:.\controller\tagrecorder\dictionary.go
改造点:
// 在本区域所有数据节点更新字典
// update the dictionary at all data nodes in the region
host := ""
if c.cfg.mysqlcfg.enabled {
host = c.cfg.mysqlcfg.host
} else if c.cfg.postgrescfg.enabled {
host = c.cfg.postgrescfg.host
}
replicasql := fmt.sprintf("replica (host '%s' priority %s)", host, "1")
connect, err := clickhouse.connect(clickhousecfg)
if err != nil {
continue
}
log.infof("refresh clickhouse dictionary in (%s: %d)", address.ip, clickhousecfg.port)
举例:.\controller\tagrecorder\const.go
改造点,创建k8s_envs
等 dictionary 字典表:
create_k8s_envs_dictionary_pgsql = "create dictionary %s.%s\n" +
"(\n" +
" `id` uint64,\n" +
" `envs` string,\n" +
" `l3_epc_id` uint64,\n" +
" `pod_ns_id` uint64\n" +
")\n" +
"primary key id\n" +
"source(postgresql(port %s user '%s' password '%s' %s db %s table %s invalidate_query 'select(select updated_at from %s order by updated_at desc limit 1) as updated_at'))\n" +
"lifetime(min 0 max %d)\n" +
"layout(flat())"
.\controller\db\mysql\common\utils.go
中配置日志输出级别为 debug:func getpsqlgormdb(dsn string) *gorm.db {
db, err := gorm.open(postgres.new(postgres.config{
//dsn: "host=192.168.10.10 user=deepflow dbname=db_deepflow password=gauss123 port=5432 sslmode=disable",
dsn: dsn,
prefersimpleprotocol: true,
}), &gorm.config{
namingstrategy: schema.namingstrategy{singulartable: true}, // 设置全局表名禁用复数
logger: logger.new(
l.new(os.stdout, "\r\n", l.lstdflags), // io writer
logger.config{
slowthreshold: 0, // 慢 sql 阈值,为0时不打印
//调试的时候改成 debug
loglevel: logger.error, // log level
ignorerecordnotfounderror: false, // 忽略 errrecordnotfound(记录未找到)错误
colorful: true, // 是否彩色打印
}), // 配置 log
})
if err != nil {
log.errorf("postgresql connection failed with error: %v", err.error())
return nil
}
sqldb, _ := db.db()
// 限制最大空闲连接数、最大连接数和连接的生命周期
sqldb.setmaxidleconns(50)
sqldb.setmaxopenconns(100)
sqldb.setconnmaxlifetime(time.hour)
return db
}
server.yaml 中配置修改 error->debug:
# logfile path
log-file: /var/log/deepflow/server.log
# loglevel: "debug/info/warn/error"
log-level: debug
举例:
举例:.\controller\recorder\recorder.go
改造点:
func (r *recorder) updatestateinfo(clouddata cloudmodel.resource) {
...
//log.debugf("update domain (%+v)", domain)
log.debugf("updatestateinfo domain (%+v)", domain)
...
举例:.\controller\monitor\analyzer.go
改造点:
// 获取可用区中的数据节点 ip
err := mysql.db.where("region in (?)", regionlcuuids.toslice()).find(&azanalyzerconns).error
//lucheng add
if err != nil {
log.error(err)
}
应公司要求需要将镜像底包替换成国产化操作系统,基于 pg 代码进行编译打包镜像 dockerfile 需要修改:
from euler:22.03 as deepflow-server #openeuler 镜像底包
maintainer lucheng
#run apk add tzdata
copy ./server.yaml /etc/
run mkdir /etc/mysql
copy ./controller/db/mysql/migration/rawsql /etc/mysql
run mkdir /etc/pgsql #虽然新增 pgsql 文件夹,但这里并没用到自动初始化 sql,后续还需要适配调试下。原因是有些 pg 数据库的操作需要用到管理员账户,比如创建用户、赋权、function 等,后续可以考虑在发布 pg 镜像时直接加上这部分。
copy ./controller/db/mysql/migration/pgsql /etc/pgsql
copy ./controller/cloud/filereader/manual_data_samples.yaml /etc/
copy ./querier/db_descriptions /etc/db_descriptions/
arg targetarch
copy ./bin/deepflow-server.${targetarch}.pg /bin/deepflow-server
cmd /bin/deepflow-server
#编译好后,打包镜像
$ docker build --build-arg targetarch=amd64.pg -t deepflow-server-pg:amd-20230936 .
$ docker build --build-arg targetarch=arm64.pg -t deepflow-server-pg:arm-20231015 .
$ docker save deepflow-server-pg:amd-20230936 > deepflow-server-pg-amd.20230936.tar
$ docker save deepflow-server-pg:arm-20231015 > deepflow-server-pg-arm.20231015.tar
上传到测试环境和 deepflow 的 grafana、deepflow-app、deepflow-agent 联调测试,经过多轮测试验证证明此次改造成功!目前内部已发版上线。
deepflow server 使用 go 代码编写,用到 grpc、protobuf、协程等提高性能手段,整体性能表现非常优秀。
同时开发改造体验也很好,使用的是 gin、gorm 等都是主流开发框架上手容易文档丰富,而且 go 代码编译 x86 和 arm 两种环境的包只要在 x86 环境一次性编译好,不需要再去 arm 环境再折腾一次。
通过让deepflow server 支持 postgresql 数据源能让用户拥有更多选择,好处显而易见。
deepflow 是云杉网络开发的一款可观测性产品,旨在为复杂的云基础设施及云原生应用提供深度可观测性。deepflow 基于 ebpf 实现了应用性能指标、分布式追踪、持续性能剖析等观测信号的零侵扰(zero code
)采集,并结合智能标签(smartencoding
)技术实现了所有观测信号的全栈(full stack
)关联和高效存取。使用 deepflow,可以让云原生应用自动具有深度可观测性,从而消除开发者不断插桩的沉重负担,并为 devops/sre 团队提供从代码到基础设施的监控及诊断能力。
github 地址:https://github.com/deepflowio/deepflow
访问 deepflow demo,体验零插桩、全覆盖、全关联的可观测性。
您想发表意见!!点此发布评论
版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。
发表评论