[自研开源] MyData 数据集成任务的流程介绍 v0.7.1

299 天前
 lien321

开源地址:gitee | github

详细介绍:MyData 基于 Web API 的数据集成平台 v0.7.0

部署文档:[用 Docker 部署 MyData v0.7.1]( https://www.mydata.work/docs#/./docker/用 Docker 部署 MyData)

使用手册:MyData 使用手册 v0.7.1

交流 Q 群:430089673

MyData 后端结构

MyData 的后端由 3 个子服务组成,分别是管理服务任务服务业务数据服务

依赖的组件:

下图从数据流角度 展示 3 个子服务的关联: 注:开源版本采用单体 SpringBoot ;

任务服务

配置任务

任务主要包括:项目环境、数据标准、应用 API 、任务类型、字段映射、任务周期;

任务流程

数据集成的任务执行流程如下图:

  1. 任务服务启动时(即 MyData 系统启动),查询所有运行状态的任务;

    public class JobExecutor implements ApplicationRunner {
        ...
    
        @Override
        public void run(ApplicationArguments args) {
            // 移除已有缓存
            jobCache.removeAll();
    
            // 查询已启动的任务
            List<Task> tasks = taskService.listRunningTasks();
            log.info("tasks.size() = " + tasks.size());
            if (CollUtil.isNotEmpty(tasks)) {
                tasks.forEach(this::startTask);
            }
        }
    
        ...
    }
    
  2. 根据任务的 cron 表达式,计算任务的下次执行时间;

    /**
     * 根据 任务的上次执行时间 和 设定间隔规则,计算任务的 下次执行时间
     *
     * @param taskInfo 定时任务
     */
    private void calculateNextRunTime(TaskInfo taskInfo) {
        Assert.notNull(taskInfo);
        Assert.notEmpty(taskInfo.getTaskPeriod());
    
        Date date = taskInfo.getStartTime();
        if (taskInfo.getFailCount() > 0) {
            date = taskInfo.getNextRunTime();
        }
    
        CronExpression cronExpression = new CronExpression(taskInfo.getTaskPeriod());
        Date nextRunTime = cronExpression.getNextValidTimeAfter(date);
        taskInfo.setNextRunTime(nextRunTime);
    }
    
  3. 计算任务的下次执行时间 与 当前时间的时间差,以时间差作为缓存失效期 将任务存入 redis 缓存;

    /**
     * 缓存任务
     *
     * @param taskInfo 任务对象
     * @throws IllegalArgumentException 缓存时长无效
     */
    public void cacheJob(TaskInfo taskInfo) throws IllegalArgumentException {
        // 计算任务缓存有效时长
        long expire = DateUtil.between(taskInfo.getStartTime(), taskInfo.getNextRunTime(), DateUnit.SECOND);
        if (expire <= 0) {
            throw new IllegalArgumentException(StrUtil.format("expire <= 0, startTime = {}, nextRunTime = {}"
                    , DateUtil.format(taskInfo.getStartTime(), DatePattern.NORM_DATETIME_MS_PATTERN)
                    , DateUtil.format(taskInfo.getNextRunTime(), DatePattern.NORM_DATETIME_MS_PATTERN)));
        }
    
        redisUtil.set(CACHE_TASK + taskInfo.getId(), taskInfo);
        redisUtil.set(CACHE_JOB + taskInfo.getId(), taskInfo.getId(), expire);
        taskInfo.appendLog("任务存入 redis ,缓存时长 {} 秒", expire);
    }
    
  4. 通过监听 redis 的 key 失效事件,获得待执行的任务;

    public class RedisKeyExpiredListener implements MessageListener {
    
        private final JobExecutor jobExecutor;
    
        @Override
        public void onMessage(Message message, byte[] pattern) {
            String expiredKey = message.toString();
            if (StrUtil.startWith(expiredKey, JobCache.CACHE_JOB)) {
                String taskId = StrUtil.subSuf(expiredKey, JobCache.CACHE_JOB.length());
                jobExecutor.notify(taskId);
            }
        }
    }
    
  5. 将任务加入待执行的线程池,随后即可执行

    /**
     * 任务存入执行队列
     *
     * @param taskInfo 任务
     */
    private void executeJob(TaskInfo taskInfo) {
        taskInfo.appendLog("任务存入执行队列");
        Runnable runnable = new JobThread(taskInfo);
        getThreadPoolExecutor().execute(runnable);
    }
    
  6. 根据任务类型分别执行提供数据消费数据流程;

    1. 提供数据

      1. 调用应用 API ,获取 json 格式数据;
      2. 根据任务中字段映射 解析 json 为业务数据 Map 集合;
      3. 调用数据服务 将业务数据存入 MongoDB ;
      case MdConstant.DATA_PRODUCER:
          // 调用 api 获取 json
          String json = ApiUtil.read(taskInfo);
          // 将 json 按字段映射 解析为业务数据
          jobDataService.parseData(taskInfo, json);
          // 根据条件过滤数据
          jobDataFilterService.doFilter(taskInfo);
          // 保存业务数据
          jobDataService.saveTaskData(taskInfo);
          // 更新环境变量
          jobVarService.saveVarValue(taskInfo, json);
      
          break;
      
    2. 消费数据

      1. 根据任务所选数据标准,查询业务数据;
      2. 再根据字段映射,将业务数据 转为指定的 json 对象集合;
      3. 调用应用 API ,传输 json 数据;
      case MdConstant.DATA_CONSUMER:
          List<BizDataFilter> filters = taskInfo.getDataFilters();
          if (CollUtil.isNotEmpty(filters)) {
              // 解析过滤条件值中的 自定义字符串
              parseFilterValue(filters);
              // 排除值为 null 的条件
              filters = filters.stream().filter(filter -> filter.getValue() != null).collect(Collectors.toList());
          }
          // 根据过滤条件 查询数据
          String dataCode = taskInfo.getDataCode();
          if (StrUtil.isNotEmpty(dataCode)) {
              List<Map> dataList = bizDataDAO.list(MdUtil.getBizDbCode(taskInfo.getTenantId(), taskInfo.getProjectId(), taskInfo.getEnvId()), dataCode, filters);
              taskInfo.setConsumeDataList(dataList);
              // 根据字段映射转换为 api 参数
              jobDataService.convertData(taskInfo);
          }
          // 调用 api 传输数据
          ApiUtil.write(taskInfo);
          break;
      
  7. 保存任务执行日志;

1233 次点击
所在节点    分享创造
0 条回复

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/1021975

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX