[TOC]
Project name: Add cross-workflow parameter passing
Project mentor: Bao Liang (鲍亮)
Applicant: Sun Haobo (孙浩博)
Application date: 2023/6/4
Email: sunhaobo@stu.xidian.edu.cn
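Design two tasks for the open-source project DolphinScheduler, and write detailed design documentation and user documentation for both.
Scenario 1: shellA -> subprocessB -> shellC
shellA queries all student records (a, b, c) and outputs them as users to the downstream task subprocessB.
subprocessB is a sub-workflow that counts the students as userCount and passes userCount downstream as the output of the workflow.
shellC prints userCount to the console.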
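A minimal in-memory model of scenario 1 helps pin down the intended data flow. It assumes each task returns its output parameters as a `Map<String, String>` that is merged into a pool read by the next task. The class `Scenario1`, its methods, and the comma-separated encoding of `users` are hypothetical and are not DolphinScheduler APIs.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of shellA -> subprocessB -> shellC passing parameters through a shared pool. */
class Scenario1 {

    /** shellA: "queries" the students and exposes them as the output parameter users. */
    static Map<String, String> shellA() {
        Map<String, String> out = new HashMap<>();
        out.put("users", "a,b,c"); // assumed encoding: comma-separated names
        return out;
    }

    /** subprocessB: a sub-workflow whose workflow-level output is userCount. */
    static Map<String, String> subprocessB(Map<String, String> in) {
        String users = in.getOrDefault("users", "");
        long count = users.isEmpty() ? 0 : Arrays.stream(users.split(",")).count();
        Map<String, String> out = new HashMap<>();
        out.put("userCount", String.valueOf(count));
        return out;
    }

    /** shellC: prints userCount to the console and returns the printed line. */
    static String shellC(Map<String, String> in) {
        String line = "userCount=" + in.get("userCount");
        System.out.println(line);
        return line;
    }

    /** Runs the chain, merging each task's outputs into the pool seen by the next task. */
    static String run() {
        Map<String, String> pool = new HashMap<>(shellA());
        pool.putAll(subprocessB(pool));
        return shellC(pool);
    }
}
```

The piece this project has to make possible in DolphinScheduler is the middle step: outputs produced inside the sub-workflow `subprocessB` becoming visible to `shellC` in the parent workflow.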
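Scenario 2: a new field must be added to the dependent task: whether to inherit the parameters of the task it depends on.
Workflow A: taska1 -> taska2, where taska2 outputs the parameter count=10.
Workflow B: taskb1 -> dependentb2 -> taskb3
dependentb2 depends on taska2, so it can inherit count and expose it as its own output parameter for taskb3 to use.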
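Scenario 2 can be modeled the same way. In this sketch the proposed field is a boolean `inheritParams` on the dependent task. The class `DependentTaskModel`, the `"workflow.task"` key format, and the in-memory store of finished-task outputs are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of a dependent task that may inherit its upstream task's output parameters. */
class DependentTaskModel {

    /** Output parameters of finished tasks, keyed by "workflow.task" (assumed key format). */
    static final Map<String, Map<String, String>> FINISHED_OUTPUTS = new HashMap<>();

    /**
     * Evaluates a dependent task.
     *
     * @param dependsOn     the task it depends on, e.g. "A.taska2"
     * @param inheritParams the proposed new field: whether to inherit the upstream outputs
     * @return the dependent task's own output parameters
     */
    static Map<String, String> evaluate(String dependsOn, boolean inheritParams) {
        Map<String, String> upstream = FINISHED_OUTPUTS.get(dependsOn);
        if (upstream == null) {
            throw new IllegalStateException(dependsOn + " has not finished yet");
        }
        // Without the flag, the dependent task only checks the dependency and exposes nothing.
        return inheritParams ? new HashMap<>(upstream) : new HashMap<>();
    }
}
```

In this model, with `inheritParams` set, `dependentb2` exposes `count=10` and `taskb3` can read it like any other upstream output; with the flag unset, the task only performs the dependency check.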
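I have Vue development experience. I worked on a lab project (the dashboard module of a coal-mine big-data platform) and applied front-end techniques throughout its development, mainly: adaptive layouts with flex and grid; real-time data display over WebSocket; reusable ECharts wrappers for common charts such as bar, pie, and line charts; and integration of third-party monitoring feeds, which required understanding Chrome's media playback policy.

- Extensive experience using Linux
- Internship at the Sixth Research Institute of China Electronics Corporation (中电六所), working on software project development
- Lab project experience, developing the back-end modules of a coal big-data platform
- A habit of technical blogging: I write a personal blog and publish it as open source on GitHub (blog address)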
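DolphinScheduler is built mainly around the interaction between master and worker nodes. The following traces how the start-process-instance API is implemented.
First, the controller calls execService.execProcessInstance: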
```java
Map<String, Object> result = execService.execProcessInstance(loginUser, projectCode, processDefinitionCode,
        scheduleTime, execType, failureStrategy,
        startNodeList, taskDependType, warningType, warningGroupId, runMode, processInstancePriority,
        workerGroup, tenantCode, environmentCode, timeout, startParamMap, expectedParallelismNumber, dryRun,
        testFlag,
        complementDependentMode, version, allLevelDependent);
return returnDataList(result);
```
Implementation details of execProcessInstance:

```java
/**
 * Mainly calls createCommand to store the command in the DB
 */
int create =
        this.createCommand(triggerCode, commandType, processDefinition.getCode(), taskDependType,
                failureStrategy,
                startNodeList,
                cronTime, warningType, loginUser.getId(), warningGroupId, runMode, processInstancePriority,
                workerGroup, tenantCode,
                environmentCode, startParams, expectedParallelismNumber, dryRun, testFlag,
                complementDependentMode, allLevelDependent);
```
```java
private int createCommand(/* parameters omitted */) {
    // most of the code omitted
    if (/* omitted */) {
        // omitted
    } else {
        command.setCommandParam(JSONUtils.toJsonString(cmdParam));
        // the command is actually persisted by commandService.createCommand
        int count = commandService.createCommand(command);
        if (count > 0) {
            triggerRelationService.saveTriggerToDb(ApiTriggerType.COMMAND, triggerCode, command.getId());
        }
        return count;
    }
}
```
```java
public int createCommand(Command command) {
    int result = 0;
    if (command == null) {
        return result;
    }
    // add command timezone
    Schedule schedule = scheduleMapper.queryByProcessDefinitionCode(command.getProcessDefinitionCode());
    if (schedule != null) {
        Map<String, String> commandParams =
                StringUtils.isNotBlank(command.getCommandParam()) ? JSONUtils.toMap(command.getCommandParam())
                        : new HashMap<>();
        commandParams.put(Constants.SCHEDULE_TIMEZONE, schedule.getTimezoneId());
        command.setCommandParam(JSONUtils.toJsonString(commandParams));
    }
    command.setId(null);
    // this step stores the command in the DB
    result = commandMapper.insert(command);
    return result;
}
```
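Next, the MasterSchedulerBootstrap thread continuously scans the DB and parses the commands into workflow instances. Each master only takes its own share of the commands, selected by a slot-based sharding algorithm, and the workflow's tasks are later dispatched to workers.
Its run method: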
```java
@Override
public void run() {
    // most of the code omitted
    // scan the DB for commands; note that the corresponding DB records are also deleted during this processing
    List<Command> commands = findCommands();
    commands.parallelStream()
            .forEach(command -> {
                try {
                    WorkflowExecuteRunnable workflowExecuteRunnable =
                            workflowExecuteRunnableFactory.createWorkflowExecuteRunnable(command);
                    ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance();
                    if (processInstanceExecCacheManager.contains(processInstance.getId())) {
                        log.error(
                                "The workflow instance is already been cached, this case shouldn't be happened");
                    }
                    // cache the runnable so that this instance's execution can be tracked
                    processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable);
                    // hand the workflow over as a START_WORKFLOW event
                    workflowEventQueue.addEvent(
                            new WorkflowEvent(WorkflowEventType.START_WORKFLOW, processInstance.getId()));
                } catch (Exception e) {
                    // exception handling omitted
                }
            });
}
```
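The sharding step can be sketched as follows. It assumes that the master holding slot `slot` out of `masterCount` masters only takes commands whose `id % masterCount == slot`. `CommandSharding` is a hypothetical helper that only illustrates why two masters never pick up the same command.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of slot-based command sharding across masters. */
class CommandSharding {

    /** Returns the command ids that the master owning {@code slot} should handle. */
    static List<Integer> commandsForSlot(List<Integer> commandIds, int masterCount, int slot) {
        if (masterCount <= 0 || slot < 0 || slot >= masterCount) {
            throw new IllegalArgumentException("invalid slot " + slot + " for " + masterCount + " masters");
        }
        List<Integer> mine = new ArrayList<>();
        for (int id : commandIds) {
            // Each id maps to exactly one slot, so the masters' shares never overlap.
            if (id % masterCount == slot) {
                mine.add(id);
            }
        }
        return mine;
    }
}
```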
The WorkflowEventLooper class handles the events added by MasterSchedulerBootstrap. Its run method:

```java
public void run() {
    // most of the code omitted
    WorkflowEvent workflowEvent;
    while (RUNNING_FLAG.get()) {
        try {
            // take the next event from workflowEventQueue
            workflowEvent = workflowEventQueue.poolEvent();
            WorkflowEventHandler workflowEventHandler =
                    workflowEventHandlerMap.get(workflowEvent.getWorkflowEventType());
            // dispatch the event to the handler registered for its type (strategy pattern)
            workflowEventHandler.handleWorkflowEvent(workflowEvent);
        } catch (Exception e) {
            // exception handling omitted
        }
    }
}
```
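```java
@Override
public void handleWorkflowEvent(final WorkflowEvent workflowEvent) throws WorkflowEventHandleError {
    // most of the code omitted
    // core step: run workflowExecuteRunnable.call() asynchronously on the workflow thread pool
    CompletableFuture.supplyAsync(workflowExecuteRunnable::call, workflowExecuteThreadPool);
    // follow-up handling of the returned future omitted
}
```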
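The queue-plus-handler-map pattern above reduces to the runnable sketch below. A producer enqueues a `START_WORKFLOW` event, the looper takes it and looks up the handler for its type in an `EnumMap` (the strategy pattern), and the handler runs the work with `CompletableFuture.supplyAsync` on a thread pool. `EventLoopSketch` and its nested types are hypothetical simplifications, not the real DolphinScheduler classes.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical miniature of the workflow event queue, the looper, and a START_WORKFLOW handler. */
class EventLoopSketch {

    enum EventType { START_WORKFLOW }

    static final class Event {
        final EventType type;
        final int processInstanceId;

        Event(EventType type, int processInstanceId) {
            this.type = type;
            this.processInstanceId = processInstanceId;
        }
    }

    /** One handler per event type: the looper picks the strategy from a map. */
    interface Handler {
        CompletableFuture<String> handle(Event event, ExecutorService pool);
    }

    /** Enqueues a START_WORKFLOW event, lets the "looper" take and dispatch it, and returns the result. */
    static String startWorkflow(int processInstanceId) {
        BlockingQueue<Event> queue = new LinkedBlockingQueue<>();
        Map<EventType, Handler> handlers = new EnumMap<>(EventType.class);
        // Stand-in for workflowExecuteRunnable::call running on the workflow thread pool.
        handlers.put(EventType.START_WORKFLOW,
                (evt, pool) -> CompletableFuture.supplyAsync(() -> "SUCCESS:" + evt.processInstanceId, pool));

        ExecutorService workflowPool = Executors.newFixedThreadPool(2);
        try {
            // Producer side (MasterSchedulerBootstrap): publish the event.
            queue.add(new Event(EventType.START_WORKFLOW, processInstanceId));
            // Looper side: block until an event arrives, then dispatch it by type.
            Event event = queue.take();
            Handler handler = handlers.get(event.type);
            return handler.handle(event, workflowPool).get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            workflowPool.shutdown();
        }
    }
}
```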
```java
@Override
public WorkflowSubmitStatus call() {
    // most of the code omitted
    try {
        if (workflowRunnableStatus == WorkflowRunnableStatus.CREATED) {
            // build the workflow DAG
            buildFlowDag();
            workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_DAG;
            log.info("workflowStatue changed to :{}", workflowRunnableStatus);
        }
        if (workflowRunnableStatus == WorkflowRunnableStatus.INITIALIZE_DAG) {
            // initialize the task queue
            initTaskQueue();
            workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_QUEUE;
            log.info("workflowStatue changed to :{}", workflowRunnableStatus);
        }
        if (workflowRunnableStatus == WorkflowRunnableStatus.INITIALIZE_QUEUE) {
            // submit the tasks
            submitPostNode(null);
            workflowRunnableStatus = WorkflowRunnableStatus.STARTED;
            log.info("workflowStatue changed to :{}", workflowRunnableStatus);
        }
        return WorkflowSubmitStatus.SUCCESS;
    } catch (Exception e) {
        // exception handling omitted
    }
}
```
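The chain of `if` checks on `workflowRunnableStatus` acts as a small state machine. Each stage runs only after the previous stage has completed, so calling `call()` again skips the stages that are already done. The hypothetical, stripped-down model below (`WorkflowCallSketch`) uses log strings as stand-ins for the real methods.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the CREATED -> INITIALIZE_DAG -> INITIALIZE_QUEUE -> STARTED steps in call(). */
class WorkflowCallSketch {

    enum Status { CREATED, INITIALIZE_DAG, INITIALIZE_QUEUE, STARTED }

    Status status = Status.CREATED;
    final List<String> log = new ArrayList<>();

    /** Each stage runs only if the previous one has completed, so a repeated call resumes or does nothing. */
    void call() {
        if (status == Status.CREATED) {
            log.add("buildFlowDag");
            status = Status.INITIALIZE_DAG;
        }
        if (status == Status.INITIALIZE_DAG) {
            log.add("initTaskQueue");
            status = Status.INITIALIZE_QUEUE;
        }
        if (status == Status.INITIALIZE_QUEUE) {
            log.add("submitPostNode");
            status = Status.STARTED;
        }
    }
}
```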
At this point the trace has reached task submission. submitPostNode delegates the actual submission to submitStandByTask:

```java
private void submitPostNode(String parentNodeCode) throws StateEventHandleException {
    // most of the code omitted
    submitStandByTask();
    updateProcessInstanceState();
}
```
```java
public void submitStandByTask() throws StateEventHandleException {
    // most of the code omitted
    TaskInstance task;
    // keep peeking at the ready-to-submit queue
    while ((task = readyToSubmitTaskQueue.peek()) != null) {
        DependResult dependResult = getDependResultForTask(task);
        if (DependResult.SUCCESS == dependResult) {
            log.info("The dependResult of task {} is success, so ready to submit to execute", task.getName());
            // actually execute the task
            if (!executeTask(task)) {
                // failure handling omitted
            }
        }
        // handling of the other DependResult values omitted
    }
}

private boolean executeTask(TaskInstance taskInstance) {
    // most of the code omitted
    // core step: dispatch the task
    taskExecuteRunnable.dispatch();
    // return value omitted
}
```
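The peek-based loop in `submitStandByTask` can be modeled as below: a task leaves the ready queue and is submitted only when all of its upstream tasks have succeeded. This is a simplified sketch under stated assumptions: `StandByQueueSketch` is hypothetical, only the SUCCESS and WAITING outcomes are modeled, and a waiting task simply ends the loop here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of submitStandByTask: submit ready tasks whose upstream tasks have all succeeded. */
class StandByQueueSketch {

    enum DependResult { SUCCESS, WAITING }

    final Deque<String> readyToSubmit = new ArrayDeque<>();
    final Map<String, List<String>> upstreams = new HashMap<>();
    final Set<String> succeeded = new HashSet<>();
    final List<String> submitted = new ArrayList<>();

    DependResult dependResult(String task) {
        for (String upstream : upstreams.getOrDefault(task, Collections.emptyList())) {
            if (!succeeded.contains(upstream)) {
                return DependResult.WAITING;
            }
        }
        return DependResult.SUCCESS;
    }

    void submitStandByTask() {
        String task;
        // Peek first; remove the task only once its dependencies are satisfied.
        while ((task = readyToSubmit.peek()) != null) {
            if (dependResult(task) != DependResult.SUCCESS) {
                break; // sketch assumption: stop at the first task that is still waiting
            }
            readyToSubmit.poll();
            submitted.add(task);
        }
    }
}
```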
After the task is dispatched, the GlobalTaskDispatchWaitingQueueLooper class picks it up:

```java
@Override
public void run() {
    // most of the code omitted
    DefaultTaskExecuteRunnable defaultTaskExecuteRunnable;
    while (RUNNING_FLAG.get()) {
        try {
            // take the next task waiting to be dispatched
            defaultTaskExecuteRunnable = globalTaskDispatchWaitingQueue.takeNeedToDispatchTaskExecuteRunnable();
        } catch (Exception e) {
            // exception handling omitted
        }
        try {
            final TaskDispatcher taskDispatcher = taskDispatchFactory
                    .getTaskDispatcher(defaultTaskExecuteRunnable.getTaskInstance().getTaskType());
            // dispatch the task
            taskDispatcher.dispatchTask(defaultTaskExecuteRunnable);
        } catch (Exception e) {
            // exception handling omitted
        }
    }
}
```
```java
@Override
public void dispatchTask(TaskExecuteRunnable taskExecuteRunnable) throws TaskDispatchException {
    // most of the code omitted
    // perform the actual dispatch
    doDispatch(taskExecuteRunnable);
    // register a listener event for the dispatched task
    addDispatchEvent(taskExecuteRunnable);
}

protected void doDispatch(TaskExecuteRunnable taskExecuteRunnable) throws TaskDispatchException {
    // most of the code omitted
    // core step: send the task over RPC to the host that will execute it
    Message message = masterRpcClient.sendSyncCommand(Host.of(taskExecutionContext.getHost()),
            taskDispatchRequest.convert2Command());
}
```
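`taskDispatchFactory.getTaskDispatcher(...)` chooses a dispatcher by task type. The sketch below assumes, based on my reading of DolphinScheduler 3.x, that "logic" task types such as SUB_PROCESS and DEPENDENT run inside the master while all other types go to a worker over RPC. The type list and the `DispatchSketch` names are illustrative, not the real implementation.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical model of choosing a master-side or worker-side dispatcher by task type. */
class DispatchSketch {

    interface TaskDispatcher {
        String dispatchTask(String taskName);
    }

    // Assumed set of master-side "logic" task types; illustrative, not exhaustive.
    static final Set<String> LOGIC_TASK_TYPES =
            new HashSet<>(Arrays.asList("SUB_PROCESS", "DEPENDENT", "CONDITIONS", "SWITCH"));

    static final TaskDispatcher MASTER = name -> "master:" + name; // executed inside the master
    static final TaskDispatcher WORKER = name -> "worker:" + name; // sent to a worker over RPC

    static TaskDispatcher getTaskDispatcher(String taskType) {
        return LOGIC_TASK_TYPES.contains(taskType) ? MASTER : WORKER;
    }
}
```

If this assumption holds, both tasks in this proposal (the sub-workflow output and the dependent-task inheritance) would be implemented on the master side, while shell tasks such as shellA and shellC follow the worker path traced next.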
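On the worker side, the NettyServerHandler class receives the RPC messages sent by the master:

```java
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    // receive the message
    processReceived(ctx.channel(), (Message) msg);
}

private void processReceived(final Channel channel, final Message msg) {
    // most of the code omitted
    final Pair<NettyRequestProcessor, ExecutorService> pair = processors.get(messageType);
    if (pair != null) {
        Runnable r = () -> {
            try {
                // hand the message to the processor registered for its type (strategy pattern)
                pair.getLeft().process(channel, msg);
            } catch (Exception ex) {
                // exception handling omitted
            }
        };
        // submission of r to the executor omitted
    }
}
```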
The WorkerTaskDispatchProcessor class then processes the dispatch message:

```java
@Counted(value = "ds.task.execution.count", description = "task execute total count")
@Timed(value = "ds.task.execution.duration", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
@Override
public void process(Channel channel, Message message) {
    // most of the code omitted
    // workerTaskExecuteRunnable is built from taskExecutionContext (construction omitted)
    WorkerDelayTaskExecuteRunnable workerTaskExecuteRunnable;
    // push the task into the worker's wait queue; reject the dispatch if the queue is full
    if (!workerManager.offer(workerTaskExecuteRunnable)) {
        log.error("submit task: {} to wait queue error, current queue size: {} is full",
                taskExecutionContext.getTaskName(), workerManager.getWaitSubmitQueueSize());
        sendDispatchRejectResult(channel, message, taskExecutionContext);
    }
}
```
The WorkerManagerThread class does the actual processing:

```java
@Override
public void run() {
    // most of the code omitted
    Thread.currentThread().setName("Worker-Execute-Manager-Thread");
    while (!ServerLifeCycleManager.isStopped()) {
        try {
            if (!ServerLifeCycleManager.isRunning()) {
                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
            }
            if (this.getThreadPoolQueueSize() <= workerExecThreads) {
                // take a task from the wait queue
                final WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable = waitSubmitQueue.take();
                // submit it to the worker's execution thread pool
                workerExecService.submit(workerDelayTaskExecuteRunnable);
            }
        } catch (Exception e) {
            // exception handling omitted
        }
    }
}
```
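The worker side pairs a bounded wait queue with a thread pool. `offer` fails instead of blocking when the queue is full, which is when `sendDispatchRejectResult` is called, and the manager thread moves queued tasks into `workerExecService`. The sketch below models both halves with `ArrayBlockingQueue` and a fixed thread pool. `WorkerQueueSketch` is hypothetical, and, unlike the real loop, it drains with non-blocking `poll()` and omits the `getThreadPoolQueueSize() <= workerExecThreads` throttle so that it terminates.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical model of the worker's bounded wait queue and its manager thread. */
class WorkerQueueSketch {

    final BlockingQueue<Runnable> waitSubmitQueue;
    final ExecutorService workerExecService;
    final List<String> finished = Collections.synchronizedList(new ArrayList<>());

    WorkerQueueSketch(int queueCapacity, int workerExecThreads) {
        this.waitSubmitQueue = new ArrayBlockingQueue<>(queueCapacity);
        this.workerExecService = Executors.newFixedThreadPool(workerExecThreads);
    }

    /** Dispatch-processor side: false means "queue full", i.e. the dispatch should be rejected. */
    boolean offer(String taskName) {
        return waitSubmitQueue.offer(() -> finished.add(taskName));
    }

    /** Manager-thread side: move queued tasks into the pool, wait for them, then shut the pool down. */
    int drainAndRun() {
        List<Future<?>> futures = new ArrayList<>();
        Runnable task;
        while ((task = waitSubmitQueue.poll()) != null) {
            futures.add(workerExecService.submit(task));
        }
        try {
            for (Future<?> future : futures) {
                future.get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            workerExecService.shutdown();
        }
        return futures.size();
    }
}
```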
The run method of the wrapping WorkerTaskExecuteRunnable class:

```java
@Override
public void run() {
    // most of the code omitted
    // handle the task
    executeTask(taskCallBack);
}

@Override
public void executeTask(TaskCallBack taskCallBack) throws TaskException {
    if (task == null) {
        throw new IllegalArgumentException("The task plugin instance is not initialized");
    }
    // let the task plugin handle the task (strategy pattern)
    task.handle(taskCallBack);
}
```
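The execution flow above shows that the workflow (process) instance is the unit of scheduling. Within the worker's wait queue, however, tasks are not ordered by task ID. As this compareTo shows, they are ordered by their remaining delay:

```java
@Override
public int compareTo(Delayed o) {
    if (o == null) {
        return 1;
    }
    return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
}
```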
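This `compareTo` follows the ordering contract of `java.util.concurrent.Delayed`, so a `DelayQueue` hands tasks out by remaining delay. The self-contained sketch below uses a hypothetical `DelayedTask` class with the same `compareTo` body as above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/** Hypothetical delayed task, ordered by remaining delay like the compareTo shown above. */
class DelayedTask implements Delayed {

    final String name;
    final long readyAtNanos;

    DelayedTask(String name, long delayMillis) {
        this.name = name;
        this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        if (o == null) {
            return 1;
        }
        return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
    }

    /** Takes every task from a DelayQueue; the order depends only on the delays, not on insertion order. */
    static List<String> takeAll(List<DelayedTask> tasks) {
        DelayQueue<DelayedTask> queue = new DelayQueue<>(tasks);
        List<String> order = new ArrayList<>();
        try {
            while (!queue.isEmpty()) {
                order.add(queue.take().name); // blocks until the head's delay has expired
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return order;
    }
}
```

Tasks inserted as late (60 ms), early (20 ms), now (0 ms) come out as now, early, late, regardless of their IDs or insertion order.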
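We can switch the tasks in the task queue to a submit/future model: add a params parameter that defines each task's inputs and return value, block on future.get() until a task finishes, pass its return value into the next task, and cache the execution results in a map. For example:

```java
@Override
public Object executeTask(TaskCallBack taskCallBack) throws TaskException {
    if (task == null) {
        throw new IllegalArgumentException("The task plugin instance is not initialized");
    }
    // let the task plugin handle the task (strategy pattern);
    // assumes task.handle is changed to return the task's result instead of void
    return task.handle(taskCallBack);
}
```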
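WorkerManagerThread keeps a map that caches the results:

```java
// cache of task results, keyed by task instance id
private final ConcurrentHashMap<Integer, Object> taskResultCache = new ConcurrentHashMap<>();

@Override
public void run() {
    // most of the code omitted
    if (this.getThreadPoolQueueSize() <= workerExecThreads) {
        // take a task from the wait queue
        final WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable = waitSubmitQueue.take();
        // check whether the task needs input parameters (needsInputParams is a hypothetical helper)
        if (needsInputParams(workerDelayTaskExecuteRunnable)) {
            // adapt workerDelayTaskExecuteRunnable: inject the cached upstream results as its inputs
        }
        // Submit the task. Assumption: the runnable is adapted into a Callable<Object> that returns
        // executeTask's result (toCallable is hypothetical), because a Future from submit(Runnable)
        // always yields null, and ConcurrentHashMap rejects null values.
        Future<Object> future = workerExecService.submit(toCallable(workerDelayTaskExecuteRunnable));
        // block until the task finishes, then cache its result (taskInstanceId: id of this task)
        taskResultCache.put(taskInstanceId, future.get());
    }
}
```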
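Here is a standalone, runnable version of the submit/future idea, outside DolphinScheduler. Each task is submitted to an `ExecutorService`, the caller blocks on `Future.get()`, and the result is cached in a `ConcurrentHashMap` keyed by task id so that the next task can read it as input. `FutureParamSketch`, `runTask`, and the string-map parameter format are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

/** Hypothetical sketch of the proposed submit/future scheme: each task's outputs are cached by task id. */
class FutureParamSketch {

    /** Outputs of finished tasks, keyed by task id (the "map cache" of the proposal). */
    final Map<Long, Map<String, String>> outputsByTaskId = new ConcurrentHashMap<>();
    private final ExecutorService pool = Executors.newFixedThreadPool(2);

    /**
     * Runs a task whose inputs are the cached outputs of {@code upstreamId} (empty when it is null),
     * blocks on future.get(), and caches the result under {@code taskId}.
     */
    Map<String, String> runTask(long taskId, Long upstreamId,
                                Function<Map<String, String>, Map<String, String>> body) {
        Map<String, String> inputs = new HashMap<>();
        if (upstreamId != null) {
            inputs.putAll(outputsByTaskId.getOrDefault(upstreamId, new HashMap<>()));
        }
        Callable<Map<String, String>> callable = () -> body.apply(inputs);
        Future<Map<String, String>> future = pool.submit(callable);
        try {
            Map<String, String> outputs = future.get(); // block until the task finishes
            outputsByTaskId.put(taskId, outputs);
            return outputs;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    void shutdown() {
        pool.shutdown();
    }
}
```

Blocking on `future.get()` keeps the sketch simple. Inside the real `WorkerManagerThread` loop, the same blocking call would stop the thread from taking further tasks off `waitSubmitQueue` until the current task finished, so a non-blocking completion callback is likely a better fit there.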
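I have solid back-end development experience and can keep the project on a good pace. Several classmates are also taking part in the Open Source Promotion Plan (开源之夏), so we can help each other improve. This is my first time contributing to an open-source project, and I am interested in investing time over the long term to contribute to open source.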