|
@@ -0,0 +1,429 @@
|
|
|
+> [TOC]
|
|
|
+
|
|
|
+# 1、项目背景
|
|
|
+
|
|
|
+项目名称:增加跨工作流的参数传递功能
|
|
|
+
|
|
|
+项目主导师: 鲍亮
|
|
|
+
|
|
|
+申请人:孙浩博
|
|
|
+
|
|
|
+申请日期:2023/6/4
|
|
|
+
|
|
|
+邮箱:sunhaobo@stu.xidian.edu.cn
|
|
|
+
|
|
|
+## 1.1、项目基本背景
|
|
|
+
|
|
|
+为开源项目dolphinscheduler设计两个任务,并且为上述两个任务编写详细的设计文档以及使用文档
|
|
|
+
|
|
|
+1. 工作流可以选择自己的输出参数,作为工作流的输出参数,输出给下游任务使用。
|
|
|
+
|
|
|
+场景1: shellA -> subprocessB -> shellC
|
|
|
+
|
|
|
+shellA 查出所有学生信息 (a,b,c) 并将 users 输出给下游任务subprocessB
|
|
|
+
|
|
|
+subprocessB 是一个子工作流, 负责计算所有学生个数 userCount,并将userCount 作为工作流的输出传递给下游
|
|
|
+
|
|
|
+shellC 负责将 userCount 输出到控制台
|
|
|
+
|
|
|
+2. 依赖任务也可以选择将被依赖任务的输出参数继承过来使用。增加依赖参数继承功能
|
|
|
+
|
|
|
+需要在依赖任务上增加一个字段:是否继承被依赖任务的参数
|
|
|
+
|
|
|
+场景2:
|
|
|
+
|
|
|
+工作流A : taska1 -> taska2, taska2 输出了参数 count=10
|
|
|
+
|
|
|
+工作流B : taskb1 -> dependentb2 -> taskb3
|
|
|
+
|
|
|
+dependentb2 依赖了taska2 就可以将count继承过来作为dependentb2的输出参数,给taskb3使用
|
|
|
+
|
|
|
+
|
|
|
+## 1.2、开源项目仓库
|
|
|
+
|
|
|
+- https://github.com/apache/dolphinscheduler
|
|
|
+
|
|
|
+# 2、方法可行性
|
|
|
+
|
|
|
+## 2.1、vue相关
|
|
|
+
|
|
|
+本人有vue开发相关经验,曾从事实验室项目(煤矿大数据平台--大屏模块)的开发,熟练运用前端知识,完成项目开发。主要包括,使用flex和grid完成自适应布局;基于websocket的实时数据展示;基于 echarts封装常见组件如柱状图、饼状图、折线图;对接三方监控,了解chrome播放策略等。
|
|
|
+
|
|
|
+## 2.2、java开发经验
|
|
|
+
|
|
|
+丰富的linux使用经验
|
|
|
+在中电六所实习的经验,从事软件项目开发
|
|
|
+实验室项目开发经验,从事煤炭大数据后端模块的开发
|
|
|
+个人博客编写习惯,编写个人博客并在github开源,[博客地址](https://www.seamew.top)
|
|
|
+
|
|
|
+# 3、项目实现细节--(主要分析start-process-instance接口)
|
|
|
+
|
|
|
+dolphinscheduler项目主要实现为master和worker节点的交互,以下内容分析执行start-process-instance接口的实现细节
|
|
|
+
|
|
|
+![image-20230604104941343](assets/image-20230604104941343.png)
|
|
|
+
|
|
|
+## 3.1、master执行流程分析
|
|
|
+
|
|
|
+首先调用execService的execProcessInstance方法
|
|
|
+
|
|
|
+```java
|
|
|
+ Map<String, Object> result = execService.execProcessInstance(loginUser, projectCode, processDefinitionCode,
|
|
|
+ scheduleTime, execType, failureStrategy,
|
|
|
+ startNodeList, taskDependType, warningType, warningGroupId, runMode, processInstancePriority,
|
|
|
+ workerGroup, tenantCode, environmentCode, timeout, startParamMap, expectedParallelismNumber, dryRun,
|
|
|
+ testFlag,
|
|
|
+ complementDependentMode, version, allLevelDependent);
|
|
|
+ return returnDataList(result);
|
|
|
+```
|
|
|
+
|
|
|
+execProcessInstance方法实现细节
|
|
|
+
|
|
|
+```java
|
|
|
+ /**
|
|
|
+ * 主要调用creat command方法将command存储到DB中
|
|
|
+ */
|
|
|
+ int create =
|
|
|
+ this.createCommand(triggerCode, commandType, processDefinition.getCode(), taskDependType,
|
|
|
+ failureStrategy,
|
|
|
+ startNodeList,
|
|
|
+ cronTime, warningType, loginUser.getId(), warningGroupId, runMode, processInstancePriority,
|
|
|
+ workerGroup, tenantCode,
|
|
|
+ environmentCode, startParams, expectedParallelismNumber, dryRun, testFlag,
|
|
|
+ complementDependentMode, allLevelDependent);
|
|
|
+
|
|
|
+
|
|
|
+ private int createCommand() {
|
|
|
+ // 忽略大部分代码
|
|
|
+ } else {
|
|
|
+ command.setCommandParam(JSONUtils.toJsonString(cmdParam));
|
|
|
+ // 实际上执行createCommand方法
|
|
|
+ int count = commandService.createCommand(command);
|
|
|
+ if (count > 0) {
|
|
|
+ triggerRelationService.saveTriggerToDb(ApiTriggerType.COMMAND, triggerCode, command.getId());
|
|
|
+ }
|
|
|
+ return count;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public int createCommand(Command command) {
|
|
|
+ int result = 0;
|
|
|
+ if (command == null) {
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+ // add command timezone
|
|
|
+ Schedule schedule = scheduleMapper.queryByProcessDefinitionCode(command.getProcessDefinitionCode());
|
|
|
+ if (schedule != null) {
|
|
|
+ Map<String, String> commandParams =
|
|
|
+ StringUtils.isNotBlank(command.getCommandParam()) ? JSONUtils.toMap(command.getCommandParam())
|
|
|
+ : new HashMap<>();
|
|
|
+ commandParams.put(Constants.SCHEDULE_TIMEZONE, schedule.getTimezoneId());
|
|
|
+ command.setCommandParam(JSONUtils.toJsonString(commandParams));
|
|
|
+ }
|
|
|
+ command.setId(null);
|
|
|
+ // 这一步将command存储到DB中
|
|
|
+ result = commandMapper.insert(command);
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+```
|
|
|
+
|
|
|
+## 3.2、master分配任务细节
|
|
|
+
|
|
|
+主要是MasterSchedulerBootstrap线程线性扫描DB解析command,然后使用分片算法分发给worker
|
|
|
+
|
|
|
+run方法解析
|
|
|
+
|
|
|
+```java
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ // 省略大部分代码
|
|
|
+ // 这一步直接扫描数据库获取command,注意这里同时也会删除数据库习惯记录
|
|
|
+ List<Command> commands = findCommands();
|
|
|
+ commands.parallelStream()
|
|
|
+ .forEach(command -> {
|
|
|
+ try {
|
|
|
+ WorkflowExecuteRunnable workflowExecuteRunnable =
|
|
|
+ workflowExecuteRunnableFactory.createWorkflowExecuteRunnable(command);
|
|
|
+ ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance();
|
|
|
+ if (processInstanceExecCacheManager.contains(processInstance.getId())) {
|
|
|
+ log.error(
|
|
|
+ "The workflow instance is already been cached, this case shouldn't be happened");
|
|
|
+ }
|
|
|
+ // 这一步是启动一个监听任务,监听该实例执行情况
|
|
|
+ processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable);
|
|
|
+ // 通过事件的方式去传播任务
|
|
|
+ workflowEventQueue.addEvent(
|
|
|
+ new WorkflowEvent(WorkflowEventType.START_WORKFLOW, processInstance.getId()));
|
|
|
+ }
|
|
|
+```
|
|
|
+
|
|
|
+WorkflowEventLooper类用来处理MasterSchedulerBootstrap方法添加的事件
|
|
|
+
|
|
|
+run方法解析
|
|
|
+
|
|
|
+```java
|
|
|
+ public void run() {
|
|
|
+ // 省略大部分代码
|
|
|
+ WorkflowEvent workflowEvent;
|
|
|
+ while (RUNNING_FLAG.get()) {
|
|
|
+ try {
|
|
|
+ // 获取workflowEventQueue队列的事件
|
|
|
+ workflowEvent = workflowEventQueue.poolEvent();
|
|
|
+ try (
|
|
|
+ WorkflowEventHandler workflowEventHandler =
|
|
|
+ workflowEventHandlerMap.get(workflowEvent.getWorkflowEventType());
|
|
|
+ // 这里通过策略模式处理事件
|
|
|
+ workflowEventHandler.handleWorkflowEvent(workflowEvent);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void handleWorkflowEvent(final WorkflowEvent workflowEvent) throws WorkflowEventHandleError {
|
|
|
+ // 省略大部分代码
|
|
|
+ // 核心调用call方法去处理任务
|
|
|
+ CompletableFuture.supplyAsync(workflowExecuteRunnable::call, workflowExecuteThreadPool)
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public WorkflowSubmitStatus call() {
|
|
|
+ // 省略大部分代码
|
|
|
+ try {
|
|
|
+ if (workflowRunnableStatus == WorkflowRunnableStatus.CREATED) {
|
|
|
+ // 新建甘特图
|
|
|
+ buildFlowDag();
|
|
|
+ workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_DAG;
|
|
|
+ log.info("workflowStatue changed to :{}", workflowRunnableStatus);
|
|
|
+ }
|
|
|
+ if (workflowRunnableStatus == WorkflowRunnableStatus.INITIALIZE_DAG) {
|
|
|
+ // 初始化任务队列
|
|
|
+ initTaskQueue();
|
|
|
+ workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_QUEUE;
|
|
|
+ log.info("workflowStatue changed to :{}", workflowRunnableStatus);
|
|
|
+ }
|
|
|
+ if (workflowRunnableStatus == WorkflowRunnableStatus.INITIALIZE_QUEUE) {
|
|
|
+ // 提交任务
|
|
|
+ submitPostNode(null);
|
|
|
+ workflowRunnableStatus = WorkflowRunnableStatus.STARTED;
|
|
|
+ log.info("workflowStatue changed to :{}", workflowRunnableStatus);
|
|
|
+ }
|
|
|
+ return WorkflowSubmitStatus.SUCCESS;
|
|
|
+ }
|
|
|
+ }
|
|
|
+```
|
|
|
+
|
|
|
+现在为止我们分析到了提交任务阶段,submitPostNode方法提交任务之后实际会执行submitStandByTask方法提交
|
|
|
+
|
|
|
+```java
|
|
|
+ private void submitPostNode(String parentNodeCode) throws StateEventHandleException {
|
|
|
+ // 省略大部分代码
|
|
|
+ submitStandByTask();
|
|
|
+ updateProcessInstanceState();
|
|
|
+ }
|
|
|
+
|
|
|
+ public void submitStandByTask() throws StateEventHandleException {
|
|
|
+ // 省略大部分代码
|
|
|
+ TaskInstance task;
|
|
|
+ // 循环peek处理任务队列
|
|
|
+ while ((task = readyToSubmitTaskQueue.peek()) != null) {
|
|
|
+ DependResult dependResult = getDependResultForTask(task);
|
|
|
+ if (DependResult.SUCCESS == dependResult) {
|
|
|
+ log.info("The dependResult of task {} is success, so ready to submit to execute", task.getName());
|
|
|
+ // 实际处理任务
|
|
|
+ if (!executeTask(task)) {
|
|
|
+ }
|
|
|
+ }
|
|
|
+ private boolean executeTask(TaskInstance taskInstance) {
|
|
|
+ // 省略大部分代码
|
|
|
+ // 核心代码,分发任务
|
|
|
+ taskExecuteRunnable.dispatch();
|
|
|
+ }
|
|
|
+```
|
|
|
+
|
|
|
+任务分发之后,GlobalTaskDispatchWaitingQueueLooper类会处理分发的任务
|
|
|
+
|
|
|
+```java
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ // 省略大部分代码
|
|
|
+ DefaultTaskExecuteRunnable defaultTaskExecuteRunnable;
|
|
|
+ while (RUNNING_FLAG.get()) {
|
|
|
+ try {
|
|
|
+ // 获取任务
|
|
|
+ defaultTaskExecuteRunnable = globalTaskDispatchWaitingQueue.takeNeedToDispatchTaskExecuteRunnable();
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ final TaskDispatcher taskDispatcher = taskDispatchFactory
|
|
|
+ .getTaskDispatcher(defaultTaskExecuteRunnable.getTaskInstance().getTaskType());
|
|
|
+ // 分发任务
|
|
|
+ taskDispatcher.dispatchTask(defaultTaskExecuteRunnable);
|
|
|
+ }
|
|
|
+ @Override
|
|
|
+ public void dispatchTask(TaskExecuteRunnable taskExecuteRunnable) throws TaskDispatchException {
|
|
|
+ // 省略大部分代码
|
|
|
+ // 实际分发任务
|
|
|
+ doDispatch(taskExecuteRunnable);
|
|
|
+ // 添加分发任务监听事件
|
|
|
+ addDispatchEvent(taskExecuteRunnable);
|
|
|
+ }
|
|
|
+
|
|
|
+ protected void doDispatch(TaskExecuteRunnable taskExecuteRunnable) throws TaskDispatchException {
|
|
|
+ // 省略大部分代码
|
|
|
+ // 核心代码,通过RPC方式分发任务给task处理线程
|
|
|
+ Message message = masterRpcClient.sendSyncCommand(Host.of(taskExecutionContext.getHost()),
|
|
|
+ taskDispatchRequest.convert2Command());
|
|
|
+ }
|
|
|
+```
|
|
|
+
|
|
|
+## 3.3、worker线程处理task
|
|
|
+
|
|
|
+NettyServerHandler类用来接收master传递的RPC消息
|
|
|
+
|
|
|
+```java
|
|
|
+ @Override
|
|
|
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
|
|
+ // 接受消息
|
|
|
+ processReceived(ctx.channel(), (Message) msg);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void processReceived(final Channel channel, final Message msg) {
|
|
|
+ // 省略大部分代码
|
|
|
+ final Pair<NettyRequestProcessor, ExecutorService> pair = processors.get(messageType);
|
|
|
+ if (pair != null) {
|
|
|
+ Runnable r = () -> {
|
|
|
+ try {
|
|
|
+ // 通过策略模式分发消息
|
|
|
+ pair.getLeft().process(channel, msg);
|
|
|
+ }
|
|
|
+```
|
|
|
+
|
|
|
+WorkerTaskDispatchProcessor类来分发消息
|
|
|
+
|
|
|
+```java
|
|
|
+ @Counted(value = "ds.task.execution.count", description = "task execute total count")
|
|
|
+ @Timed(value = "ds.task.execution.duration", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
|
|
|
+ @Override
|
|
|
+ public void process(Channel channel, Message message) {
|
|
|
+ // 省略大部分代码
|
|
|
+ WorkerDelayTaskExecuteRunnable workerTaskExecuteRunnable
|
|
|
+ // 推送任务
|
|
|
+ if (!workerManager.offer(workerTaskExecuteRunnable)) {
|
|
|
+ log.error("submit task: {} to wait queue error, current queue size: {} is full",
|
|
|
+ taskExecutionContext.getTaskName(), workerManager.getWaitSubmitQueueSize());
|
|
|
+ sendDispatchRejectResult(channel, message, taskExecutionContext);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+```
|
|
|
+
|
|
|
+WorkerManagerThread类实际处理消息
|
|
|
+
|
|
|
+```java
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ // 省略大部分代码
|
|
|
+ Thread.currentThread().setName("Worker-Execute-Manager-Thread");
|
|
|
+ while (!ServerLifeCycleManager.isStopped()) {
|
|
|
+ try {
|
|
|
+ if (!ServerLifeCycleManager.isRunning()) {
|
|
|
+ Thread.sleep(Constants.SLEEP_TIME_MILLIS);
|
|
|
+ }
|
|
|
+ if (this.getThreadPoolQueueSize() <= workerExecThreads) {
|
|
|
+ // 获取任务
|
|
|
+ final WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable = waitSubmitQueue.take();
|
|
|
+ // 提交任务
|
|
|
+ workerExecService.submit(workerDelayTaskExecuteRunnable);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+```
|
|
|
+
|
|
|
+封装的WorkerTaskExecuteRunnable类的run方法
|
|
|
+
|
|
|
+```java
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ // 省略大部分代码
|
|
|
+ // 处理任务
|
|
|
+ executeTask(taskCallBack);
|
|
|
+ }
|
|
|
+ @Override
|
|
|
+ public void executeTask(TaskCallBack taskCallBack) throws TaskException {
|
|
|
+ if (task == null) {
|
|
|
+ throw new IllegalArgumentException("The task plugin instance is not initialized");
|
|
|
+ }
|
|
|
+ // 通过策略模式调用任务插件处理任务
|
|
|
+ task.handle(taskCallBack);
|
|
|
+ }
|
|
|
+```
|
|
|
+
|
|
|
+## 3.4、处理任务1
|
|
|
+
|
|
|
+根据上述的执行流程分析我们知道任务是以process为调度单元,任务通过任务ID来确定先后顺序
|
|
|
+
|
|
|
+![image-20230604112340116](assets/image-20230604112340116.png)
|
|
|
+
|
|
|
+```java
|
|
|
+ @Override
|
|
|
+ public int compareTo(Delayed o) {
|
|
|
+ if (o == null) {
|
|
|
+ return 1;
|
|
|
+ }
|
|
|
+ return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
|
|
|
+ }
|
|
|
+```
|
|
|
+
|
|
|
+我们可以将任务队列的任务改为submit/future模式,通过添加params参数确定返回值和入参,通过future.get方法来阻塞等待任务将返回值输入到下一个任务中,可以将执行结果缓存到map中,类似于
|
|
|
+
|
|
|
+```java
|
|
|
+ @Override
|
|
|
+ public Object executeTask(TaskCallBack taskCallBack) throws TaskException {
|
|
|
+ if (task == null) {
|
|
|
+ throw new IllegalArgumentException("The task plugin instance is not initialized");
|
|
|
+ }
|
|
|
+ // 通过策略模式调用任务插件处理任务
|
|
|
+ return task.handle(taskCallBack);
|
|
|
+ }
|
|
|
+
|
|
|
+```
|
|
|
+
|
|
|
+WorkerManagerThread类中有一个map缓存结果
|
|
|
+
|
|
|
+```java
|
|
|
+ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ // 省略大部分代码
|
|
|
+ if (this.getThreadPoolQueueSize() <= workerExecThreads) {
|
|
|
+ // 获取任务
|
|
|
+ final WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable = waitSubmitQueue.take();
|
|
|
+ // 判断是否需要入参
|
|
|
+ if (需要入参) {
|
|
|
+ 改造workerDelayTaskExecuteRunnable
|
|
|
+ }
|
|
|
+ // 提交任务
|
|
|
+ future f = workerExecService.submit(workerDelayTaskExecuteRunnable);
|
|
|
+ f.put(ID, f.get())
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+```
|
|
|
+
|
|
|
+# 4、规划
|
|
|
+
|
|
|
+> 有丰富的后端开发经验,可以保证较快的进度,
|
|
|
+> 身边的同学也同时参加了开源之夏,可以相互帮助进步
|
|
|
+> 也是第一次参与开源项目,有兴趣长期投入事件参与开源事业,为他添砖加瓦
|
|
|
+
|
|
|
+## 4.1、项目研发第一阶段(6月-7月):
|
|
|
+
|
|
|
+* 6月份大致完成任务1
|
|
|
+* 7月份完成任务2
|
|
|
+
|
|
|
+## 4.2、项目研发第二阶段(7月-8月):
|
|
|
+
|
|
|
+* 完善任务1,2
|
|
|
+* 编写前端UI
|
|
|
+* 编写文档
|
|
|
+* 总结
|
|
|
+
|