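Here comes the key part! We now have the JobRunner object, so next it's time to actually create a Job. How is it created? This section reveals it!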
===========================================================================================
String jobType = jobProps.getString("type");
Class<? extends Object> executorClass = pluginSet.getPluginClass(jobType);
No need to explain, right? It looks up the class that corresponds to the job type.
How is the class looked up?
private void loadDefaultTypes(JobTypePluginSet plugins) throws JobTypeManagerException {
  logger.info("Loading plugin default job types");
  plugins.addPluginClass("command", ProcessJob.class);
  plugins.addPluginClass("javaprocess", JavaProcessJob.class);
  plugins.addPluginClass("noop", NoopJob.class);
  plugins.addPluginClass("python", PythonJob.class);
  plugins.addPluginClass("ruby", RubyJob.class);
  plugins.addPluginClass("script", ScriptJob.class);
}
Now it's clear: six job types are built in, namely the ones registered above.
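To make the lookup concrete, here is a minimal sketch of a type-name-to-class registry, assuming the plugin set behaves essentially like a map. JobTypeRegistry, ProcessJobDemo and NoopJobDemo are hypothetical names for illustration, not Azkaban's JobTypePluginSet.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for built-in job classes such as ProcessJob and NoopJob
class ProcessJobDemo {}
class NoopJobDemo {}

// Hypothetical registry: maps a job "type" string to the class that implements it
class JobTypeRegistry {
    private final Map<String, Class<?>> pluginClasses = new HashMap<>();

    void addPluginClass(String jobType, Class<?> jobClass) {
        pluginClasses.put(jobType, jobClass);
    }

    // Returns null for an unregistered type (a simplification of this sketch)
    Class<?> getPluginClass(String jobType) {
        return pluginClasses.get(jobType);
    }
}
```

After registering "command" to ProcessJobDemo, getPluginClass("command") returns that class. In this sketch an unregistered type simply yields null.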
===========================================================================================
jobProps = PropsUtils.resolveProps(jobProps);
This appears to substitute special variables, i.e. resolve references such as ${...} inside the properties.
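Below is a simplified sketch of ${name} substitution. It assumes that substitution is the core of what resolveProps does. PropsResolver is a hypothetical class that works on a plain Map. It is not Azkaban's PropsUtils, and it ignores anything else the real method may support.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical, simplified resolver: replaces ${name} with the value of property "name"
class PropsResolver {
    // Matches ${name}; group(1) is the referenced property name
    private static final Pattern VAR = Pattern.compile("\\$\\{([^}]+)\\}");

    // Returns a new map in which every ${name} reference has been replaced, recursively
    static Map<String, String> resolve(Map<String, String> props) {
        Map<String, String> resolved = new HashMap<>();
        for (String key : props.keySet()) {
            resolved.put(key, resolveValue(key, props, new HashSet<>()));
        }
        return resolved;
    }

    private static String resolveValue(String key, Map<String, String> props, Set<String> visiting) {
        // A key seen twice on the current path means a cycle such as a=${b}, b=${a}
        if (!visiting.add(key)) {
            throw new IllegalArgumentException("Circular reference involving " + key);
        }
        String value = props.get(key);
        if (value == null) {
            throw new IllegalArgumentException("Undefined property: " + key);
        }
        Matcher m = VAR.matcher(value);
        StringBuffer sb = new StringBuffer();
        while (m.find()) {
            String replacement = resolveValue(m.group(1), props, visiting);
            // quoteReplacement stops '$' and '\' in values from being read as group references
            m.appendReplacement(sb, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(sb);
        visiting.remove(key);
        return sb.toString();
    }
}
```

For example, with base=/data, dir=${base}/logs and file=${dir}/job.log, the resolved value of file is /data/logs/job.log.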
===========================================================================================
Finally:
job = (Job) Utils.callConstructor(executorClass, jobId, pluginLoadProps, jobProps, logger);
What does this do?
stop in azkaban.utils.Utils.callConstructor
It calls the constructor (via reflection) to create an instance of the matching job type!
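Here is a sketch of what a reflective constructor call can look like. ReflectionFactory and DemoJob are hypothetical. This version picks the first public constructor whose parameter types accept the arguments. That may differ from how Utils.callConstructor actually matches constructors.

```java
import java.lang.reflect.Constructor;
import java.util.Map;

// Hypothetical job class used only for illustration
class DemoJob {
    final String jobId;
    final Map<String, String> props;

    public DemoJob(String jobId, Map<String, String> props) {
        this.jobId = jobId;
        this.props = props;
    }
}

class ReflectionFactory {
    // Finds a public constructor whose parameter types accept the given arguments and invokes it.
    // Primitive parameter types are not handled in this sketch.
    static Object callConstructor(Class<?> cls, Object... args) {
        for (Constructor<?> ctor : cls.getConstructors()) {
            Class<?>[] params = ctor.getParameterTypes();
            if (params.length != args.length) {
                continue;
            }
            boolean matches = true;
            for (int i = 0; i < params.length; i++) {
                // null is accepted for any reference-typed parameter
                if (args[i] != null && !params[i].isInstance(args[i])) {
                    matches = false;
                    break;
                }
            }
            if (matches) {
                try {
                    return ctor.newInstance(args);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("Could not construct " + cls.getName(), e);
                }
            }
        }
        throw new IllegalArgumentException("No matching constructor in " + cls.getName());
    }
}
```

Matching by isInstance means a HashMap argument is accepted by a constructor declared with a Map parameter. An exact getClass() comparison would reject it.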
===========================================================================================
Last comes runJob. The code is as follows:
private void runJob() {
  // read up to here
  try {
    // start running the job
    job.run();
  } catch (Throwable e) {
    if (props.getBoolean("job.succeed.on.failure", false)) {
      changeStatus(Status.FAILED_SUCCEEDED);
      logError("Job run failed, but will treat it like success.");
      logError(e.getMessage() + " cause: " + e.getCause(), e);
    } else {
      changeStatus(Status.FAILED);
      logError("Job run failed!", e);
      logError(e.getMessage() + " cause: " + e.getCause());
    }
  }
  if (job != null) {
    node.setOutputProps(job.getJobGeneratedProperties());
  }
  // If the job is still running, set the status to Success.
  if (!Status.isStatusFinished(node.getStatus())) {
    changeStatus(Status.SUCCEEDED);
  }
}
How does it actually do this?
stop in azkaban.execapp.JobRunner.runJob
===========================================================================================
This job is a concrete task, one of the six built-in types. Let's first study how a command-type job runs!
stop in azkaban.jobExecutor.ProcessJob.run
===========================================================================================
protected List<String> getCommandList() {
  List<String> commands = new ArrayList<String>();
  commands.add(jobProps.getString(COMMAND));
  for (int i = 1; jobProps.containsKey(COMMAND + "." + i); i++) {
    commands.add(jobProps.getString(COMMAND + "." + i));
  }
  return commands;
}
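So multiple commands can be configured. The loop reads command first, then command.1, command.2 and so on, in order, until it hits the first missing index.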
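Here is the same loop run against java.util.Properties, to show which keys get picked up. CommandListDemo is hypothetical. Azkaban's own code reads from its Props class instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical demo: the same loop as getCommandList, reading from java.util.Properties
class CommandListDemo {
    static final String COMMAND = "command";

    // "command" first, then "command.1", "command.2", ... up to the first missing index
    static List<String> getCommandList(Properties jobProps) {
        List<String> commands = new ArrayList<>();
        // getProperty returns null if "command" is absent; this sketch does not check for that
        commands.add(jobProps.getProperty(COMMAND));
        for (int i = 1; jobProps.containsKey(COMMAND + "." + i); i++) {
            commands.add(jobProps.getProperty(COMMAND + "." + i));
        }
        return commands;
    }
}
```

Suppose command, command.1, command.2 and command.4 are set. The result then has three entries, and command.4 is never read because command.3 is missing.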
I won't say much about how the commands are executed. Finally, let's look at how the success notification reaches the listeners.
===========================================================================================
fireEvent(Event.create(this, Type.JOB_FINISHED), false);
We already know there are 3 listeners. Let's see what each one does!
[azkaban.execapp.event.JobCallbackManager@3ba30a43,
azkaban.execapp.FlowRunner$JobRunnerEventListener@da0e592,
azkaban.execapp.jmx.JmxJobMBeanManager@53976f5c]
First, listener 1: JobCallbackManager
Handler: skipped.
Listener 2:
FlowRunner$JobRunnerEventListener
The most important line in it is: finishedNodes.add(node);
It then fires another listener: FlowRunnerManager
Listener 3: JmxJobMBeanManager
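The fan-out itself is the classic observer pattern. Below is a minimal sketch with hypothetical types (EventSource, JobEventListener, EventType). Azkaban's real event classes carry more information than a job id.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified event types for illustration
enum EventType { JOB_STARTED, JOB_FINISHED }

interface JobEventListener {
    void handleEvent(EventType type, String jobId);
}

class EventSource {
    private final List<JobEventListener> listeners = new ArrayList<>();

    void addListener(JobEventListener listener) {
        listeners.add(listener);
    }

    // Hands the same event to every registered listener, in registration order
    void fireEvent(EventType type, String jobId) {
        for (JobEventListener listener : listeners) {
            listener.handleEvent(type, jobId);
        }
    }
}
```

One listener can record a callback while another appends the job to a finishedNodes list, which mirrors the role of listener 2 above.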
Finally, the execution log has to be uploaded as well:
final String INSERT_EXECUTION_LOGS = "INSERT INTO execution_logs "
+ "(exec_id, name, attempt, enc_type, start_byte, end_byte, "
+ "log, upload_time) VALUES (?,?,?,?,?,?,?,?)";
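The start_byte/end_byte columns suggest the log is stored as byte ranges, possibly across several rows. Here is a sketch of splitting a log into fixed-size chunks, one per row, assuming that scheme. LogChunk and LogChunker are hypothetical, and encoding (the enc_type column) is not modeled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical row model; fields mirror the start_byte/end_byte/log columns of the INSERT
class LogChunk {
    final int startByte; // inclusive
    final int endByte;   // exclusive in this sketch
    final byte[] data;

    LogChunk(int startByte, int endByte, byte[] data) {
        this.startByte = startByte;
        this.endByte = endByte;
        this.data = data;
    }
}

class LogChunker {
    // Splits the log into consecutive chunks of at most chunkSize bytes
    static List<LogChunk> split(byte[] log, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<LogChunk> chunks = new ArrayList<>();
        for (int start = 0; start < log.length; start += chunkSize) {
            int end = Math.min(start + chunkSize, log.length);
            chunks.add(new LogChunk(start, end, Arrays.copyOfRange(log, start, end)));
        }
        return chunks;
    }
}
```

Each LogChunk would then be bound to one execution of the INSERT statement.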
Earlier we asked: once a job finishes, how does the next node get started?
The FlowRunner runs in the system thread pool, and its main loop is:
while (!flowFinished) {
  synchronized (mainSyncObj) {
    if (flowPaused) {
      try {
        mainSyncObj.wait(CHECK_WAIT_MS);
      } catch (InterruptedException e) {
      }
      continue;
    } else {
      if (retryFailedJobs) {
        retryAllFailures();
      } else if (!progressGraph()) { // look here
        try {
          mainSyncObj.wait(CHECK_WAIT_MS);
        } catch (InterruptedException e) {
        }
      }
    }
  }
}
That will be covered in the next section!
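The waiting pattern of the main loop above can be sketched on its own, apart from whatever progressGraph really does. MiniFlowRunner is hypothetical and runs jobs strictly one after another. It assumes a finishing job calls notifyAll on mainSyncObj to wake the loop early. Even so, the timed wait (CHECK_WAIT_MS, value assumed) still guarantees a periodic re-check.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified runner: jobs run one at a time on worker threads, and the
// main loop waits on mainSyncObj until a finished job wakes it (or the timeout expires).
class MiniFlowRunner {
    private static final long CHECK_WAIT_MS = 1000; // assumed value
    private final Object mainSyncObj = new Object();
    // All fields below are only touched while holding mainSyncObj
    private final Deque<String> pending;
    private final List<String> finishedNodes = new ArrayList<>();
    private boolean jobRunning = false;

    MiniFlowRunner(List<String> jobs) {
        this.pending = new ArrayDeque<>(jobs);
    }

    // Starts the next job if none is running; returns true if progress was made.
    // Must be called while holding mainSyncObj.
    private boolean progressGraph() {
        if (jobRunning || pending.isEmpty()) {
            return false;
        }
        String job = pending.poll();
        jobRunning = true;
        new Thread(() -> {
            // The job's real work would happen here.
            synchronized (mainSyncObj) {
                finishedNodes.add(job);
                jobRunning = false;
                mainSyncObj.notifyAll(); // wake the main loop instead of waiting out the timeout
            }
        }).start();
        return true;
    }

    // Drives the flow until every job has finished; returns jobs in completion order
    List<String> run() {
        synchronized (mainSyncObj) {
            while (!pending.isEmpty() || jobRunning) {
                if (!progressGraph()) {
                    try {
                        // wait() releases the lock so worker threads can report completion
                        mainSyncObj.wait(CHECK_WAIT_MS);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
            return new ArrayList<>(finishedNodes);
        }
    }
}
```

Running it with jobs a, b and c returns them in that order. Each job only starts after the previous worker has reported completion and woken the loop.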