Quartz(3-Scheduler) | 总字数: 3.4k | 阅读时长: 17分钟 | 浏览量: |
BaseCalendar
专门用于屏蔽一个时间区间,使 Trigger 在这个区间中不被触发
AnnualCalendar
排除每一年中指定的一天或多天,精度是天
CronCalendar
使用表达式排除某些时间段不执行,精度取决于 Cron 表达式,最大精度到秒
DailyCalendar
指定的时间范围内的每一天不执行,指定每天的时间段,格式是 HH:MM[:SS[:mmm]],即最大精度可以到毫秒
HolidayCalendar
排除节假日,精度到天
MonthlyCalendar
排除月份中的数天,可选值为 1-31,精度是天
WeeklyCalendar
排除星期中的一天或多天,可选值比如为:java.util.Calendar.SUNDAY,精度是天
基本使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class CalendarSchedule { public static void main (String[] args) throws SchedulerException { JobDetail jobDetail = JobBuilder.newJob(HelloJob.class) .withIdentity("test" ) .build(); AnnualCalendar holidays = new AnnualCalendar (); GregorianCalendar nationalDay = new GregorianCalendar (2017 , 10 , 27 ); holidays.setDayExcluded(nationalDay,true ); Trigger simpleTrigger = TriggerBuilder.newTrigger() .withIdentity("testTrigger" ) .withSchedule(SimpleScheduleBuilder.simpleSchedule() .withIntervalInSeconds(1 ) .repeatForever()) .modifiedByCalendar("holidays" ) .build(); StdSchedulerFactory stdSchedulerFactory = new StdSchedulerFactory (); Scheduler scheduler = stdSchedulerFactory.getScheduler(); scheduler.addCalendar("holidays" ,holidays,false ,false ); scheduler.scheduleJob(jobDetail,simpleTrigger); scheduler.start(); } }
定时任务增删改查
参考 Scheduler 源码中的方法
通过配置文件定义每个任务的属性,项目启动后将每个任务加载到内存中,将其中一个任务用来监听配置文件的修改,当其修改后动态刷新任务
quartz:
  jobs:
    - name: myName
      group: collect
      # Quartz cron fields: second minute hour day-of-month month day-of-week [year].
      # "0/5" in the minute field means every 5 minutes starting at minute 0.
      cron: "0 0/5 * * * ? *"
      jobClass: com.gamer.me.quartz.jobs.MyJob
      desc: 我的任务
