目录
- 前言
- 架构
- 功能和特性
- 入门角色
- 写个例子
- 任务执行流程
- ScheduleJobBootstrap初始化
- ScheduleJobBootstrap执行
- 执行流程总结
- 分片的策略
前言
ElasticJob 是面向互联网生态和海量任务的分布式调度解决方案。 它通过弹性调度、资源管控、以及任务治理的功能,打造一个适用于互联网场景的分布式调度解决方案,并通过开放的架构设计,提供多元化的任务生态。 它的各个产品使用统一的任务 API,开发者仅需一次开发,即可随意部署。
架构
elasticjob由两个相互独立的子项目 ElasticJob-Lite 和 ElasticJob-Cloud 组成组成,这是ElasticJob-Lite 的架构图:
再创建任务配置:
import org.apache.shardingsphere.elasticjob.api.JobConfiguration; import org.apache.shardingsphere.elasticjob.tracing.api.TracingConfiguration; import javax.sql.DataSource; import java.util.Objects; /** * (这个类的说明) * * @author mars酱 */ public class JobConfigurationBuilder { public static JobConfiguration buildJobConfiguration(String jobName, String cronExpression, TracingConfiguration<DataSource> tracingConfig) { JobConfiguration.Builder builder = JobConfiguration.newBuilder(jobName, 3) .cron(cronExpression) .shardingItemParameters("0=a,1=b,2=c"); if (Objects.nonNull(tracingConfig)) { builder.addExtraConfigurations(tracingConfig); } return builder.build(); } }
最后创建调度器,并执行:
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap; import org.apache.shardingsphere.elasticjob.lite.example.job.simple.JavaSimpleJob; import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter; import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration; import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter; import org.apache.shardingsphere.elasticjob.tracing.api.TracingConfiguration; import javax.sql.DataSource; /** * (这个类的说明) * * @author mars酱 */ public final class SchedulerMain { private static final int EMBED_ZOOKEEPER_PORT = 4181; private static final String ZOOKEEPER_CONNECTION_STRING = "localhost:" + EMBED_ZOOKEEPER_PORT; private static final String JOB_NAMESPACE = "elasticjob-marsz-lite-java"; // CHECKSTYLE:OFF public static void main(final String[] args) { // 内嵌zk服务 EmbedZookeeperServer.start(EMBED_ZOOKEEPER_PORT); CoordinatorRegistryCenter regCenter = setUpRegistryCenter(); // 简单作业 setUpSimpleJob(regCenter, null); } private static CoordinatorRegistryCenter setUpRegistryCenter() { ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(ZOOKEEPER_CONNECTION_STRING, JOB_NAMESPACE); CoordinatorRegistryCenter result = new ZookeeperRegistryCenter(zkConfig); result.init(); return result; } private static void setUpSimpleJob(final CoordinatorRegistryCenter regCenter, final TracingConfiguration<DataSource> tracingConfig) { new ScheduleJobBootstrap(regCenter, new MarsSimpleJob(), JobConfigurationBuilder.buildJobConfiguration("marsSimpleJob", "0/5 * * * * ?", tracingConfig)).schedule(); } }
运行的效果:
好了,zk的部分处理完成,下面就是直接SchedulerJobBootstrap的部分了。
ScheduleJobBootstrap初始化
ScheduleJobBootstrap的初始化在例子中需要三个参数:
CoordinatorRegistryCenter
:这个是协调用的注册中心。是一个接口类,它的实现在ElasticJob里面只有一个ZookeeperRegisterCenter对象,未来是不是会支持其他的注册中心呢?
ElasticJob
: Mars酱理解为任务对象。但是ElasticJob这个对象本身是个空接口,有两个子接口SimpleJob
和DataflowJob
,前者Mars酱的理解是和Quartz中的Job对象类似,只要实现execute函数就行,后者有需要实现两个接口,一个fetchData
获取数据,一个processData
处理数据。所以,ElasticJob这个接口留空,是为了还有其他扩展吧?
JobConfiguration
:弹性任务配置项。构建这个对象不能直接设置,只能用buider的方式构建。需要配置的属性很多,但是核心属性大致就是几个:任务名称、分片数、执行频率、分片参数。JobConfiguration的所有属性如下:
属性名 | 说明 |
---|---|
String jobName | 任务名称 |
String cron | cron表达式 |
String timeZone | 任务运行的时区 |
int shardingTotalCount | 任务分片总数 |
String shardingItemParameters | 分片序号和参数,多个键值对之间用逗号分隔,从0开始,但是不能大于或等于任务分片的总数 |
String jobParameter | 任务自定义任务参数 |
boolean monitorExecution | 是否监听执行 |
boolean failover | 是否启用故障转移。开启表示如果任务在一次任务执行中途宕机,允许将该次未完成的任务在另一任务节点上补偿执行 |
boolean misfire | 不发火。哈哈,其实是是否开启错过任务重新执行 |
int maxTimeDiffSeconds | 最大时差 |
int reconcileIntervalMinutes | 间隔时长 |
String jobShardingStrategyType | 任务分片策略类型,总共三种 |
String jobExecutorServiceHandlerType | 任务执行程序服务处理程序类型 |
String jobErrorHandlerType | 任务错误处理类型 |
Collection jobListenerTypes | 任务监听类型 |
Collection extraConfigurations | 附加配置信息 |
String description | 任务描述 |
Properties props | 扩展用属性值 |
boolean disabled | 是否禁用 |
boolean overwrite | 是否覆盖 |
String label | 标签 |
boolean staticSharding | 是否支持静态分片 |
ScheduleJobBootstrap执行
同样的,例子中的MarsSimpleJob的execute函数,最终会被ElasticJob框架调用,我们按照被执行的反向顺序往上找。MarsSimpleJob是继承SimpleJob
的, 而SimpleJob
的execute函数是被SimpleJobExecutor
所调用:
/** * Simple job executor. */ public final class SimpleJobExecutor implements ClassedJobItemExecutor<SimpleJob> { @Override public void process(final SimpleJob elasticJob, final JobConfiguration jobConfig, final JobFacade jobFacade, final ShardingContext shardingContext) { // 这里调用execute函数 elasticJob.execute(shardingContext); } @Override public Class<SimpleJob> getElasticJobClass() { return SimpleJob.class; } }
再继续往上找,process的核心流程就是在ElasticJobExecutor
里面了,调用process的部分在ElasticJobExcutor
中几个重载的process方法调用的,两个process函数完成不同的功能,调用SimpleExecutor的process部分是这样:
@SuppressWarnings("unchecked") private void process(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) { jobFacade.postJobExecutionEvent(startEvent); log.trace("Job '{}' executing, item is: '{}'.", jobConfig.getJobName(), item); JobExecutionEvent completeEvent; try { // 这里调用SimpleJobExecutor的process jobItemExecutor.process(elasticJob, jobConfig, jobFacade, shardingContexts.createShardingContext(item)); completeEvent = startEvent.executionSuccess(); log.trace("Job '{}' executed, item is: '{}'.", jobConfig.getJobName(), item); jobFacade.postJobExecutionEvent(completeEvent); // CHECKSTYLE:OFF } catch (final Throwable cause) { // CHECKSTYLE:ON completeEvent = startEvent.executionFailure(ExceptionUtils.transform(cause)); jobFacade.postJobExecutionEvent(completeEvent); itemErrorMessages.put(item, ExceptionUtils.transform(cause)); JobErrorHandler jobErrorHandler = executorContext.get(JobErrorHandler.class); jobErrorHandler.handleException(jobConfig.getJobName(), cause); } }
上面这个process负责最终任务的执行部分,由JobItemExecutor对象调用,SimpleJobExecutor被JobItemExecutor接口定义。整个这个proces由guava包的EventBus处理消息事件,执行之前有startEvent,执行完成有completeEvent,异常也有对应的失败event,方面架构图中存盘事件日志、ELK日志收集动作。
调用这个process的部分,由另一个process完成,长这样的:
private void process(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final ExecutionSource executionSource) { Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet(); if (1 == items.size()) { int item = shardingContexts.getShardingItemParameters().keySet().iterator().next(); JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, item); process(jobConfig, shardingContexts, item, jobExecutionEvent); return; } CountDownLatch latch = new CountDownLatch(items.size()); for (int each : items) { JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, each); ExecutorService executorService = executorContext.get(ExecutorService.class); if (executorService.isShutdown()) { return; } // 提交给线程池执行 executorService.submit(() -> { try { process(jobConfig, shardingContexts, each, jobExecutionEvent); } finally { latch.countDown(); } }); } try { latch.await(); } catch (final InterruptedException ex) { Thread.currentThread().interrupt(); } }
上面这个process负责把分片参数依次组装好,设置好JobExecutionEvent中的ip、主机名等参数,然后放入线程池中去执行。再往上,看现在这个process被调用的部分:
private void execute(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final ExecutionSource executionSource) { if (shardingContexts.getShardingItemParameters().isEmpty()) { jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobConfig.getJobName())); return; } // 往注册中心注册ShardingContexts信息 jobFacade.registerJobBegin(shardingContexts); String taskId = shardingContexts.getTaskId(); // 发送跟踪日志,标记任务正在运行 jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, ""); try { // 调用process process(jobConfig, shardingContexts, executionSource); } finally { // TODO Consider increasing the status of job failure, and how to handle the overall loop of job failure // 告知注册中心任务完成 jobFacade.registerJobCompleted(shardingContexts); if (itemErrorMessages.isEmpty()) { // 没有失败信息,通知任务完成 jobFacade.postJobStatusTraceEvent(taskId, State.TASK_FINISHED, ""); } else { // 否则通知失败 jobFacade.postJobStatusTraceEvent(taskId, State.TASK_ERROR, itemErrorMessages.toString()); itemErrorMessages.clear(); } } }
方法execute从注册中心注册ShardingContext信息,并发送跟踪日志事件,然后调用process,最后发送跟踪消息标记任务完成。再有一个重载的execute方法调用上面这个execute方法,如下:
public void execute() { // job的配置信息 JobConfiguration jobConfig = jobFacade.loadJobConfiguration(true); executorContext.reloadIfNecessary(jobConfig); JobErrorHandler jobErrorHandler = executorContext.get(JobErrorHandler.class); try { jobFacade.checkJobExecutionEnvironment(); } catch (final JobExecutionEnvironmentException cause) { jobErrorHandler.handleException(jobConfig.getJobName(), cause); } // 这里有玄机 ShardingContexts shardingContexts = jobFacade.getShardingContexts(); // 发送时间消息总线 jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobConfig.getJobName())); if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) { jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format( "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobConfig.getJobName(), shardingContexts.getShardingItemParameters().keySet())); return; } try { // 任务执行的前置流程 jobFacade.beforeJobExecuted(shardingContexts); //CHECKSTYLE:OFF } catch (final Throwable cause) { //CHECKSTYLE:ON jobErrorHandler.handleException(jobConfig.getJobName(), cause); } // 调用上面的execute方法 execute(jobConfig, shardingContexts, ExecutionSource.NORMAL_TRIGGER); while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) { jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet()); execute(jobConfig, shardingContexts, ExecutionSource.MISFIRE); } // 故障转移 jobFacade.failoverIfNecessary(); try { // 任务执行的后置流程 jobFacade.afterJobExecuted(shardingContexts); //CHECKSTYLE:OFF } catch (final Throwable cause) { //CHECKSTYLE:ON jobErrorHandler.handleException(jobConfig.getJobName(), cause); } }
这个execute就由Quartz的JobRunShell调用了,Quartz的调用的过程在 Java | 一分钟掌握定时任务 | 6 - Quartz定时任务里面还好Mars酱分析过了。
执行流程总结
那么,追踪完源代码,大致的流程就应该是如下:
1.组装基本参数(任务、频率等) -> 2. ScheduleJobBootstrap初始化 -> 3.配置任务属性 -> 4.设置各种facade -> 5.初始化ElasticJobExecutor -> 6.调用scheduler执行任务 -> 7.获取任务执行器(SimpleJobExecutor) -> 8.各种校验逻辑 -> 9. 处理分片参数 -> 10. 设置任务为运行状态 -> 11. 提交任务到线程池 -> 12.执行任务 -> 13.处理任务后续逻辑
任务的调度过程由zk完成,取决于zk的任务调度策略吧?如果一台机器的定时运行时挂了,zk会转移到另一台运行中的机器中去。-- Mars酱
分片的策略
任务的分片策略,用于将任务在分布式环境下分解成为任务使用。
SPI 名称 | 详细说明 |
---|---|
JobShardingStrategy | 作业分片策略接口 |
已知实现类 | 详细说明 |
---|---|
AverageAllocationJobShardingStrategy | 根据分片项平均分片 |
OdevitySortByNameJobShardingStrategy | 根据任务名称哈希值的奇偶数决定按照任务服务器 IP 升序或是降序的方式分片 |
RoundRobinByNameJobShardingStrategy | 根据任务名称轮询分片 |
那么任务的分片策略在哪里使用的呢?就在代码中注释的“这里有玄机”那行。在getShardingContexts的方法中会调用ShardingService,它会去获取JobConfiguration
中配置的分片策略方式:
public void shardingIfNecessary() { List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances(); if (!isNeedSharding() || availableJobInstances.isEmpty()) { return; } if (!leaderService.isLeaderUntilBlock()) { blockUntilShardingCompleted(); return; } waitingOtherShardingItemCompleted(); JobConfiguration jobConfig = configService.load(false); int shardingTotalCount = jobConfig.getShardingTotalCount(); log.debug("Job '{}' sharding begin.", jobName); jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, ""); resetShardingInfo(shardingTotalCount); // 获取任务分片策略 JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(jobConfig.getJobShardingStrategyType()); jobNodeStorage.executeInTransaction(getShardingResultTransactionOperations(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount))); log.debug("Job '{}' sharding complete.", jobName); }
如果不设置,默认使用的是平均分片策略。
以上就是一分钟掌握Java ElasticJob分布式定时任务的详细内容,更多关于Java ElasticJob定时任务的资料请关注其它相关文章!
原文地址:https://juejin.cn/post/7230819482012336165