申请书.md 16 KB

[TOC]

1、项目背景

项目名称:增加跨工作流的参数传递功能

项目主导师: 鲍亮

申请人:孙浩博

申请日期:2023/6/4

邮箱:sunhaobo@stu.xidian.edu.cn

1.1、项目基本背景

为开源项目dolphinscheduler设计两个任务,并且为上述两个任务编写详细的设计文档以及使用文档

  1. 工作流可以选择自己的输出参数,作为工作流的输出参数,输出给下游任务使用。

场景1: shellA -> subprocessB -> shellC

shellA 查出所有学生信息 (a,b,c) 并将 users 输出给下游任务subprocessB

subprocessB 是一个子工作流, 负责计算所有学生个数 userCount,并将userCount 作为工作流的输出传递给下游

shellC 负责将 userCount 输出到控制台

  1. 依赖任务也可以选择将被依赖任务的输出参数继承过来使用。增加依赖参数继承功能

需要在依赖任务上增加一个字段:是否继承被依赖任务的参数

场景2:

工作流A : taska1 -> taska2, taska2 输出了参数 count=10

工作流B : taskb1 -> dependentb2 -> taskb3

dependentb2 依赖了taska2 就可以将count继承过来作为dependentb2的输出参数,给taskb3使用

1.2、开源项目仓库

2、方法可行性

2.1、vue相关

本人有vue开发相关经验,曾从事实验室项目(煤矿大数据平台--大屏模块)的开发,熟练运用前端知识,完成项目开发。主要包括,使用flex和grid完成自适应布局;基于websocket的实时数据展示;基于 echarts封装常见组件如柱状图、饼状图、折线图;对接三方监控,了解chrome播放策略等。

2.2、java开发经验

丰富的linux使用经验 在中电六所实习的经验,从事软件项目开发 实验室项目开发经验,从事煤炭大数据后端模块的开发 个人博客编写习惯,编写个人博客并在github开源,博客地址

3、项目实现细节--(主要分析start-process-instance接口)

dolphinscheduler项目主要实现为master和worker节点的交互,以下内容分析执行start-process-instance接口的实现细节

image-20230604104941343

3.1、master执行流程分析

首先调用execService的execProcessInstance方法

        Map<String, Object> result = execService.execProcessInstance(loginUser, projectCode, processDefinitionCode,
                scheduleTime, execType, failureStrategy,
                startNodeList, taskDependType, warningType, warningGroupId, runMode, processInstancePriority,
                workerGroup, tenantCode, environmentCode, timeout, startParamMap, expectedParallelismNumber, dryRun,
                testFlag,
                complementDependentMode, version, allLevelDependent);
        return returnDataList(result);

execProcessInstance方法实现细节

        /**
         * 主要调用creat command方法将command存储到DB中
         */
        int create =
                this.createCommand(triggerCode, commandType, processDefinition.getCode(), taskDependType,
                        failureStrategy,
                        startNodeList,
                        cronTime, warningType, loginUser.getId(), warningGroupId, runMode, processInstancePriority,
                        workerGroup, tenantCode,
                        environmentCode, startParams, expectedParallelismNumber, dryRun, testFlag,
                        complementDependentMode, allLevelDependent);


    private int createCommand() {
        // 忽略大部分代码
        } else {
            command.setCommandParam(JSONUtils.toJsonString(cmdParam));
        	// 实际上执行createCommand方法
            int count = commandService.createCommand(command);
            if (count > 0) {
                triggerRelationService.saveTriggerToDb(ApiTriggerType.COMMAND, triggerCode, command.getId());
            }
            return count;
        }
    }

    public int createCommand(Command command) {
        int result = 0;
        if (command == null) {
            return result;
        }
        // add command timezone
        Schedule schedule = scheduleMapper.queryByProcessDefinitionCode(command.getProcessDefinitionCode());
        if (schedule != null) {
            Map<String, String> commandParams =
                    StringUtils.isNotBlank(command.getCommandParam()) ? JSONUtils.toMap(command.getCommandParam())
                            : new HashMap<>();
            commandParams.put(Constants.SCHEDULE_TIMEZONE, schedule.getTimezoneId());
            command.setCommandParam(JSONUtils.toJsonString(commandParams));
        }
        command.setId(null);
        // 这一步将command存储到DB中
        result = commandMapper.insert(command);
        return result;
    }

3.2、master分配任务细节

主要是MasterSchedulerBootstrap线程线性扫描DB解析command,然后使用分片算法分发给worker