1 2 3 4 5 6 7 8 9 @Data public class SchedulerJob { private String name; private String group; private String cron; private String jobClass; private String desc; }
1 2 3 4 5 6 7 @Data @Component @ConfigurationProperties(prefix = "quartz") public class SchedulerJobs { private List<SchedulerJob> jobs; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 @Slf4j @Component public class SchedulerManager { @Autowired private Scheduler scheduler; public void activeJob (SchedulerJob schedulerJob) { JobKey jobKey = JobKey.jobKey(schedulerJob.getName(), schedulerJob.getGroup()); try { if (scheduler.checkExists(jobKey) && !MonitorCronJob.JOB_NAME.equals(schedulerJob.getName())) { updateJob(schedulerJob); }else { createJob(schedulerJob); } } catch (SchedulerException e) { logger.error("activeJob {}" , e); } } public void createJob (SchedulerJob schedulerJob) { JobKey jobKey = JobKey.jobKey(schedulerJob.getName(), schedulerJob.getGroup()); try { if (scheduler.checkExists(jobKey)) { return ; } Class<?> clazz = Class.forName(schedulerJob.getJobClass()); JobDetail jobDetail = getJobDetail(schedulerJob, (Class<Job>) clazz); Trigger cronTrigger = getCronTrigger(schedulerJob); scheduler.scheduleJob(jobDetail, cronTrigger); } catch (ClassNotFoundException | SchedulerException e) { logger.error("createJob {}" , e); } } public void updateJob (SchedulerJob schedulerJob) { TriggerKey triggerKey = TriggerKey.triggerKey(schedulerJob.getName(), schedulerJob.getGroup()); try { Trigger trigger = scheduler.getTrigger(triggerKey); if (trigger == null ) { return ; } JobKey jobKey = trigger.getJobKey(); String oldCron = ((CronTrigger)trigger).getCronExpression(); if (oldCron.equals(schedulerJob.getCron())){ return ; } Trigger cronTrigger = getCronTrigger(schedulerJob); scheduler.rescheduleJob(triggerKey, cronTrigger); } catch (SchedulerException e) { logger.error("updateJob {}" , e); } } public void deleteJobs (List<JobKey> jobKeys) { try { scheduler.deleteJobs(jobKeys); } catch (SchedulerException e) { 
logger.error("deleteJobs {}" , e); } } private JobDetail getJobDetail (SchedulerJob schedulerJob, Class<Job> clazz) { return JobBuilder.newJob() .ofType(clazz) .withIdentity(schedulerJob.getName(), schedulerJob.getGroup()) .withDescription(schedulerJob.getDesc()) .build(); } private Trigger getCronTrigger (SchedulerJob schedulerJob) { CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(schedulerJob.getCron()); if (!MonitorCronJob.JOB_NAME.equals(schedulerJob.getName())){ cronScheduleBuilder.withMisfireHandlingInstructionIgnoreMisfires(); } return TriggerBuilder.newTrigger() .withIdentity(schedulerJob.getName(), schedulerJob.getGroup()) .withDescription(schedulerJob.getDesc()) .withSchedule(cronScheduleBuilder) .build(); } }
监控其他定时任务的总任务 MonitorCronJob(用于监控 cron 的更新)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 @Slf4j public class MonitorCronJob implements Job { public static final String JOB_NAME = "monitor_cron" ; public static final String GROUP_NAME = "monitor" ; public static final String CRON = "0 0/10 * * * ?" ; public static final String DESC = "监控cron更新" ; @Autowired private SchedulerManager schedulerManager; @Autowired private SchedulerJobs schedulerJobs; @Autowired private ContextRefresher contextRefresher; @Override public void execute (JobExecutionContext jobExecutionContext) throws JobExecutionException { contextRefresher.refresh(); Set<JobKey> oldJobKeys = null ; try { oldJobKeys = jobExecutionContext.getScheduler().getJobKeys(GroupMatcher.anyJobGroup()); } catch (SchedulerException e) { logger.error("MonitorCronJob {}" , e); } List<String> newJobKeys = new ArrayList <>(); for (SchedulerJob job : schedulerJobs.getJobs()) { if (job.getName().equals(JOB_NAME)) { continue ; } newJobKeys.add(job.getName()); logger.info("job【{}】,cron【{}】" , job.getName(), job.getCron()); schedulerManager.activeJob(job); } if (oldJobKeys == null ) { return ; } List<JobKey> shouldDeleteJobKeys = oldJobKeys.stream() .filter(jobKey -> !JOB_NAME.equals(jobKey.getName()) && !newJobKeys.contains(jobKey.getName())) .collect(Collectors.toList()); logger.info("delete jobs {}" , shouldDeleteJobKeys); schedulerManager.deleteJobs(shouldDeleteJobKeys); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Component public class Initialization implements ApplicationRunner { @Autowired private SchedulerManager schedulerManager; @Override public void run (ApplicationArguments args) throws Exception { SchedulerJob schedulerJob = new SchedulerJob (); schedulerJob.setName(MonitorCronJob.JOB_NAME); schedulerJob.setGroup(MonitorCronJob.GROUP_NAME); schedulerJob.setCron(MonitorCronJob.CRON); schedulerJob.setDesc(MonitorCronJob.DESC); schedulerJob.setJobClass(MonitorCronJob.class.getName()); schedulerManager.activeJob(schedulerJob); } }
SchedulerRepository 详解
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 public class SchedulerRepository { private HashMap<String, Scheduler> schedulers; private static SchedulerRepository inst; private SchedulerRepository () { schedulers = new HashMap <String, Scheduler>(); } public static synchronized SchedulerRepository getInstance () { if (inst == null ) { inst = new SchedulerRepository (); } return inst; } public synchronized void bind (Scheduler sched) throws SchedulerException { if ((Scheduler) schedulers.get(sched.getSchedulerName()) != null ) { throw new SchedulerException ("Scheduler with name '" + sched.getSchedulerName() + "' already exists." ); } schedulers.put(sched.getSchedulerName(), sched); } public synchronized boolean remove (String schedName) { return (schedulers.remove(schedName) != null ); } public synchronized Scheduler lookup (String schedName) { return schedulers.get(schedName); } public synchronized Collection<Scheduler> lookupAll () { return java.util.Collections.unmodifiableCollection(schedulers.values()); } }
StdSchedulerFactory 详解
SchedulerFactory 是一个接口,它有两个实现:
StdSchedulerFactory 通过配置文件来设置 Scheduler 的各项参数
DirectSchedulerFactory 主要通过硬编码来设置 Scheduler 的各项参数
构造方法
/**
 * Creates a factory without loading any configuration in the constructor.
 */
public StdSchedulerFactory() {
}

/**
 * Creates a factory configured from the given properties.
 *
 * @param props configuration passed to {@code initialize(Properties)}
 * @throws SchedulerException if initialization fails
 */
public StdSchedulerFactory(Properties props) throws SchedulerException {
    initialize(props);
}

/**
 * Creates a factory configured from the named properties file.
 *
 * @param fileName file name passed to {@code initialize(String)}
 * @throws SchedulerException if initialization fails
 */
public StdSchedulerFactory(String fileName) throws SchedulerException {
    initialize(fileName);
}
getScheduler()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public Scheduler getScheduler () throws SchedulerException { if (cfg == null ) { initialize(); } SchedulerRepository schedRep = SchedulerRepository.getInstance(); Scheduler sched = schedRep.lookup(getSchedulerName()); if (sched != null ) { if (sched.isShutdown()) { schedRep.remove(getSchedulerName()); } else { return sched; } } sched = instantiate(); return sched; }
getScheduler()会推迟实例化,当调用其方法时才会去初始化 Quartz 的配置
initialize()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 public void initialize () throws SchedulerException { if (cfg != null ) { return ; } if (initException != null ) { throw initException; } String requestedFile = System.getProperty(PROPERTIES_FILE); String propFileName = requestedFile != null ? requestedFile: "quartz.properties" ; File propFile = new File (propFileName); Properties props = new Properties (); InputStream in = null ; try { if (propFile.exists()) { try { if (requestedFile != null ) { propSrc = "specified file: '" + requestedFile + "'" ; } else { propSrc = "default file in current working dir: 'quartz.properties'" ; } in = new BufferedInputStream (new FileInputStream (propFileName)); props.load(in); } catch (IOException ioe) { initException = new SchedulerException ("Properties file: '" + propFileName + "' could not be read." , ioe); throw initException; } } else if (requestedFile != null ) { in = Thread.currentThread().getContextClassLoader().getResourceAsStream(requestedFile); if (in == null ) { initException = new SchedulerException ("Properties file: '" + requestedFile + "' could not be found." ); throw initException; } propSrc = "specified file: '" + requestedFile + "' in the class resource path." ; in = new BufferedInputStream (in); try { props.load(in); } catch (IOException ioe) { initException = new SchedulerException ("Properties file: '" + requestedFile + "' could not be read." 
, ioe); throw initException; } } else { propSrc = "default resource file in Quartz package: 'quartz.properties'" ; ClassLoader cl = getClass().getClassLoader(); if (cl == null ) cl = findClassloader(); if (cl == null ) throw new SchedulerConfigException ("Unable to find a class loader on the current thread or class." ); in = cl.getResourceAsStream("quartz.properties" ); if (in == null ) { in = cl.getResourceAsStream("/quartz.properties" ); } if (in == null ) { in = cl.getResourceAsStream("org/quartz/quartz.properties" ); } if (in == null ) { initException = new SchedulerException ( "Default quartz.properties not found in class path" ); throw initException; } try { props.load(in); } catch (IOException ioe) { initException = new SchedulerException ( "Resource properties file: 'org/quartz/quartz.properties' " + "could not be read from the classpath." , ioe); throw initException; } } } finally { if (in != null ) { try { in.close(); } catch (IOException ignore) { } } } initialize(overrideWithSysProps(props)); } private Properties overrideWithSysProps (Properties props) { Properties sysProps = null ; try { sysProps = System.getProperties(); } catch (AccessControlException e) { getLog().warn( "Skipping overriding quartz properties with System properties " + "during initialization because of an AccessControlException. " + "This is likely due to not having read/write access for " + "java.util.PropertyPermission as required by java.lang.System.getProperties(). " + "To resolve this warning, either add this permission to your policy file or " + "use a non-default version of initialize()." , e); } if (sysProps != null ) { props.putAll(sysProps); } return props; } public void initialize (Properties props) throws SchedulerException { if (propSrc == null ) { propSrc = "an externally provided properties instance." ; } this .cfg = new PropertiesParser (props); }
通过查看源码可知,quartz 的配置文件加载流程如下:
查看是否有系统变量指定了配置文件路径
在当前工作目录下查找名为 quartz.properties 的文件
通过类加载器加载当前 classpath 路径下的 quartz.properties、/quartz.properties、org/quartz/quartz.properties 的资源文件
这里需要注意的是,一旦自己指定了 quartz.properties 文件,那么 quartz 默认提供的配置将不会被加载,因此也就不会生效
Quartz 默认配置文件
# Scheduler identity, RMI and transaction settings
org.quartz.scheduler.instanceName : DefaultQuartzScheduler
org.quartz.scheduler.rmi.export : false
org.quartz.scheduler.rmi.proxy : false
org.quartz.scheduler.wrapJobExecutionInUserTransaction : false

# Thread pool settings
org.quartz.threadPool.class : org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount : 10
org.quartz.threadPool.threadPriority : 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread : true

# Job store settings
org.quartz.jobStore.misfireThreshold : 60000
org.quartz.jobStore.class : org.quartz.simpl.RAMJobStore
instantiate()
/**
 * Property-name constants used by {@code StdSchedulerFactory}.
 * Only the constants are shown in this excerpt.
 */
public class StdSchedulerFactory implements SchedulerFactory {

    // System property read by initialize() to locate the properties file.
    public static final String PROPERTIES_FILE = "org.quartz.properties";

    // ---- Scheduler-level properties (org.quartz.scheduler.*) ----
    public static final String PROP_SCHED_INSTANCE_NAME = "org.quartz.scheduler.instanceName";
    public static final String PROP_SCHED_INSTANCE_ID = "org.quartz.scheduler.instanceId";
    public static final String PROP_SCHED_INSTANCE_ID_GENERATOR_PREFIX = "org.quartz.scheduler.instanceIdGenerator";
    public static final String PROP_SCHED_INSTANCE_ID_GENERATOR_CLASS = PROP_SCHED_INSTANCE_ID_GENERATOR_PREFIX + ".class";
    public static final String PROP_SCHED_THREAD_NAME = "org.quartz.scheduler.threadName";
    public static final String PROP_SCHED_BATCH_TIME_WINDOW = "org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindow";
    public static final String PROP_SCHED_MAX_BATCH_SIZE = "org.quartz.scheduler.batchTriggerAcquisitionMaxCount";

    // ---- JMX ----
    public static final String PROP_SCHED_JMX_EXPORT = "org.quartz.scheduler.jmx.export";
    public static final String PROP_SCHED_JMX_OBJECT_NAME = "org.quartz.scheduler.jmx.objectName";
    public static final String PROP_SCHED_JMX_PROXY = "org.quartz.scheduler.jmx.proxy";
    public static final String PROP_SCHED_JMX_PROXY_CLASS = "org.quartz.scheduler.jmx.proxy.class";

    // ---- RMI ----
    public static final String PROP_SCHED_RMI_EXPORT = "org.quartz.scheduler.rmi.export";
    public static final String PROP_SCHED_RMI_PROXY = "org.quartz.scheduler.rmi.proxy";
    public static final String PROP_SCHED_RMI_HOST = "org.quartz.scheduler.rmi.registryHost";
    public static final String PROP_SCHED_RMI_PORT = "org.quartz.scheduler.rmi.registryPort";
    public static final String PROP_SCHED_RMI_SERVER_PORT = "org.quartz.scheduler.rmi.serverPort";
    public static final String PROP_SCHED_RMI_CREATE_REGISTRY = "org.quartz.scheduler.rmi.createRegistry";
    public static final String PROP_SCHED_RMI_BIND_NAME = "org.quartz.scheduler.rmi.bindName";

    // ---- Transactions, timing, threads, class loading, job factory, shutdown ----
    public static final String PROP_SCHED_WRAP_JOB_IN_USER_TX = "org.quartz.scheduler.wrapJobExecutionInUserTransaction";
    public static final String PROP_SCHED_USER_TX_URL = "org.quartz.scheduler.userTransactionURL";
    public static final String PROP_SCHED_IDLE_WAIT_TIME = "org.quartz.scheduler.idleWaitTime";
    public static final String PROP_SCHED_DB_FAILURE_RETRY_INTERVAL = "org.quartz.scheduler.dbFailureRetryInterval";
    public static final String PROP_SCHED_MAKE_SCHEDULER_THREAD_DAEMON = "org.quartz.scheduler.makeSchedulerThreadDaemon";
    public static final String PROP_SCHED_SCHEDULER_THREADS_INHERIT_CONTEXT_CLASS_LOADER_OF_INITIALIZING_THREAD = "org.quartz.scheduler.threadsInheritContextClassLoaderOfInitializer";
    public static final String PROP_SCHED_CLASS_LOAD_HELPER_CLASS = "org.quartz.scheduler.classLoadHelper.class";
    public static final String PROP_SCHED_JOB_FACTORY_CLASS = "org.quartz.scheduler.jobFactory.class";
    public static final String PROP_SCHED_JOB_FACTORY_PREFIX = "org.quartz.scheduler.jobFactory";
    public static final String PROP_SCHED_INTERRUPT_JOBS_ON_SHUTDOWN = "org.quartz.scheduler.interruptJobsOnShutdown";
    public static final String PROP_SCHED_INTERRUPT_JOBS_ON_SHUTDOWN_WITH_WAIT = "org.quartz.scheduler.interruptJobsOnShutdownWithWait";
    public static final String PROP_SCHED_CONTEXT_PREFIX = "org.quartz.context.key";

    // ---- Thread pool (org.quartz.threadPool.*) ----
    public static final String PROP_THREAD_POOL_PREFIX = "org.quartz.threadPool";
    public static final String PROP_THREAD_POOL_CLASS = "org.quartz.threadPool.class";

    // ---- Job store (org.quartz.jobStore.*) ----
    public static final String PROP_JOB_STORE_PREFIX = "org.quartz.jobStore";
    public static final String PROP_JOB_STORE_LOCK_HANDLER_PREFIX = PROP_JOB_STORE_PREFIX + ".lockHandler";
    public static final String PROP_JOB_STORE_LOCK_HANDLER_CLASS = PROP_JOB_STORE_LOCK_HANDLER_PREFIX + ".class";
    public static final String PROP_TABLE_PREFIX = "tablePrefix";
    public static final String PROP_SCHED_NAME = "schedName";
    public static final String PROP_JOB_STORE_CLASS = "org.quartz.jobStore.class";
    public static final String PROP_JOB_STORE_USE_PROP = "org.quartz.jobStore.useProperties";

    // ---- Data sources (org.quartz.dataSource.*) ----
    public static final String PROP_DATASOURCE_PREFIX = "org.quartz.dataSource";
    public static final String PROP_CONNECTION_PROVIDER_CLASS = "connectionProvider.class";

    // Deprecated data-source keys, marked @Deprecated in the source.
    @Deprecated
    public static final String PROP_DATASOURCE_DRIVER = "driver";
    @Deprecated
    public static final String PROP_DATASOURCE_URL = "URL";
    @Deprecated
    public static final String PROP_DATASOURCE_USER = "user";
    @Deprecated
    public static final String PROP_DATASOURCE_PASSWORD = "password";
    @Deprecated
    public static final String PROP_DATASOURCE_MAX_CONNECTIONS = "maxConnections";
    @Deprecated
    public static final String PROP_DATASOURCE_VALIDATION_QUERY = "validationQuery";

    // ---- JNDI data-source keys ----
    public static final String PROP_DATASOURCE_JNDI_URL = "jndiURL";
    public static final String PROP_DATASOURCE_JNDI_ALWAYS_LOOKUP = "jndiAlwaysLookup";
    public static final String PROP_DATASOURCE_JNDI_INITIAL = "java.naming.factory.initial";
    // NOTE(review): "PROVDER" is spelled this way in the source; renaming would change the public constant.
    public static final String PROP_DATASOURCE_JNDI_PROVDER = "java.naming.provider.url";
    public static final String PROP_DATASOURCE_JNDI_PRINCIPAL = "java.naming.security.principal";
    public static final String PROP_DATASOURCE_JNDI_CREDENTIALS = "java.naming.security.credentials";

    // ---- Plugins and listeners ----
    public static final String PROP_PLUGIN_PREFIX = "org.quartz.plugin";
    public static final String PROP_PLUGIN_CLASS = "class";
    public static final String PROP_JOB_LISTENER_PREFIX = "org.quartz.jobListener";
    public static final String PROP_TRIGGER_LISTENER_PREFIX = "org.quartz.triggerListener";
    public static final String PROP_LISTENER_CLASS = "class";

    // ---- Instance-id values ----
    public static final String DEFAULT_INSTANCE_ID = "NON_CLUSTERED";
    public static final String AUTO_GENERATE_INSTANCE_ID = "AUTO";

    // ---- Thread executor ----
    public static final String PROP_THREAD_EXECUTOR = "org.quartz.threadExecutor";
    public static final String PROP_THREAD_EXECUTOR_CLASS = "org.quartz.threadExecutor.class";

    public static final String SYSTEM_PROPERTY_AS_INSTANCE_ID = "SYS_PROP";

    // ---- Management REST service ----
    public static final String MANAGEMENT_REST_SERVICE_ENABLED = "org.quartz.managementRESTService.enabled";
    public static final String MANAGEMENT_REST_SERVICE_HOST_PORT = "org.quartz.managementRESTService.bind";
}
instantiate()方法主要就是根据之前加载好的配置文件来创建出 scheduler 需要用到的一些对象。
主要对象:
Job:业务逻辑类需要实现的接口
JobStore:主要用于存储 Job 和 Trigger
JobFactory:job 实例化工厂
ThreadPool:主要用于分配任务给具体的线程进行执行
QuartzSchedulerThread:管理者线程
WorkerThread:工作者线程
DBConnectionManager:数据库连接管理器
ThreadExecutor:线程执行器
SchedulerPlugin:调度器插件
JobListener:任务监听器
Trigger:触发器
Schedule:时间表
TriggerListener:触发器监听器
JobRunShellFactory:jobRunShell 的工厂
JobRunShell:实现了 Runnable 接口,会调用对应实例化的 job 的 execute 方法