说明一下schedule-fom的常见使用场景和使用方式
<dependency> <groupId>com.cowave.commons</groupId> <artifactId>schedule-fom</artifactId> <version>2.7.3</version> </dependency> <!-— 需要依赖一下micrometer-core,版本根据使用的spring版本选择 —-> <dependency> <groupId>io.micrometer</groupId> <artifactId>micrometer-core</artifactId> </dependency>
1. 使用方式
需要通过 @EnableFom 进行声明,schedule-fom依赖于spring应用上下文
Main.java 1 2 3 4 5 6 7 8 @EnableFom (enableFomView=false )@Configuration public class Main { @SuppressWarnings ("resource" ) public static void main (String[] args) { new AnnotationConfigApplicationContext("org.springframework.fom.test" ); } }
比如定义一个简单的定时任务
SimpleSchedule.java 1 2 3 4 5 6 7 8 @Fom (cron = "0 */1 * * * ?" )public class SimpleSchedule { @Schedule public void schedule () { } }
2. 定时场景 2.1. 单任务 SingleSchedule 单个定时任务比较简单,这里演示一下 ScheduleService,可以在任务过程中对配置的获取和修改,配置项 config.email 可以在界面进行修改和注入,在任务的执行过程中,也可以手动新增修改配置项config.sleep.time,然后在管理界面上查看和修改
SingleSchedule.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Fom (cron = "0 */1 * * * ?" , remark = "定时单任务" )public class SingleSchedule { private static final Logger LOG = LoggerFactory.getLogger(SingleSchedule.class ) ; private final Random random = new Random(); @Value ("${config.email:shanhm1991@163.com}" ) private String email; @Autowired private ScheduleService scheduleService; @Schedule public long exec () throws InterruptedException { Integer sleepTime = scheduleService.getCurrentConfig("config.sleep.time" ); if (sleepTime != null ){ LOG.info("executing sleep..., email={}" , email); Thread.sleep(sleepTime); } sleepTime = random.nextInt(5000 ); scheduleService.putCurrentConfig("config.sleep.time" , sleepTime); scheduleService.serializeCurrent(); return sleepTime; } }
2.2. 多任务 MultiSchedule @Fom 标识的类,表示一个调度计划,使用 @Schedule 标识其中的方法,表示以这个方法作为任务来执行。可以标识多个方法,然后这些任务共用同一个调度计划
MultiSchedule.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Fom (fixedRate = 120000 , threadCore = 2 , remark = "定时多任务" )public class MultiSchedule { private static final Logger LOG = LoggerFactory.getLogger(MultiSchedule.class ) ; private final Random random = new Random(); @Schedule public long task1 () throws InterruptedException { long sleep = random.nextInt(5000 ); LOG.info("task executing ..." ); Thread.sleep(sleep); return sleep; } @Schedule public String task2 () throws InterruptedException { LOG.info("task executing ..." ); Thread.sleep(random.nextInt(5000 )); return "test" ; } }
2.3. 批任务 BatchSchedule 实现接口 ScheduleFactory,可以用集合的方式来提交批任务,但是对于任务的抽象需要继承给定的FomTask
BatchSchedule.java 1 2 3 4 5 6 7 8 9 10 11 12 @Fom (fixedDelay = 180000 , threadCore = 4 , remark = "定时批任务" )public class BatchSchedule implements ScheduleFactory <Long > { @Override public List<BatchTask> newScheduleTasks () throws Exception { List<BatchTask> list = new ArrayList<>(); for (int i = 1 ; i <= 10 ; i++){ list.add(new BatchTask(i)); } return list; } }
2.4. 任务结束事件 CompleteSchedule 实现接口 CompleteHandler,可以在批任务结束时,执行一些自定义处理,在处理调用中可以拿到执行次数、时间、结果等信息
CompleteSchedule.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Fom (cron = "0 0/5 * * * ?" , threadCore = 4 , remark = "自定义任务结束处理" )public class CompleteSchedule implements ScheduleFactory <Long >, CompleteHandler <Long > { private static final Logger LOG = LoggerFactory.getLogger(CompleteSchedule.class ) ; @Override public List<CompleteTask> newScheduleTasks () throws Exception { List<CompleteTask> list = new ArrayList<>(); for (int i = 1 ; i <= 10 ; i++){ list.add(new CompleteTask()); } return list; } @Override public void onComplete (long times, long lastTime, List<Result<Long>> results) throws Exception { String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss" ).format(lastExecTime); LOG.info( "onComplete:第{}次在{}提交的任务全部完成,结果省略..." , times, date); } }
2.5. 任务超时检测 TimeoutSchedule 通过 taskOverTime 可以给任务指定超时时间,这样当任务超时后调度线程会尝试对其进行中断取消。默认是对每个任务单独进行超时检测,如果想对整体任务进行超时检测,可以设置detectTimeoutOnEachTask,这样将从第一个任务开始执行时计算时间,等到超时后就取消还没结束的任务,不管这个任务实际耗时多久,就算它还没有开始执行也会被取消
TimeoutSchedule.java 1 2 3 4 5 6 7 8 9 10 11 12 @Fom (cron = "0 0/7 * * * ?" , threadCore = 4 , taskOverTime = 3500 , remark = "任务超时检测" )public class TimeoutSchedule implements ScheduleFactory <Long > { @Override public List<TimeoutTask> newScheduleTasks () throws Exception { List<TimeoutTask> list = new ArrayList<>(); for (int i = 1 ; i <= 15 ; i++){ list.add(new TimeoutTask()); } return list; } }
2.6. 任务取消事件 CancelSchedule 实现接口 TaskCancelHandler 可以自定义任务的取消策略,取消调用有两个时机:检测到任务超时,或者被外部shutdown关闭。如果想自定义shutdown之后的处理,比如释放一些资源,也可以实现接口TerminateHandler(有的场景中,可能要取消的任务无法响应中断,比如等待套接字返回,或者无限循环,此时只能通过关闭连接,或增加循环标志等办法来结束)
CancelSchedule.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 @Fom (cron = "0 0/11 * * * ?" , execOnLoad = true , taskOverTime = 300000 , remark = "自定义任务取消事件" )public class CancelSchedule implements TaskCancelHandler , TerminateHandler { private static final Logger LOG = LoggerFactory.getLogger(CancelSchedule.class ) ; private volatile boolean off = false ; @Schedule public void exec () { LOG.info("task executing ..." ); while (!off){ try { Thread.sleep(60000 ); } catch (InterruptedException e) { } } } @Override public void handleCancel (String taskId, long costTime) { off = true ; LOG.info("handleCancel:任务被取消{},已经耗时{}ms" , taskId, costTime); } @Override public void onTerminate (long execTimes, long lastExecTime) { String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss" ).format(lastExecTime); LOG.info("onTerminate:任务关闭,共执行{}次任务,最后一次执行时间为{}" , execTimes, date); } }
对于定义的FomTask,如果实现了接口TaskCancelHandler,同样也会在超时或者 shutdown 时进行调用
TimeoutTask.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class TimeoutTask extends FomTask <Long > implements TaskCancelHandler { @Override public Long exec () throws InterruptedException { long sleep = new Random().nextInt(10000 ); logger.info("task executing ..." ); Thread.sleep(sleep); return sleep; } @Override public void handleCancel (String taskId, long costTime) throws Exception { logger.info("task handleCancel:取消任务[{}],耗时={}ms" , taskId, costTime); } }
3. 非定时场景 也可以将 @Fom 的声明当成一个线程池来使用,同样支持一些接口功能,可用来提交执行任务
FomExecutor.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Fom (threadCore = 4 , taskOverTime = 5000 , enableTaskConflict = true , detectTimeoutOnEachTask = true )public class FomExecutor implements ResultHandler <Long >, TaskCancelHandler , CompleteHandler <Long > { private static final Logger LOG = LoggerFactory.getLogger(FomExecutor.class ) ; @Override public void handleResult (FomTaskResult<Long> result) throws Exception { LOG.info("handleResult:统计任务[{}]的结果:{}" , result.getTaskId(), result.getContent()); } @Override public void handleCancel (String taskId, long costTime) throws Exception { LOG.info("handleCancel:取消任务[{}],耗时={}ms" , taskId, costTime); } @Override public void onComplete (long times, long lastTime, List<FomTaskResult<Long>> results) throws Exception { String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss" ).format(lastTime); LOG.info("onComplete: 第{}次在{}提交的任务全部执行结束,结果数:{}" , times, date, results.size()); } }
使用时实际注入类型ScheduleContext
ExecController.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @RestController @RequestMapping ("/exec" )public class ExecController { @Autowired private ScheduleContext<Long> $fomExecutor; @RequestMapping ("/submit" ) public void submit (String tag) { List<Task<Long>> tasks = new ArrayList<>(); for (int i = 0 ; i < 10 ; i++){ tasks.add(new OtherTask(tag + i)); } $fomExecutor.submitBatch(tasks); } }
4. 任务管理 FomService中提供了一些接口支持任务的实时监控管理,如果接口功能不能满足,也可以直接获取ScheduleContext实例进行操作
ExecController.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @RestController @RequestMapping ("/exec" )public class ExecController { @Autowired private FomService fomService; @Autowired private List<ScheduleContext<?>> schedules; @RequestMapping ("/list" ) public List<ScheduleInfo> list () { return fomService.list(); } @RequestMapping ("/count" ) public Integer count () { return schedules.size(); } }
默认基于FomService提供了一个任务管理界面:http://[ip]:[port]/[context-path]/fom
对于失败和等待中的任务,也可以查看异常信息、创建时间等信息,这里不再赘述…
5. Metric指标 推荐使用Spring Boot Actuator,可以采集一些指标通过prometheus监控
1 2 3 4 5 6 7 8 9 10 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-actuator</artifactId > <version > 2.7.0</version > </dependency > <dependency > <groupId > io.micrometer</groupId > <artifactId > micrometer-registry-prometheus</artifactId > <version > 1.9.0</version > </dependency >
具体的指标内容示例:
Metric 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 fom_task_count_active{schedule="fomExecutor",} 0.0 # 活跃任务数 fom_task_count_waiting{schedule="fomExecutor",} 0.0 # 等待任务数 fom_schedule_count_times_total{schedule="$fomExecutor",} # 调度次数 fom_task_count_success_total{schedule="fomExecutor",task="xx9",} 1.0 # 任务成功数 fom_task_count_failed_total{schedule="fomExecutor",task="xx1",} 1.0 # 任务失败数 fom_task_cost_min{schedule="fomExecutor",task="xx9",} 3080.0 # 任务最小耗时 fom_task_cost_max{schedule="fomExecutor",task="xx9",} 3080.0 # 任务最大耗时 fom_task_cost_avg{schedule="fomExecutor",task="xx9",} 3080.0 # 任务平均耗时 # 任务耗时区间统计 fom_task_cost_summary_bucket{schedule="fomExecutor",task="xx9",le="250.0",} 0.0 fom_task_cost_summary_bucket{schedule="fomExecutor",task="xx9",le="500.0",} 0.0 fom_task_cost_summary_bucket{schedule="fomExecutor",task="xx9",le="1000.0",} 0.0 fom_task_cost_summary_bucket{schedule="fomExecutor",task="xx9",le="5000.0",} 1.0 fom_task_cost_summary_bucket{schedule="fomExecutor",task="xx9",le="10000.0",} 1.0 fom_task_cost_summary_bucket{schedule="fomExecutor",task="xx9",le="+Inf",} 1.0
参考: