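[TOC]

# 1. Project Background

Project name: Add cross-workflow parameter passing

Lead mentor: Bao Liang (鲍亮)

Applicant: Sun Haobo (孙浩博)

Application date: 2023/6/4

Email: sunhaobo@stu.xidian.edu.cn

## 1.1 Project Overview

Design two tasks (features) for the open-source project DolphinScheduler, and write detailed design and usage documentation for both.

1. A workflow can select some of its own parameters as workflow output parameters and expose them to downstream tasks.

   Scenario 1: shellA -> subprocessB -> shellC

   - shellA queries all student records (a, b, c) and outputs `users` to the downstream task subprocessB.
   - subprocessB is a sub-workflow that computes the number of students, `userCount`, and passes `userCount` downstream as the workflow's output.
   - shellC prints `userCount` to the console.

2. A dependent task can also choose to inherit the output parameters of the task it depends on. Adding this dependency-parameter inheritance requires a new field on the dependent task: whether to inherit the parameters of the depended-on task.

   Scenario 2:

   - Workflow A: taska1 -> taska2, where taska2 outputs the parameter count=10.
   - Workflow B: taskb1 -> dependentb2 -> taskb3.
   - dependentb2 depends on taska2, so it can inherit `count` as its own output parameter for taskb3 to use.

## 1.2 Open-Source Repository

- https://github.com/apache/dolphinscheduler

# 2. Feasibility

## 2.1 Vue

I have Vue development experience from a lab project (the large-screen dashboard module of a coal-mine big data platform), where I applied front-end skills to deliver the module. The work included responsive layouts with flex and grid; real-time data display over WebSocket; wrapping common chart components (bar, pie, and line charts) on top of ECharts; and integrating third-party monitoring, which required understanding Chrome's playback policy.

## 2.2 Java Development Experience

- Extensive experience using Linux.
- Internship at 中电六所, working on software project development.
- Lab project experience, developing back-end modules for the coal big data platform.
- A habit of technical blogging: I maintain a personal blog that is open-sourced on GitHub, [blog link](https://www.seamew.top).

# 3. Implementation Details (focusing on the start-process-instance API)

DolphinScheduler is built mainly around the interaction between master and worker nodes. The following traces how a call to the start-process-instance API is executed.

![image-20230604104941343](assets/image-20230604104941343.png)

## 3.1 Master Execution Flow

The call first goes to the `execProcessInstance` method of `execService`:

```java
Map<String, Object> result = execService.execProcessInstance(loginUser, projectCode, processDefinitionCode,
        scheduleTime, execType, failureStrategy, startNodeList, taskDependType,
        warningType, warningGroupId, runMode, processInstancePriority, workerGroup,
        tenantCode, environmentCode, timeout, startParamMap, expectedParallelismNumber,
        dryRun, testFlag, complementDependentMode, version, allLevelDependent);
return returnDataList(result);
```

Inside `execProcessInstance`:

```java
/**
 * Mainly calls createCommand, which stores the command in the DB
 */
int create = this.createCommand(triggerCode, commandType, processDefinition.getCode(), taskDependType,
        failureStrategy, startNodeList, cronTime, warningType, loginUser.getId(), warningGroupId,
        runMode, processInstancePriority, workerGroup, tenantCode,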
        environmentCode, startParams, expectedParallelismNumber, dryRun, testFlag,
        complementDependentMode, allLevelDependent);

private int createCommand() {
    // most code omitted
    } else {
        command.setCommandParam(JSONUtils.toJsonString(cmdParam));
        // This is where commandService.createCommand is actually invoked
        int count = commandService.createCommand(command);
        if (count > 0) {
            triggerRelationService.saveTriggerToDb(ApiTriggerType.COMMAND, triggerCode, command.getId());
        }
        return count;
    }
}

public int createCommand(Command command) {
    int result = 0;
    if (command == null) {
        return result;
    }
    // add command timezone
    Schedule schedule = scheduleMapper.queryByProcessDefinitionCode(command.getProcessDefinitionCode());
    if (schedule != null) {
        Map<String, String> commandParams =
                StringUtils.isNotBlank(command.getCommandParam())
                        ? JSONUtils.toMap(command.getCommandParam())
                        : new HashMap<>();
        commandParams.put(Constants.SCHEDULE_TIMEZONE, schedule.getTimezoneId());
        command.setCommandParam(JSONUtils.toJsonString(commandParams));
    }
    command.setId(null);
    // This step stores the command in the DB
    result = commandMapper.insert(command);
    return result;
}
```

## 3.2 Master Task Dispatch Details

The `MasterSchedulerBootstrap` thread scans the DB sequentially for commands, parses them, and then uses a sharding algorithm to distribute them to workers.

The `run` method of `MasterSchedulerBootstrap`:

```java
@Override
public void run() {
    // most code omitted
    // Scan the database directly for commands; note that the related DB records are also deleted here
    List<Command> commands = findCommands();
    commands.parallelStream()
            .forEach(command -> {
                try {
                    WorkflowExecuteRunnable workflowExecuteRunnable =
                            workflowExecuteRunnableFactory.createWorkflowExecuteRunnable(command);
                    ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance();
                    if (processInstanceExecCacheManager.contains(processInstance.getId())) {
                        log.error(
                                "The workflow instance is already been cached, this case shouldn't be happened");
                    }
                    // Start a listener that tracks the execution of this instance
                    processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable);
                    // Propagate the task through an event
                    workflowEventQueue.addEvent(
                            new WorkflowEvent(WorkflowEventType.START_WORKFLOW, processInstance.getId()));
                }
```

The `WorkflowEventLooper` class handles the events added by `MasterSchedulerBootstrap`.
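The hand-off between the two classes can be pictured as a queue of typed events that a loop drains, looking up one handler per event type. The following is a minimal, self-contained sketch of that pattern only. `EventLoopSketch`, `Event`, `EventHandler`, and `drain` are hypothetical names chosen for illustration; they are not DolphinScheduler's classes or API, and a real looper would block on the queue in its own thread.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified stand-ins; not the DolphinScheduler classes.
final class EventLoopSketch {

    enum EventType { START_WORKFLOW, STOP_WORKFLOW }

    static final class Event {
        final EventType type;
        final int workflowInstanceId;

        Event(EventType type, int workflowInstanceId) {
            this.type = type;
            this.workflowInstanceId = workflowInstanceId;
        }
    }

    // One strategy per event type.
    interface EventHandler {
        void handle(Event event);
    }

    private final BlockingQueue<Event> queue = new LinkedBlockingQueue<>();
    private final Map<EventType, EventHandler> handlers = new EnumMap<>(EventType.class);
    // Records what each handler did, so the behaviour can be inspected.
    final List<String> log = new ArrayList<>();

    EventLoopSketch() {
        handlers.put(EventType.START_WORKFLOW, e -> log.add("start:" + e.workflowInstanceId));
        handlers.put(EventType.STOP_WORKFLOW, e -> log.add("stop:" + e.workflowInstanceId));
    }

    // Producer side: the scanner enqueues an event instead of running the workflow itself.
    void addEvent(Event event) {
        queue.add(event);
    }

    // Consumer side: drain the queued events and dispatch each one by its type.
    // Returns the number of events that had a registered handler.
    int drain() {
        int handled = 0;
        Event event;
        while ((event = queue.poll()) != null) {
            EventHandler handler = handlers.get(event.type);
            if (handler != null) {
                handler.handle(event);
                handled++;
            }
        }
        return handled;
    }

    public static void main(String[] args) {
        EventLoopSketch looper = new EventLoopSketch();
        looper.addEvent(new Event(EventType.START_WORKFLOW, 1));
        looper.addEvent(new Event(EventType.STOP_WORKFLOW, 1));
        System.out.println(looper.drain() + " events handled: " + looper.log);
    }
}
```

Keeping the producer and the handlers decoupled through the queue means a new event type only needs a new map entry, which is the property the strategy-pattern comments in the excerpts refer to.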
The `run` method of `WorkflowEventLooper`:

```java
public void run() {
    // most code omitted
    WorkflowEvent workflowEvent;
    while (RUNNING_FLAG.get()) {
        try {
            // Take an event from workflowEventQueue
            workflowEvent = workflowEventQueue.poolEvent();
            try ( // resource declaration omitted
            WorkflowEventHandler workflowEventHandler =
                    workflowEventHandlerMap.get(workflowEvent.getWorkflowEventType());
            // Handle the event through the strategy pattern
            workflowEventHandler.handleWorkflowEvent(workflowEvent);
}

@Override
public void handleWorkflowEvent(final WorkflowEvent workflowEvent) throws WorkflowEventHandleError {
    // most code omitted
    // Core step: invoke call() to process the workflow
    CompletableFuture.supplyAsync(workflowExecuteRunnable::call, workflowExecuteThreadPool)
}

@Override
public WorkflowSubmitStatus call() {
    // most code omitted
    try {
        if (workflowRunnableStatus == WorkflowRunnableStatus.CREATED) {
            // Build the workflow DAG
            buildFlowDag();
            workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_DAG;
            log.info("workflowStatue changed to :{}", workflowRunnableStatus);
        }
        if (workflowRunnableStatus == WorkflowRunnableStatus.INITIALIZE_DAG) {
            // Initialize the task queue
            initTaskQueue();
            workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_QUEUE;
            log.info("workflowStatue changed to :{}", workflowRunnableStatus);
        }
        if (workflowRunnableStatus == WorkflowRunnableStatus.INITIALIZE_QUEUE) {
            // Submit the tasks
            submitPostNode(null);
            workflowRunnableStatus = WorkflowRunnableStatus.STARTED;
            log.info("workflowStatue changed to :{}", workflowRunnableStatus);
        }
        return WorkflowSubmitStatus.SUCCESS;
    }
}
```

At this point we have reached the task-submission stage. After `submitPostNode` is called, the actual submission is performed by `submitStandByTask`:

```java
private void submitPostNode(String parentNodeCode) throws StateEventHandleException {
    // most code omitted
    submitStandByTask();
    updateProcessInstanceState();
}

public void submitStandByTask() throws StateEventHandleException {
    // most code omitted
    TaskInstance task;
    // Loop, peeking at the head of the ready-to-submit task queue
    while ((task = readyToSubmitTaskQueue.peek()) != null) {
        DependResult dependResult = getDependResultForTask(task);
        if (DependResult.SUCCESS == dependResult) {
            log.info("The dependResult of task {} is success, so ready to submit to execute",
                    task.getName());
            // Actually process the task
            if (!executeTask(task)) {
            }
        }

private boolean executeTask(TaskInstance taskInstance) {
    // most code omitted
    // Core step: dispatch the task
    taskExecuteRunnable.dispatch();
}
```

After a task is dispatched, the `GlobalTaskDispatchWaitingQueueLooper` class processes it:

```java
@Override
public void run() {
    // most code omitted
    DefaultTaskExecuteRunnable defaultTaskExecuteRunnable;
    while (RUNNING_FLAG.get()) {
        try {
            // Take the next task to dispatch
            defaultTaskExecuteRunnable =
                    globalTaskDispatchWaitingQueue.takeNeedToDispatchTaskExecuteRunnable();
        }
        try {
            final TaskDispatcher taskDispatcher = taskDispatchFactory
                    .getTaskDispatcher(defaultTaskExecuteRunnable.getTaskInstance().getTaskType());
            // Dispatch the task
            taskDispatcher.dispatchTask(defaultTaskExecuteRunnable);
        }

@Override
public void dispatchTask(TaskExecuteRunnable taskExecuteRunnable) throws TaskDispatchException {
    // most code omitted
    // Perform the actual dispatch
    doDispatch(taskExecuteRunnable);
    // Add a listener event for the dispatched task
    addDispatchEvent(taskExecuteRunnable);
}

protected void doDispatch(TaskExecuteRunnable taskExecuteRunnable) throws TaskDispatchException {
    // most code omitted
    // Core step: dispatch the task over RPC to the task-processing thread
    Message message = masterRpcClient.sendSyncCommand(Host.of(taskExecutionContext.getHost()),
            taskDispatchRequest.convert2Command());
}
```

## 3.3 Worker Threads Process the Task

The `NettyServerHandler` class receives the RPC messages sent by the master:

```java
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    // Receive the message
    processReceived(ctx.channel(), (Message) msg);
}

private void processReceived(final Channel channel, final Message msg) {
    // most code omitted
    final Pair pair = processors.get(messageType);
    if (pair != null) {
        Runnable r = () -> {
            try {
                // Route the message to its processor through the strategy pattern
                pair.getLeft().process(channel, msg);
            }
```

The `WorkerTaskDispatchProcessor` class dispatches the message:

```java
@Counted(value = "ds.task.execution.count", description = "task execute total count")
@Timed(value = "ds.task.execution.duration", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
@Override
public void process(Channel channel, Message message) {
    // most code omitted
    WorkerDelayTaskExecuteRunnable workerTaskExecuteRunnable // (initialization omitted)
    // Offer the task to the worker's wait queue
    if
            (!workerManager.offer(workerTaskExecuteRunnable)) {
        log.error("submit task: {} to wait queue error, current queue size: {} is full",
                taskExecutionContext.getTaskName(), workerManager.getWaitSubmitQueueSize());
        sendDispatchRejectResult(channel, message, taskExecutionContext);
    }
}
}
```

The `WorkerManagerThread` class does the actual processing:

```java
@Override
public void run() {
    // most code omitted
    Thread.currentThread().setName("Worker-Execute-Manager-Thread");
    while (!ServerLifeCycleManager.isStopped()) {
        try {
            if (!ServerLifeCycleManager.isRunning()) {
                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
            }
            if (this.getThreadPoolQueueSize() <= workerExecThreads) {
                // Take a task from the wait queue
                final WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable = waitSubmitQueue.take();
                // Submit the task to the execution pool
                workerExecService.submit(workerDelayTaskExecuteRunnable);
            }
        }
    }
}
```

The `run` method of the wrapping `WorkerTaskExecuteRunnable` class:

```java
@Override
public void run() {
    // most code omitted
    // Process the task
    executeTask(taskCallBack);
}

@Override
public void executeTask(TaskCallBack taskCallBack) throws TaskException {
    if (task == null) {
        throw new IllegalArgumentException("The task plugin instance is not initialized");
    }
    // Invoke the task plugin through the strategy pattern
    task.handle(taskCallBack);
}
```

## 3.4 Handling Task 1

From the execution flow above, we know that tasks are scheduled with the process as the scheduling unit, and that the order of tasks is determined by task ID.

![image-20230604112340116](assets/image-20230604112340116.png)

```java
@Override
public int compareTo(Delayed o) {
    if (o == null) {
        return 1;
    }
    return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
}
```

We can switch the tasks in the task queue to a submit/Future model: add a params field that defines each task's return value and input parameters, block on `future.get()` to wait for a task and feed its return value into the next task, and cache the execution results in a map, along the lines of:

```java
@Override
public Object executeTask(TaskCallBack taskCallBack) throws TaskException {
    if (task == null) {
        throw new IllegalArgumentException("The task plugin instance is not initialized");
    }
    // Invoke the task plugin through the strategy pattern and return its result
    return task.handle(taskCallBack);
}
```

The `WorkerManagerThread` class holds a map that caches the results:

```java
ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();

@Override
public void run() {
    // most code omitted
            if
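                    (this.getThreadPoolQueueSize() <= workerExecThreads) {
                // Take a task from the wait queue
                final WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable = waitSubmitQueue.take();
                // Check whether the task needs input parameters
                if (needsInputParams) {
                    // Pseudocode: adapt workerDelayTaskExecuteRunnable so that it receives them
                }
                // Submit the task (pseudocode: assumes submit returns a Future carrying the task's result)
                Future<Object> f = workerExecService.submit(workerDelayTaskExecuteRunnable);
                // Block until the task finishes and cache its result under the task ID
                concurrentHashMap.put(ID, f.get());
            }
        }
    }
}
```

# 4. Plan

> I have solid back-end development experience, which should allow reasonably fast progress.
>
> Classmates around me are also taking part in the Open Source Promotion Plan (开源之夏), so we can help each other improve.
>
> This is also my first time contributing to an open-source project. I am interested in investing time in open source over the long term and contributing my part to it.

## 4.1 Development Phase 1 (June to July)

* Largely complete Task 1 in June
* Complete Task 2 in July

## 4.2 Development Phase 2 (July to August)

* Refine Tasks 1 and 2
* Build the front-end UI
* Write the documentation
* Write the final summary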
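
As a supplement to the approach outlined in section 3.4, the sketch below shows the submit/Future idea in isolation, replaying Scenario 1 from section 1.1. It is a minimal illustration under stated assumptions, not the planned implementation: `OutputPassingSketch`, `runAndCache`, and `outputOf` are hypothetical names, output parameters are assumed to be string key/value pairs, and tasks are assumed to be identified by a string ID.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch; class, method, and parameter names are illustrative only.
final class OutputPassingSketch {

    // Cache of finished task outputs, keyed by task ID (values must be non-null).
    private final Map<String, Map<String, String>> outputs = new ConcurrentHashMap<>();
    private final ExecutorService pool = Executors.newFixedThreadPool(2);

    // Submit a task, block on its Future, and cache the output parameters it returns.
    Map<String, String> runAndCache(String taskId, Callable<Map<String, String>> task) {
        try {
            Map<String, String> result = pool.submit(task).get();
            outputs.put(taskId, result);
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for task " + taskId, e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task " + taskId + " failed", e.getCause());
        }
    }

    // Look up what an upstream task produced; empty if it has not run yet.
    Map<String, String> outputOf(String taskId) {
        return outputs.getOrDefault(taskId, Collections.<String, String>emptyMap());
    }

    void shutdown() {
        pool.shutdown();
    }

    // Scenario 1: shellA -> subprocessB -> shellC; returns the value shellC would print.
    static String runScenario() {
        OutputPassingSketch worker = new OutputPassingSketch();
        try {
            // shellA outputs the student list as "users".
            worker.runAndCache("shellA", () -> Collections.singletonMap("users", "a,b,c"));
            // subprocessB reads "users" from the cache and outputs "userCount".
            worker.runAndCache("subprocessB", () -> {
                String users = worker.outputOf("shellA").get("users");
                int userCount = users.split(",").length;
                return Collections.singletonMap("userCount", String.valueOf(userCount));
            });
            // shellC only needs to read the cached output of subprocessB.
            return worker.outputOf("subprocessB").get("userCount");
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("userCount = " + runScenario());
    }
}
```

Blocking on the Future immediately after submitting serializes the tasks, which keeps the sketch short. A real implementation would more likely block only at the point where a downstream task actually needs an upstream value.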