服务器之家:专注于服务器技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - Java教程 - 一分钟掌握Java ElasticJob分布式定时任务

一分钟掌握Java ElasticJob分布式定时任务

2023-05-11 01:04未知服务器之家 Java教程

目录 前言 架构 功能和特性 入门角色 写个例子 任务执行流程 ScheduleJobBootstrap初始化 ScheduleJobBootstrap执行 执行流程总结 分片的策略 前言 ElasticJob 是面向互联网生态和海量任务的分布式调度解决方案。 它通过弹性调度、资源管控、

目录
  • 前言
  • 架构
  • 功能和特性
  • 入门角色
  • 写个例子
  • 任务执行流程
    • ScheduleJobBootstrap初始化
    • ScheduleJobBootstrap执行
    • 执行流程总结
  • 分片的策略

    前言

    ElasticJob 是面向互联网生态和海量任务的分布式调度解决方案。 它通过弹性调度、资源管控、以及任务治理的功能,打造一个适用于互联网场景的分布式调度解决方案,并通过开放的架构设计,提供多元化的任务生态。 它的各个产品使用统一的任务 API,开发者仅需一次开发,即可随意部署。

    架构

    elasticjob由两个相互独立的子项目 ElasticJob-Lite 和 ElasticJob-Cloud 组成组成,这是ElasticJob-Lite 的架构图:

    一分钟掌握Java ElasticJob分布式定时任务

    再创建任务配置:

    import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
    import org.apache.shardingsphere.elasticjob.tracing.api.TracingConfiguration;
    import javax.sql.DataSource;
    import java.util.Objects;
    /**
     * (这个类的说明)
     *
     * @author mars酱
     */
    public class JobConfigurationBuilder {
        public static JobConfiguration buildJobConfiguration(String jobName, String cronExpression, TracingConfiguration<DataSource> tracingConfig) {
            JobConfiguration.Builder builder = JobConfiguration.newBuilder(jobName, 3)
                    .cron(cronExpression)
                    .shardingItemParameters("0=a,1=b,2=c");
            if (Objects.nonNull(tracingConfig)) {
                builder.addExtraConfigurations(tracingConfig);
            }
            return builder.build();
        }
    }

    最后创建调度器,并执行:

    import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
    import org.apache.shardingsphere.elasticjob.lite.example.job.simple.JavaSimpleJob;
    import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
    import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
    import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
    import org.apache.shardingsphere.elasticjob.tracing.api.TracingConfiguration;
    import javax.sql.DataSource;
    /**
     * (这个类的说明)
     *
     * @author mars酱
     */
    public final class SchedulerMain {
        private static final int EMBED_ZOOKEEPER_PORT = 4181;
        private static final String ZOOKEEPER_CONNECTION_STRING = "localhost:" + EMBED_ZOOKEEPER_PORT;
        private static final String JOB_NAMESPACE = "elasticjob-marsz-lite-java";
        // CHECKSTYLE:OFF
        public static void main(final String[] args) {
            // 内嵌zk服务
            EmbedZookeeperServer.start(EMBED_ZOOKEEPER_PORT);
            CoordinatorRegistryCenter regCenter = setUpRegistryCenter();
            // 简单作业
            setUpSimpleJob(regCenter, null);
        }
        private static CoordinatorRegistryCenter setUpRegistryCenter() {
            ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(ZOOKEEPER_CONNECTION_STRING, JOB_NAMESPACE);
            CoordinatorRegistryCenter result = new ZookeeperRegistryCenter(zkConfig);
            result.init();
            return result;
        }
        private static void setUpSimpleJob(final CoordinatorRegistryCenter regCenter, final TracingConfiguration<DataSource> tracingConfig) {
            new ScheduleJobBootstrap(regCenter,
                    new MarsSimpleJob(),
                    JobConfigurationBuilder.buildJobConfiguration("marsSimpleJob", "0/5 * * * * ?", tracingConfig)).schedule();
        }
    }

    运行的效果:

    一分钟掌握Java ElasticJob分布式定时任务

    好了,zk的部分处理完成,下面就是直接SchedulerJobBootstrap的部分了。

    ScheduleJobBootstrap初始化

    ScheduleJobBootstrap的初始化在例子中需要三个参数:

    CoordinatorRegistryCenter:这个是协调用的注册中心。是一个接口类,它的实现在ElasticJob里面只有一个ZookeeperRegisterCenter对象,未来是不是会支持其他的注册中心呢?

    ElasticJob: Mars酱理解为任务对象。但是ElasticJob这个对象本身是个空接口,有两个子接口SimpleJobDataflowJob,前者Mars酱的理解是和Quartz中的Job对象类似,只要实现execute函数就行,后者有需要实现两个接口,一个fetchData获取数据,一个processData处理数据。所以,ElasticJob这个接口留空,是为了还有其他扩展吧?

    JobConfiguration:弹性任务配置项。构建这个对象不能直接设置,只能用buider的方式构建。需要配置的属性很多,但是核心属性大致就是几个:任务名称、分片数、执行频率、分片参数。JobConfiguration的所有属性如下:

    属性名 说明
    String jobName 任务名称
    String cron cron表达式
    String timeZone 任务运行的时区
    int shardingTotalCount 任务分片总数
    String shardingItemParameters 分片序号和参数,多个键值对之间用逗号分隔,从0开始,但是不能大于或等于任务分片的总数
    String jobParameter 任务自定义任务参数
    boolean monitorExecution 是否监听执行
    boolean failover 是否启用故障转移。开启表示如果任务在一次任务执行中途宕机,允许将该次未完成的任务在另一任务节点上补偿执行
    boolean misfire 不发火。哈哈,其实是是否开启错过任务重新执行
    int maxTimeDiffSeconds 最大时差
    int reconcileIntervalMinutes 间隔时长
    String jobShardingStrategyType 任务分片策略类型,总共三种
    String jobExecutorServiceHandlerType 任务执行程序服务处理程序类型
    String jobErrorHandlerType 任务错误处理类型
    Collection jobListenerTypes 任务监听类型
    Collection extraConfigurations 附加配置信息
    String description 任务描述
    Properties props 扩展用属性值
    boolean disabled 是否禁用
    boolean overwrite 是否覆盖
    String label 标签
    boolean staticSharding 是否支持静态分片

    ScheduleJobBootstrap执行

    同样的,例子中的MarsSimpleJob的execute函数,最终会被ElasticJob框架调用,我们按照被执行的反向顺序往上找。MarsSimpleJob是继承SimpleJob的, 而SimpleJob的execute函数是被SimpleJobExecutor所调用:

    /**
     * Simple job executor.
     */
    public final class SimpleJobExecutor implements ClassedJobItemExecutor<SimpleJob> {
        @Override
        public void process(final SimpleJob elasticJob, final JobConfiguration jobConfig, final JobFacade jobFacade, final ShardingContext shardingContext) {
            // 这里调用execute函数
            elasticJob.execute(shardingContext);
        }
        @Override
        public Class<SimpleJob> getElasticJobClass() {
            return SimpleJob.class;
        }
    }

    再继续往上找,process的核心流程就是在ElasticJobExecutor里面了,调用process的部分在ElasticJobExcutor中几个重载的process方法调用的,两个process函数完成不同的功能,调用SimpleExecutor的process部分是这样:

    @SuppressWarnings("unchecked")
    private void process(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) {
            jobFacade.postJobExecutionEvent(startEvent);
            log.trace("Job '{}' executing, item is: '{}'.", jobConfig.getJobName(), item);
            JobExecutionEvent completeEvent;
            try {
                // 这里调用SimpleJobExecutor的process
                jobItemExecutor.process(elasticJob, jobConfig, jobFacade, shardingContexts.createShardingContext(item));
                completeEvent = startEvent.executionSuccess();
                log.trace("Job '{}' executed, item is: '{}'.", jobConfig.getJobName(), item);
                jobFacade.postJobExecutionEvent(completeEvent);
                // CHECKSTYLE:OFF
            } catch (final Throwable cause) {
                // CHECKSTYLE:ON
                completeEvent = startEvent.executionFailure(ExceptionUtils.transform(cause));
                jobFacade.postJobExecutionEvent(completeEvent);
                itemErrorMessages.put(item, ExceptionUtils.transform(cause));
                JobErrorHandler jobErrorHandler = executorContext.get(JobErrorHandler.class);
                jobErrorHandler.handleException(jobConfig.getJobName(), cause);
            }
    }

    上面这个process负责最终任务的执行部分,由JobItemExecutor对象调用,SimpleJobExecutor被JobItemExecutor接口定义。整个这个proces由guava包的EventBus处理消息事件,执行之前有startEvent,执行完成有completeEvent,异常也有对应的失败event,方面架构图中存盘事件日志、ELK日志收集动作。

    调用这个process的部分,由另一个process完成,长这样的:

    private void process(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final ExecutionSource executionSource) {
            Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
            if (1 == items.size()) {
                int item = shardingContexts.getShardingItemParameters().keySet().iterator().next();
                JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, item);
                process(jobConfig, shardingContexts, item, jobExecutionEvent);
                return;
            }
            CountDownLatch latch = new CountDownLatch(items.size());
            for (int each : items) {
                JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, each);
                ExecutorService executorService = executorContext.get(ExecutorService.class);
                if (executorService.isShutdown()) {
                    return;
                }
                // 提交给线程池执行
                executorService.submit(() -> {
                    try {
                        process(jobConfig, shardingContexts, each, jobExecutionEvent);
                    } finally {
                        latch.countDown();
                    }
                });
            }
            try {
                latch.await();
            } catch (final InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
    }

    上面这个process负责把分片参数依次组装好,设置好JobExecutionEvent中的ip、主机名等参数,然后放入线程池中去执行。再往上,看现在这个process被调用的部分:

    private void execute(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final ExecutionSource executionSource) {
    	if (shardingContexts.getShardingItemParameters().isEmpty()) {
    		jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobConfig.getJobName()));
            return;
        }
        // 往注册中心注册ShardingContexts信息
        jobFacade.registerJobBegin(shardingContexts);
        String taskId = shardingContexts.getTaskId();
        // 发送跟踪日志,标记任务正在运行
        jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, "");
        try {
            // 调用process
            process(jobConfig, shardingContexts, executionSource);
        } finally {
            // TODO Consider increasing the status of job failure, and how to handle the overall loop of job failure
            // 告知注册中心任务完成
            jobFacade.registerJobCompleted(shardingContexts);
            if (itemErrorMessages.isEmpty()) {
                // 没有失败信息,通知任务完成
                jobFacade.postJobStatusTraceEvent(taskId, State.TASK_FINISHED, "");
            } else {
                // 否则通知失败
                jobFacade.postJobStatusTraceEvent(taskId, State.TASK_ERROR, itemErrorMessages.toString());
                itemErrorMessages.clear();
            }
        }
    }

    方法execute从注册中心注册ShardingContext信息,并发送跟踪日志事件,然后调用process,最后发送跟踪消息标记任务完成。再有一个重载的execute方法调用上面这个execute方法,如下:

    public void execute() {
        // job的配置信息
        JobConfiguration jobConfig = jobFacade.loadJobConfiguration(true);
        executorContext.reloadIfNecessary(jobConfig);
        JobErrorHandler jobErrorHandler = executorContext.get(JobErrorHandler.class);
        try {
            jobFacade.checkJobExecutionEnvironment();
        } catch (final JobExecutionEnvironmentException cause) {
            jobErrorHandler.handleException(jobConfig.getJobName(), cause);
        }
        // 这里有玄机
        ShardingContexts shardingContexts = jobFacade.getShardingContexts();
        // 发送时间消息总线
        jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobConfig.getJobName()));
        if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
            jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), 
                                              State.TASK_FINISHED, 
                                              String.format(
                    "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", 
                                                  jobConfig.getJobName(),
                                                  shardingContexts.getShardingItemParameters().keySet()));
                return;
        }
        try {
            // 任务执行的前置流程
            jobFacade.beforeJobExecuted(shardingContexts);
            //CHECKSTYLE:OFF
        } catch (final Throwable cause) {
            //CHECKSTYLE:ON
            jobErrorHandler.handleException(jobConfig.getJobName(), cause);
        }
        // 调用上面的execute方法
        execute(jobConfig, shardingContexts, ExecutionSource.NORMAL_TRIGGER);
        while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
            jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
            execute(jobConfig, shardingContexts, ExecutionSource.MISFIRE);
        }
        // 故障转移
        jobFacade.failoverIfNecessary();
        try {
            // 任务执行的后置流程
            jobFacade.afterJobExecuted(shardingContexts);
            //CHECKSTYLE:OFF
        } catch (final Throwable cause) {
            //CHECKSTYLE:ON
            jobErrorHandler.handleException(jobConfig.getJobName(), cause);
        }
    }

    这个execute就由Quartz的JobRunShell调用了,Quartz的调用的过程在 Java | 一分钟掌握定时任务 | 6 - Quartz定时任务里面还好Mars酱分析过了。

    执行流程总结

    那么,追踪完源代码,大致的流程就应该是如下:

    1.组装基本参数(任务、频率等) -> 2. ScheduleJobBootstrap初始化 -> 3.配置任务属性 -> 4.设置各种facade -> 5.初始化ElasticJobExecutor -> 6.调用scheduler执行任务 -> 7.获取任务执行器(SimpleJobExecutor) -> 8.各种校验逻辑 -> 9. 处理分片参数 -> 10. 设置任务为运行状态 -> 11. 提交任务到线程池 -> 12.执行任务 -> 13.处理任务后续逻辑

    任务的调度过程由zk完成,取决于zk的任务调度策略吧?如果一台机器的定时运行时挂了,zk会转移到另一台运行中的机器中去。-- Mars酱

    分片的策略

    任务的分片策略,用于将任务在分布式环境下分解成为任务使用。

    SPI 名称 详细说明
    JobShardingStrategy 作业分片策略接口
    已知实现类 详细说明
    AverageAllocationJobShardingStrategy 根据分片项平均分片
    OdevitySortByNameJobShardingStrategy 根据任务名称哈希值的奇偶数决定按照任务服务器 IP 升序或是降序的方式分片
    RoundRobinByNameJobShardingStrategy 根据任务名称轮询分片

    那么任务的分片策略在哪里使用的呢?就在代码中注释的“这里有玄机”那行。在getShardingContexts的方法中会调用ShardingService,它会去获取JobConfiguration中配置的分片策略方式:

    public void shardingIfNecessary() {
        List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();
        if (!isNeedSharding() || availableJobInstances.isEmpty()) {
            return;
        }
        if (!leaderService.isLeaderUntilBlock()) {
            blockUntilShardingCompleted();
            return;
        }
        waitingOtherShardingItemCompleted();
        JobConfiguration jobConfig = configService.load(false);
        int shardingTotalCount = jobConfig.getShardingTotalCount();
        log.debug("Job '{}' sharding begin.", jobName);
        jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
        resetShardingInfo(shardingTotalCount);
        // 获取任务分片策略
        JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(jobConfig.getJobShardingStrategyType());
        jobNodeStorage.executeInTransaction(getShardingResultTransactionOperations(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
        log.debug("Job '{}' sharding complete.", jobName);
    }

    如果不设置,默认使用的是平均分片策略。

    以上就是一分钟掌握Java ElasticJob分布式定时任务的详细内容,更多关于Java ElasticJob定时任务的资料请关注其它相关文章!

    原文地址:https://juejin.cn/post/7230819482012336165

    延伸 · 阅读

    精彩推荐
    • Java教程java实现大数加法(BigDecimal)的实例代码

      java实现大数加法(BigDecimal)的实例代码

      之前写过用vector、string实现大数加法,现在用java的BigDecimal类,代码简单很多。但是在online-judge上,java的代码运行时间和内存大得多 ...

      java代码网3332019-10-16
    • Java教程mybatis plus 的动态表名的配置详解

      mybatis plus 的动态表名的配置详解

      这篇文章主要介绍了mybatis plus 的动态表名的配置详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们...

      团子是个傻子7472020-09-03
    • Java教程一篇文章教会你使用java爬取想要的资源

      一篇文章教会你使用java爬取想要的资源

      这篇文章主要介绍了使用java爬虫爬取想要的资源,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下...

      Talisman丶9492021-12-02
    • Java教程Java并发编程之线程之间的共享和协作

      Java并发编程之线程之间的共享和协作

      这篇文章主要介绍了Java并发编程之线程之间的共享和协作,文中有非常详细的代码示例,对正在学习java的小伙伴们有一定的帮助,需要的朋友可以参考下...

      chentian11410162021-09-09
    • Java教程基于spring boot 1.5.4 集成 jpa+hibernate+jdbcTemplate(详解)

      基于spring boot 1.5.4 集成 jpa+hibernate+jdbcTemplate(详解)

      下面小编就为大家带来一篇基于spring boot 1.5.4 集成 jpa+hibernate+jdbcTemplate(详解)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编...

      Java之家4772020-11-19
    • Java教程JAVAEE Filter 过滤器设置是否缓存实例详解

      JAVAEE Filter 过滤器设置是否缓存实例详解

      网页中,每次的客户端访问服务器,有部分不用重复请求的,这样可以减轻服务器的工作量。那么如何设置客户端是否都缓存呢?接下来通过本文给大家介...

      YX_blog3652020-05-01
    • Java教程java agent 使用及实现代码

      java agent 使用及实现代码

      java agent的作用可以在字节码这个层面对类和方法进行修改的技术,能够在不影响编译的情况下,修改字节码。本文主要给大家讲解java agent 使用及实现代码...

      木木甫7082021-05-21
    • Java教程Java使用Socket简单通讯详解

      Java使用Socket简单通讯详解

      这篇文章主要介绍了Java使用Socket简单通讯详解,本篇文章通过简要的案例,讲解了该项技术的了解与使用,以下就是详细内容,需要的朋友可以参考下...

      Aputonos5542021-11-15