run方法解析

    @Override
    public void run() {
        // 省略大部分代码
        // 这一步直接扫描数据库获取command,注意这里同时也会删除数据库习惯记录
        List<Command> commands = findCommands();
        commands.parallelStream()
            .forEach(command -> {
                try {
                    WorkflowExecuteRunnable workflowExecuteRunnable =
                        workflowExecuteRunnableFactory.createWorkflowExecuteRunnable(command);
                    ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance();
                    if (processInstanceExecCacheManager.contains(processInstance.getId())) {
                        log.error(
                            "The workflow instance is already been cached, this case shouldn't be happened");
                    }
                    // 这一步是启动一个监听任务,监听该实例执行情况
                    processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable);
                    // 通过事件的方式去传播任务
                    workflowEventQueue.addEvent(
                        new WorkflowEvent(WorkflowEventType.START_WORKFLOW, processInstance.getId()));
    }

WorkflowEventLooper类用来处理MasterSchedulerBootstrap方法添加的事件

run方法解析

    public void run() {
        // 省略大部分代码
        WorkflowEvent workflowEvent;
        while (RUNNING_FLAG.get()) {
            try {
                // 获取workflowEventQueue队列的事件
                workflowEvent = workflowEventQueue.poolEvent();
            try (
                WorkflowEventHandler workflowEventHandler =
                        workflowEventHandlerMap.get(workflowEvent.getWorkflowEventType());
                // 这里通过策略模式处理事件
                workflowEventHandler.handleWorkflowEvent(workflowEvent);
    }
                
    @Override
    public void handleWorkflowEvent(final WorkflowEvent workflowEvent) throws WorkflowEventHandleError {
	    // 省略大部分代码
        // 核心调用call方法去处理任务
        CompletableFuture.supplyAsync(workflowExecuteRunnable::call, workflowExecuteThreadPool)
    }
                
    @Override
    public WorkflowSubmitStatus call() {
        // 省略大部分代码
        try {
            if (workflowRunnableStatus == WorkflowRunnableStatus.CREATED) {
                // 新建甘特图
                buildFlowDag();
                workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_DAG;
                log.info("workflowStatue changed to :{}", workflowRunnableStatus);
            }
            if (workflowRunnableStatus == WorkflowRunnableStatus.INITIALIZE_DAG) {
                // 初始化任务队列
                initTaskQueue();
                workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_QUEUE;
                log.info("workflowStatue changed to :{}", workflowRunnableStatus);
            }
            if (workflowRunnableStatus == WorkflowRunnableStatus.INITIALIZE_QUEUE) {
                // 提交任务
                submitPostNode(null);
                workflowRunnableStatus = WorkflowRunnableStatus.STARTED;
                log.info("workflowStatue changed to :{}", workflowRunnableStatus);
            }
            return WorkflowSubmitStatus.SUCCESS;
        }
    }

现在为止我们分析到了提交任务阶段,submitPostNode方法提交任务之后实际会执行submitStandByTask方法提交

    private void submitPostNode(String parentNodeCode) throws StateEventHandleException {
        // 省略大部分代码
        submitStandByTask();
        updateProcessInstanceState();
    }

    public void submitStandByTask() throws StateEventHandleException {
        // 省略大部分代码
        TaskInstance task;
        // 循环peek处理任务队列
        while ((task = readyToSubmitTaskQueue.peek()) != null) {
            DependResult dependResult = getDependResultForTask(task);
            if (DependResult.SUCCESS == dependResult) {
                log.info("The dependResult of task {} is success, so ready to submit to execute", task.getName());
                // 实际处理任务
                if (!executeTask(task)) {
        }
    }
    private boolean executeTask(TaskInstance taskInstance) {
        // 省略大部分代码
        // 核心代码,分发任务
        taskExecuteRunnable.dispatch();
    }

任务分发之后,GlobalTaskDispatchWaitingQueueLooper类会处理分发的任务

   @Override
    public void run() {
        // 省略大部分代码
        DefaultTaskExecuteRunnable defaultTaskExecuteRunnable;
        while (RUNNING_FLAG.get()) {
            try {
                // 获取任务
                defaultTaskExecuteRunnable = globalTaskDispatchWaitingQueue.takeNeedToDispatchTaskExecuteRunnable();
            }
            try {
                final TaskDispatcher taskDispatcher = taskDispatchFactory
                        .getTaskDispatcher(defaultTaskExecuteRunnable.getTaskInstance().getTaskType());
                // 分发任务
                taskDispatcher.dispatchTask(defaultTaskExecuteRunnable);
    }
                @Override
    public void dispatchTask(TaskExecuteRunnable taskExecuteRunnable) throws TaskDispatchException {
        // 省略大部分代码
        // 实际分发任务
        doDispatch(taskExecuteRunnable);
        // 添加分发任务监听事件
        addDispatchEvent(taskExecuteRunnable);
    }
            
    protected void doDispatch(TaskExecuteRunnable taskExecuteRunnable) throws TaskDispatchException {
        // 省略大部分代码
        // 核心代码,通过RPC方式分发任务给task处理线程
        Message message = masterRpcClient.sendSyncCommand(Host.of(taskExecutionContext.getHost()),
                                                          taskDispatchRequest.convert2Command());
    }

3.3、worker线程处理task

NettyServerHandler类用来接收master传递的RPC消息

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 接受消息
        processReceived(ctx.channel(), (Message) msg);
    }

    private void processReceived(final Channel channel, final Message msg) {
        // 省略大部分代码
        final Pair<NettyRequestProcessor, ExecutorService> pair = processors.get(messageType);
        if (pair != null) {
            Runnable r = () -> {
                try {
                    // 通过策略模式分发消息
                    pair.getLeft().process(channel, msg);
    }

WorkerTaskDispatchProcessor类来分发消息

    @Counted(value = "ds.task.execution.count", description = "task execute total count")
    @Timed(value = "ds.task.execution.duration", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
    @Override
    public void process(Channel channel, Message message) {
            // 省略大部分代码
            WorkerDelayTaskExecuteRunnable workerTaskExecuteRunnable
            // 推送任务
            if (!workerManager.offer(workerTaskExecuteRunnable)) {
                log.error("submit task: {} to wait queue error, current queue size: {} is full",
                        taskExecutionContext.getTaskName(), workerManager.getWaitSubmitQueueSize());
                sendDispatchRejectResult(channel, message, taskExecutionContext);
            } 
        }
    }

WorkerManagerThread类实际处理消息

    @Override
    public void run() {
        // 省略大部分代码
        Thread.currentThread().setName("Worker-Execute-Manager-Thread");
        while (!ServerLifeCycleManager.isStopped()) {
            try {
                if (!ServerLifeCycleManager.isRunning()) {
                    Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                }
                if (this.getThreadPoolQueueSize() <= workerExecThreads) {
                    // 获取任务
                    final WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable = waitSubmitQueue.take();
                    // 提交任务
                    workerExecService.submit(workerDelayTaskExecuteRunnable);
                } 
            } 
        }
    }

封装的WorkerTaskExecuteRunnable类的run方法

    @Override
    public void run() {
        // 省略大部分代码
        // 处理任务
        executeTask(taskCallBack);
    }
    @Override
    public void executeTask(TaskCallBack taskCallBack) throws TaskException {
        if (task == null) {
            throw new IllegalArgumentException("The task plugin instance is not initialized");
        }
        // 通过策略模式调用任务插件处理任务
        task.handle(taskCallBack);
    }

3.4、处理任务1

根据上述的执行流程分析我们知道任务是以process为调度单元,任务通过任务ID来确定先后顺序

image-20230604112340116

    @Override
    public int compareTo(Delayed o) {
        if (o == null) {
            return 1;
        }
        return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
    }

我们可以将任务队列的任务改为submit/future模式,通过添加params参数确定返回值和入参,通过future.get方法来阻塞等待任务将返回值输入到下一个任务中,可以将执行结果缓存到map中,类似于

    @Override
    public Object executeTask(TaskCallBack taskCallBack) throws TaskException {
        if (task == null) {
            throw new IllegalArgumentException("The task plugin instance is not initialized");
        }
        // 通过策略模式调用任务插件处理任务
        return task.handle(taskCallBack);
    }

WorkerManagerThread类中有一个map缓存结果

ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
    @Override
    public void run() {
        // 省略大部分代码
                if (this.getThreadPoolQueueSize() <= workerExecThreads) {
                    // 获取任务
                    final WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable = waitSubmitQueue.take();
                    // 判断是否需要入参
                    if (需要入参) {
                        改造workerDelayTaskExecuteRunnable
                    }
                    // 提交任务
                    future f = workerExecService.submit(workerDelayTaskExecuteRunnable);
                    f.put(ID, f.get())
                } 
            } 
        }
    }

4、规划

有丰富的后端开发经验,可以保证较快的进度, 身边的同学也同时参加了开源之夏,可以相互帮助进步 也是第一次参与开源项目,有兴趣长期投入事件参与开源事业,为他添砖加瓦

4.1、项目研发第一阶段(6月-7月):

  • 6月份大致完成任务1
  • 7月份完成任务2

4.2、项目研发第二阶段(7月-8月):

  • 完善任务1,2
  • 编写前端UI
  • 编写文档
  • 总结