165人参与 • 2024-08-04 • PostgreSQL
最近和朋友讨论oracle行级安全策略(vpd)时,查看了下官方文档,看起来vpd的原理是针对应用了oracle行级安全策略的表、视图或同义词发出的 sql 语句动态添加where子句。通俗理解就是将行级安全策略动态添加为where 条件。那么pg中的行级安全策略是怎么处理的呢?
行级安全策略(row level security)是更细粒度的数据安全控制策略。行级策略可以根据每个用户限制哪些行可以通过常规查询返回,哪些行可以通过数据修改命令插入、更新或删除。默认情况下,表没有任何行级安全策略,因此如果用户根据 sql 权限系统具有表的访问权限,则其中的所有行都可以平等地用于查询或更新。
在pg中我们可以创建行级策略,在sql执行时行级策略表达式将作为查询的一部分运行。
https://www.postgresql.org/docs/16/ddl-rowsecurity.html
创建3个用户
postgres=# create user admin;
create role
postgres=# create user peter;
create role
postgres=# create user bob;
create role
创建一个rlsdb数据库
postgres=# create database rlsdb owner admin;
create database
在rlsdb中使用admin用户创建表employee,并插入3个用户对应的数据
postgres=# \c rlsdb admin
you are now connected to database "rlsdb" as user "admin".
rlsdb=> create table employee ( empno int, ename text, address text, salary int, account_number text );
create table
rlsdb=> insert into employee values (1, 'admin', '2 down str', 80000, 'no0001' );
insert 0 1
rlsdb=> insert into employee values (2, 'peter', '132 south avn', 60000, 'no0002' );
insert 0 1
rlsdb=> insert into employee values (3, 'bob', 'down st 17th', 60000, 'no0003' );
insert 0 1
rlsdb=>
授权后,三个用户都能看到employee表的所有数据
rlsdb=> grant select on table employee to peter;
grant
rlsdb=> grant select on table employee to bob;
grant
rlsdb=> select * from employee;
empno | ename | address | salary | account_number
-------+-------+---------------+--------+----------------
1 | admin | 2 down str | 80000 | no0001
2 | peter | 132 south avn | 60000 | no0002
3 | bob | down st 17th | 60000 | no0003
(3 rows)
rlsdb=>
rlsdb=> \c rlsdb peter
you are now connected to database "rlsdb" as user "peter".
rlsdb=> select * from employee;
empno | ename | address | salary | account_number
-------+-------+---------------+--------+----------------
1 | admin | 2 down str | 80000 | no0001
2 | peter | 132 south avn | 60000 | no0002
3 | bob | down st 17th | 60000 | no0003
(3 rows)
rlsdb=> \c rlsdb bob
you are now connected to database "rlsdb" as user "bob".
rlsdb=> select * from employee;
empno | ename | address | salary | account_number
-------+-------+---------------+--------+----------------
1 | admin | 2 down str | 80000 | no0001
2 | peter | 132 south avn | 60000 | no0002
3 | bob | down st 17th | 60000 | no0003
(3 rows)
使用admin用户创建行级安全策略,对于peter和bob就只能看到自己的数据了。
rlsdb=> \c rlsdb admin
you are now connected to database "rlsdb" as user "admin".
rlsdb=> create policy emp_rls_policy on employee for all to public using (ename=current_user);
create policy
rlsdb=> alter table employee enable row level security;
alter table
rlsdb=> \c rlsdb peter
you are now connected to database "rlsdb" as user "peter".
rlsdb=> select * from employee;
empno | ename | address | salary | account_number
-------+-------+---------------+--------+----------------
2 | peter | 132 south avn | 60000 | no0002
(1 row)
rlsdb=> \c rlsdb bob
you are now connected to database "rlsdb" as user "bob".
rlsdb=> select * from employee;
empno | ename | address | salary | account_number
-------+-------+--------------+--------+----------------
3 | bob | down st 17th | 60000 | no0003
(1 row)
rlsdb=>
先看下行级安全策略在数据库中的呈现是什么样的。
查看pg_policy表,可以看到我们创建的emp_rls_policy这个策略,具体的策略polqual是一串字符,熟悉parsetree结构的朋友能关注到这是一个opexpr node。我们常见的where 条件也是类似的结构。
我们可以使用函数让polqual以更适合人阅读的方式来展示。
创建策略时,其实是将策略转换为where子句存到pg_policy表中。
objectaddress
createpolicy(createpolicystmt *stmt)
{
/*省略部分代码行*/
/*将策略转化为where子句*/
qual = transformwhereclause(qual_pstate,
stmt->qual,
expr_kind_policy,
"policy");
with_check_qual = transformwhereclause(with_check_pstate,
stmt->with_check,
expr_kind_policy,
"policy");
/* fix up collation information */
assign_expr_collations(qual_pstate, qual);
assign_expr_collations(with_check_pstate, with_check_qual);
/* 将转换后的子句写入pg_policy*/
/* open pg_policy catalog */
pg_policy_rel = table_open(policyrelationid, rowexclusivelock);
/* set key - policy's relation id. */
scankeyinit(&skey[0],
anum_pg_policy_polrelid,
btequalstrategynumber, f_oideq,
objectidgetdatum(table_id));
/* set key - policy's name. */
scankeyinit(&skey[1],
anum_pg_policy_polname,
btequalstrategynumber, f_nameeq,
cstringgetdatum(stmt->policy_name));
sscan = systable_beginscan(pg_policy_rel,
policypolrelidpolnameindexid, true, null, 2,
skey);
policy_tuple = systable_getnext(sscan);
/* complain if the policy name already exists for the table */
if (heaptupleisvalid(policy_tuple))
ereport(error,
(errcode(errcode_duplicate_object),
errmsg("policy \"%s\" for table \"%s\" already exists",
stmt->policy_name, relationgetrelationname(target_table))));
policy_id = getnewoidwithindex(pg_policy_rel, policyoidindexid,
anum_pg_policy_oid);
values[anum_pg_policy_oid - 1] = objectidgetdatum(policy_id);
values[anum_pg_policy_polrelid - 1] = objectidgetdatum(table_id);
values[anum_pg_policy_polname - 1] = directfunctioncall1(namein,
cstringgetdatum(stmt->policy_name));
values[anum_pg_policy_polcmd - 1] = chargetdatum(polcmd);
values[anum_pg_policy_polpermissive - 1] = boolgetdatum(stmt->permissive);
values[anum_pg_policy_polroles - 1] = pointergetdatum(role_ids);
/* add qual if present. */
if (qual)
values[anum_pg_policy_polqual - 1] = cstringgettextdatum(nodetostring(qual));
else
isnull[anum_pg_policy_polqual - 1] = true;
/* add with check qual if present */
if (with_check_qual)
values[anum_pg_policy_polwithcheck - 1] = cstringgettextdatum(nodetostring(with_check_qual));
else
isnull[anum_pg_policy_polwithcheck - 1] = true;
policy_tuple = heap_form_tuple(relationgetdescr(pg_policy_rel), values,
isnull);
catalogtupleinsert(pg_policy_rel, policy_tuple);
/* record dependencies */
target.classid = relationrelationid;
target.objectid = table_id;
target.objectsubid = 0;
myself.classid = policyrelationid;
myself.objectid = policy_id;
myself.objectsubid = 0;
recorddependencyon(&myself, &target, dependency_auto);
recorddependencyonexpr(&myself, qual, qual_pstate->p_rtable,
dependency_normal);
recorddependencyonexpr(&myself, with_check_qual,
with_check_pstate->p_rtable, dependency_normal);
/* register role dependencies */
target.classid = authidrelationid;
target.objectsubid = 0;
for (i = 0; i < nitems; i++)
{
target.objectid = datumgetobjectid(role_oids[i]);
/* no dependency if public */
if (target.objectid != acl_id_public)
recordshareddependencyon(&myself, &target,
shared_dependency_policy);
}
invokeobjectpostcreatehook(policyrelationid, policy_id, 0);
/* invalidate relation cache */
cacheinvalidaterelcache(target_table);
/* clean up. */
heap_freetuple(policy_tuple);
free_parsestate(qual_pstate);
free_parsestate(with_check_pstate);
systable_endscan(sscan);
relation_close(target_table, nolock);
table_close(pg_policy_rel, rowexclusivelock);
return myself;
}
在sql执行时,查询重写阶段会将对应的安全策略拼接到parsetree里,最后生成执行计划去执行。
从执行计划来看sql没有where条件,但是执行计划中存在 filter: (ename = current_user),证明了这个过程。
rlsdb=> explain analyze select * from employee ;
query plan
-----------------------------------------------------------------------------------------------------
seq scan on employee (cost=0.00..19.15 rows=3 width=104) (actual time=0.010..0.012 rows=1 loops=1)
filter: (ename = current_user)
rows removed by filter: 2
planning time: 0.416 ms
execution time: 0.036 ms
(5 rows)
rlsdb=>
再debug验证下这个过程。
给firerirrules函数设置断点,进入断点后从stack可以看到目前是在queryrewrite阶段,结合一些规则进行查询重写。
观察这个时候的parsetree,可以看到还没有将安全策略对应的opexpr拼接进来。
等执行到get_row_security_policies函数已获取到表对应安全策略securityquals。
打印securityquals可以看到和我们查询pg_policy中的opexpr是一致的。
接着将securityquals加入到rte的list中,这样我们再去打印parsetree就可以看到安全策略securityquals对应的opexpr已经被拼接进来。
然后就是去生成执行计划并执行。
pg的rls也是将对应的策略动态转换为where子句,在查询重写阶段将安全策略拼接到parsetree,生成执行计划去执行。
行级安全策略,可以提供更精细粒度的表数据权限管理,在一定的场景下,比如只让用户看到自己对应的数据,能做到更安全的权限把控。
您想发表意见!!点此发布评论
版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。
发表评论