博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Azkaban的Exec Server分析 30:JobRunner如何构造一个Job大揭秘
阅读量:7091 次
发布时间:2019-06-28

本文共 3705 字,大约阅读时间需要 12 分钟。

hot3.png

重点来了,JobRunner对象是有了,下面就真的要生成一个Job了,如何生成?本节揭秘!

===========================================================================================

String jobType = jobProps.getString("type");

Class<? extends Object> executorClass = pluginSet.getPluginClass(jobType);

不用解释了吧,根据工作类型,查找对应的类

类怎么查找的?

private void loadDefaultTypes(JobTypePluginSet plugins) throws JobTypeManagerException {

logger.info("Loading plugin default job types");

plugins.addPluginClass("command", ProcessJob.class);

plugins.addPluginClass("javaprocess", JavaProcessJob.class);

plugins.addPluginClass("noop", NoopJob.class);

plugins.addPluginClass("python", PythonJob.class);

plugins.addPluginClass("ruby", RubyJob.class);

plugins.addPluginClass("script", ScriptJob.class);

}

这下知道了吧,本身内嵌了以上6种工作类型

===========================================================================================

jobProps = PropsUtils.resolveProps(jobProps);

貌似是特殊变量的替换

===========================================================================================

最后

job = (Job) Utils.callConstructor(executorClass, jobId, pluginLoadProps, jobProps, logger);

这是啥意思?

stop in azkaban.utils.Utils.callConstructor

通过构造函数,生成对应类型的实例!

===========================================================================================

最后就是runJob了,代码如下:

private void runJob() {

// 看到这里了

try {

//开始跑job

job.run();

} catch (Throwable e) {

 

if (props.getBoolean("job.succeed.on.failure", false)) {

changeStatus(Status.FAILED_SUCCEEDED);

logError("Job run failed, but will treat it like success.");

logError(e.getMessage() + " cause: " + e.getCause(), e);

} else {

changeStatus(Status.FAILED);

logError("Job run failed!", e);

logError(e.getMessage() + " cause: " + e.getCause());

}

}

 

if (job != null) {

node.setOutputProps(job.getJobGeneratedProperties());

}

 

// If the job is still running, set the status to Success.

if (!Status.isStatusFinished(node.getStatus())) {

changeStatus(Status.SUCCEEDED);

}

}

具体怎么做的呢?

stop in  azkaban.execapp.JobRunner.runJob

===========================================================================================

因为这个job是具体的任务,6大任务之一,先研究下command类型任务的过程!

stop in azkaban.jobExecutor.ProcessJob.run

===========================================================================================

protected List<String> getCommandList() {

List<String> commands = new ArrayList<String>();

commands.add(jobProps.getString(COMMAND));

for (int i = 1; jobProps.containsKey(COMMAND + "." + i); i++) {

commands.add(jobProps.getString(COMMAND + "." + i));

}

 

return commands;

}

看来可以加多个command,格式不解释了

command的执行不多说了,最后来看看成功的消息通知给listeners的过程

===========================================================================================

fireEvent(Event.create(this, Type.JOB_FINISHED), false);

之前知道有3个listener,看看做了哪些事情!

[azkaban.execapp.event.JobCallbackManager@3ba30a43, 

azkaban.execapp.FlowRunner$JobRunnerEventListener@da0e592, 

azkaban.execapp.jmx.JmxJobMBeanManager@53976f5c]

先看第1个listener JobCallbackManager

执行函数:略

第2个:

FlowRunner$JobRunnerEventListener

里面比较重要的就是一行:finishedNodes.add(node);

然后又触发listener:FlowRunnerManager

第3个:JmxJobMBeanManager

最后还得上传执行日志:

final String INSERT_EXECUTION_LOGS = "INSERT INTO execution_logs "

+ "(exec_id, name, attempt, enc_type, start_byte, end_byte, "

+ "log, upload_time) VALUES (?,?,?,?,?,?,?,?)";

之前说的一个job结束后,如何推动下一个节点运行呢?

flowRunner是在系统线程池里的,

while (!flowFinished) {

synchronized (mainSyncObj) {

if (flowPaused) {

try {

mainSyncObj.wait(CHECK_WAIT_MS);

} catch (InterruptedException e) {

}

 

continue;

} else {

if (retryFailedJobs) {

retryAllFailures();

}

else if (!progressGraph()) {

//看这里

try {

mainSyncObj.wait(CHECK_WAIT_MS);

} catch (InterruptedException e) {

}

}

}

}

}

这个放在下一节讲解!

转载于:https://my.oschina.net/qiangzigege/blog/656981

你可能感兴趣的文章
【原】iOS学习之图片拉伸处理(类似qq的气泡)
查看>>
postman抓包
查看>>
最大稳定极值区域(MSER)检测
查看>>
如何解决CorelDRAW中尖突问题
查看>>
Javascript实现Web颜色值转换
查看>>
拼接日期填写表单
查看>>
工控系统安全问题汇总(一)
查看>>
4、SpringBoot------邮件发送(2)
查看>>
java创建二叉树并递归遍历二叉树
查看>>
JSON必知必会
查看>>
安全站点导航
查看>>
Oracle Job
查看>>
收集一些有意思的ASCII程序注释(持续收集中,希望大家踊跃贡献)
查看>>
做网站的各种推荐网
查看>>
leetcode 10. 正则表达式匹配
查看>>
JM8.6中帧内帧间模式的选择
查看>>
测试覆盖率工具:EclEmma
查看>>
《CLR via C#》读书笔记 之 基元类型、引用类型和值类型
查看>>
BOS常用代码说明
查看>>
第111天:Ajax之jQuery实现方法
查看>>