168人参与 • 2024-08-04 • 大数据
因为在工作中需要推动apache dolphinscheduler的升级,经过预研,从1.3.4到3.1.2有的体验了很大的提升,在性能和功能性有了很多的改善,推荐升级。
查看官方的升级文档,可知有提供升级脚本,如果只是跨小版本的更新那么只用执行脚本就好了,但跨多个大版本升级时依然容易出现各种问题,特此总结。
>旧版本:1.3.4 <br>新版本:3.1.2
升级完成后使用资源中心报错 illegalargumentexception: failed to specify server's kerberos principal name
资源中心使用的hdfs,开启了kerberos
认证
解决方法:
编辑 dolphinscheduler/api-server/conf/hdfs-site.xml
添加以下内容
<property>
<name>dfs.namenode.kerberos.principal.pattern</name>
<value>*</value>
</property>
升级完成后查看任务实例的日志,报错未找到日志,查看报错信息,检查新版本的目录结构和表里的日志路径,发现原因是新版本的日志路径有变更。
升级前的日志路径在 /logs/
下。
升级后的日志路径在 /worker-server/logs/
下。
因此需要修改这里的目录
解决方法: 执行sql修改日志路径
update t_ds_task_instance set log_path=replace(log_path,'/logs/','/worker-server/logs/');
然后将原日志文件copy到新的日志路径
cp -r {旧版本dolphinscheduler目录}/logs/[1-9]* {新版本dolphinscheduler目录}/worker-server/logs/*
查看报错信息,原因是 t_ds_process_definition_log
和 t_ds_process_definition
主键的初始值不一致,那么修改成一致的就好了!
解决方法: 执行sql
# 查出主键自增值
select auto_increment from information_schema.tables where table_schema = 'dolphinscheduler' and table_name = 't_ds_process_definition' limit 1
# 将上面sql的执行结果填写到下方参数处执行
alter table dolphinscheduler_bak1.t_ds_process_definition_log auto_increment = {max_id};
检查查询的sql
在 dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/taskinstancemapper.xml
文件里,select id="querytaskinstancelistpaging"
的sql
select
<include refid="basesqlv2">
<property name="alias" value="instance" />
</include>
,
process.name as process_instance_name
from t_ds_task_instance instance
left join t_ds_task_definition_log define on define.code=instance.task_code and define.version=instance.task_definition_version
left join t_ds_process_instance process on process.id=instance.process_instance_id
where define.project_code = #{projectcode}
<if test="starttime != null">
and instance.start_time <![cdata[ >=]]> #{starttime}
</if>
......省略多余部分
查询任务实例列表的sql会关联 t_ds_task_definition_log
表,经检查发现是 define.code=instance.task_code
这一句关联不上。
结合下面的查询条件 define.project_code = #{projectcode}
可知,关联 t_ds_task_definition_log
主要是为了过滤 projectcode
,那么来修改下这个sql:
解决方法:
select
<include refid="basesqlv2">
<property name="alias" value="instance" />
</include>
,
process.name as process_instance_name
from t_ds_task_instance instance
-- left join t_ds_task_definition_log define
-- on define.code=instance.task_code and
-- define.version=instance.task_definition_version
join t_ds_process_instance process
on process.id=instance.process_instance_id
join t_ds_process_definition define
on define.code=process.process_definition_code
where define.project_code = #{projectcode}
<if test="starttime != null">
and instance.start_time <![cdata[ >=]]> #{starttime}
</if>
......省略多余部分
直接用 t_ds_process_definition
关联,也有 project_code
字段可以用来关联过滤,这里修改后就能查出数据了。
查看代码
513 if (task_type_sub_process.equals(tasktype)) {
514 jsonnode jsonnodedefinitionid = param.get("processdefinitionid");
515 if (jsonnodedefinitionid != null) {
516 param.put("processdefinitioncode",
517 processdefinitionmap.get(jsonnodedefinitionid.asint()).getcode());
518 param.remove("processdefinitionid");
519 }
520 }
很明显是 processdefinitionmap.get(jsonnodedefinitionid.asint())
返回了null,加个null判断,如果返回null直接跳过,并将相关信息打印出来,升级结束后可以根据日志核对。
解决方法:
修改后:
if (jsonnodedefinitionid != null) {
if (processdefinitionmap.get(jsonnodedefinitionid.asint()) != null) {
param.put("processdefinitioncode",processdefinitionmap.get(jsonnodedefinitionid.asint()).getcode());
param.remove("processdefinitionid");
} else {
logger.error("*******************error");
logger.error("*******************param:" + param);
logger.error("*******************jsonnodedefinitionid:" + jsonnodedefinitionid);
}
}
查看代码
669 if (mapentry.ispresent()) {
670 map.entry<long, map<string, long>> processcodetasknamecodeentry = mapentry.get();
671 dependitem.put("definitioncode", processcodetasknamecodeentry.getkey());
672 string deptasks = dependitem.get("deptasks").astext();
673 long taskcode =
674 "all".equals(deptasks) || processcodetasknamecodeentry.getvalue() == null ? 0l
675 : processcodetasknamecodeentry.getvalue().get(deptasks);
676 dependitem.put("deptaskcode", taskcode);
677 }
很明显是 processcodetasknamecodeentry.getvalue().get(deptasks)
返回了null,修改下逻辑,不为null才赋值并打印相关日志。
解决方法:
修改后:
long taskcode =0;
if (processcodetasknamecodeentry.getvalue() != null
&&processcodetasknamecodeentry.getvalue().get(deptasks)!=null){
taskcode =processcodetasknamecodeentry.getvalue().get(deptasks);
}else{
logger.error("******************** deptasks:"+deptasks);
logger.error("******************** taskcode not in "+jsonutils.tojsonstring(processcodetasknamecodeentry));
}
dependitem.put("deptaskcode", taskcode);
可在 api-server/conf/application.yaml
配置接入ldap
security:
authentication:
# authentication types (supported types: password,ldap)
type: ldap
# if you set type `ldap`, below config will be effective
ldap:
# ldap server config
urls: xxx
base-dn: xxx
username: xxx
password: xxx
user:
# admin userid when you use ldap login
admin: xxx
identity-attribute: xxx
email-attribute: xxx
# action when ldap user is not exist (supported types: create,deny)
not-exist-action: create
要成功接入ldap至少需要urls,base-dn,username,password,identity
和email正确填写,不知道email字段名可以按下面的方式处理,email先空着
启动服务后用ldap用户登录
解决办法: ldap 认证的代码在 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/security/impl/ldap/ldapservice.java
的 ldaplogin()
ctx = new initialldapcontext(searchenv, null);
searchcontrols sc = new searchcontrols();
sc.setreturningattributes(new string[]{ldapemailattribute});
sc.setsearchscope(searchcontrols.subtree_scope);
equalsfilter filter = new equalsfilter(ldapuseridentifyingattribute, userid);
namingenumeration<searchresult> results = ctx.search(ldapbasedn, filter.tostring(), sc);
if (results.hasmore()) {
// get the users dn (distinguishedname) from the result
searchresult result = results.next();
namingenumeration<!--? extends attribute--> attrs = result.getattributes().getall();
while (attrs.hasmore()) {
// open another connection to the ldap server with the found dn and the password
searchenv.put(context.security_principal, result.getnameinnamespace());
searchenv.put(context.security_credentials, userpwd);
try {
new initialdircontext(searchenv);
} catch (exception e) {
logger.warn("invalid ldap credentials or ldap search error", e);
return null;
}
attribute attr = attrs.next();
if (attr.getid().equals(ldapemailattribute)) {
return (string) attr.get();
}
}
}
第三行会根据填的字段过滤,先注释第三行
// sc.setreturningattributes(new string[]{ldapemailattribute});
重新执行后第10行会返回全部字段
namingenumeration<!--? extends attribute--> attrs = result.getattributes().getall();
通过打印或调试在里面找到email字段填到配置文件里,再还原上面注释的代码,重启服务后即可正常接入ldap登录。
经多次测试,发现普通用户只能看到所属用户为自己的资源文件,管理员授权后依然无法查看资源文件
解决办法:
文件 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/permission/resourcepermissioncheckserviceimpl.java
的 listauthorizedresource()
方法,将 return 的集合修改为 relationresources
@override
public set<integer> listauthorizedresource(int userid, logger logger) {
list<resource> relationresources;
if (userid == 0) {
relationresources = new arraylist<>();
} else {
// query resource relation
list<integer> resids = resourceusermapper.queryresourcesidlistbyuseridandperm(userid, 0);
relationresources = collectionutils.isempty(resids) ? new arraylist<>() : resourcemapper.queryresourcelistbyid(resids);
}
list<resource> ownresourcelist = resourcemapper.queryresourcelistauthored(userid, -1);
relationresources.addall(ownresourcelist);
return relationresources.stream().map(resource::getid).collect(toset()); // 解决资源文件授权无效的问题
// return ownresourcelist.stream().map(resource::getid).collect(toset());
}
检查新版本的 change log ,发现在3.1.3版本修复了这个bug
https://github.com/apache/dolphinscheduler/pull/13318
因为kerberos配置了票据过期时间,一段时间后资源中心的hdfs资源将无法访问,最好的解决办法是添加定时更新凭证的相关逻辑。
解决办法:
在文件 dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/commonutils.java
添加方法
/**
* * 定时更新凭证
*/
private static void startcheckkeytabtgtandreloginjob() {
// 每天循环,定时更新凭证
executors.newscheduledthreadpool(1).schedulewithfixeddelay(() -> {
try {
usergroupinformation.getloginuser().checktgtandreloginfromkeytab();
logger.warn("check kerberos tgt and relogin from keytab finish.");
} catch (ioexception e) {
logger.error("check kerberos tgt and relogin from keytab error", e);
}
}, 0, 1, timeunit.days);
logger.info("start check keytab tgt and relogin job success.");
}
然后在该文件的 loadkerberosconf
方法返回 true
前调用:
public static boolean loadkerberosconf(string javasecuritykrb5conf, string loginuserkeytabusername,
string loginuserkeytabpath, configuration configuration) throws ioexception {
if (commonutils.getkerberosstartupstate()) {
system.setproperty(constants.java_security_krb5_conf, stringutils.defaultifblank(javasecuritykrb5conf,
propertyutils.getstring(constants.java_security_krb5_conf_path)));
configuration.set(constants.hadoop_security_authentication, constants.kerberos);
usergroupinformation.setconfiguration(configuration);
usergroupinformation.loginuserfromkeytab(
stringutils.defaultifblank(loginuserkeytabusername,
propertyutils.getstring(constants.login_user_key_tab_username)),
stringutils.defaultifblank(loginuserkeytabpath,
propertyutils.getstring(constants.login_user_key_tab_path)));
startcheckkeytabtgtandreloginjob(); // 此处调用
return true;
}
return false;
}
这篇文章主要是记录升级过程中遇到的问题,希望能够对大家有所帮助!
> 本文由 白鲸开源科技 提供发布支持!</resource></integer></resource></integer></searchresult></long,>
您想发表意见!!点此发布评论
版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。
发表评论