DynamicTask.java 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. package com.genersoft.iot.vmp.conf;
  2. import org.apache.commons.lang3.ObjectUtils;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.scheduling.annotation.Scheduled;
  6. import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
  7. import org.springframework.stereotype.Component;
  8. import javax.annotation.PostConstruct;
  9. import java.time.Instant;
  10. import java.util.Map;
  11. import java.util.Set;
  12. import java.util.concurrent.ConcurrentHashMap;
  13. import java.util.concurrent.ScheduledFuture;
  14. import java.util.concurrent.TimeUnit;
  15. /**
  16. * 动态定时任务
  17. * @author lin
  18. */
  19. @Component
  20. public class DynamicTask {
  21. private final Logger logger = LoggerFactory.getLogger(DynamicTask.class);
  22. private ThreadPoolTaskScheduler threadPoolTaskScheduler;
  23. private final Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();
  24. private final Map<String, Runnable> runnableMap = new ConcurrentHashMap<>();
  25. @PostConstruct
  26. public void DynamicTask() {
  27. threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
  28. threadPoolTaskScheduler.setPoolSize(300);
  29. threadPoolTaskScheduler.setWaitForTasksToCompleteOnShutdown(true);
  30. threadPoolTaskScheduler.setAwaitTerminationSeconds(10);
  31. threadPoolTaskScheduler.initialize();
  32. }
  33. /**
  34. * 循环执行的任务
  35. * @param key 任务ID
  36. * @param task 任务
  37. * @param cycleForCatalog 间隔 毫秒
  38. * @return
  39. */
  40. public void startCron(String key, Runnable task, int cycleForCatalog) {
  41. ScheduledFuture<?> future = futureMap.get(key);
  42. if (future != null) {
  43. if (future.isCancelled()) {
  44. logger.debug("任务【{}】已存在但是关闭状态!!!", key);
  45. } else {
  46. logger.debug("任务【{}】已存在且已启动!!!", key);
  47. return;
  48. }
  49. }
  50. // scheduleWithFixedDelay 必须等待上一个任务结束才开始计时period, cycleForCatalog表示执行的间隔
  51. future = threadPoolTaskScheduler.scheduleAtFixedRate(task, cycleForCatalog);
  52. if (future != null){
  53. futureMap.put(key, future);
  54. runnableMap.put(key, task);
  55. logger.debug("任务【{}】启动成功!!!", key);
  56. }else {
  57. logger.debug("任务【{}】启动失败!!!", key);
  58. }
  59. }
  60. /**
  61. * 延时任务
  62. * @param key 任务ID
  63. * @param task 任务
  64. * @param delay 延时 /毫秒
  65. * @return
  66. */
  67. public void startDelay(String key, Runnable task, int delay) {
  68. stop(key);
  69. // 获取执行的时刻
  70. Instant startInstant = Instant.now().plusMillis(TimeUnit.MILLISECONDS.toMillis(delay));
  71. ScheduledFuture future = futureMap.get(key);
  72. if (future != null) {
  73. if (future.isCancelled()) {
  74. logger.debug("任务【{}】已存在但是关闭状态!!!", key);
  75. } else {
  76. logger.debug("任务【{}】已存在且已启动!!!", key);
  77. return;
  78. }
  79. }
  80. // scheduleWithFixedDelay 必须等待上一个任务结束才开始计时period, cycleForCatalog表示执行的间隔
  81. future = threadPoolTaskScheduler.schedule(task, startInstant);
  82. if (future != null){
  83. futureMap.put(key, future);
  84. runnableMap.put(key, task);
  85. logger.debug("任务【{}】启动成功!!!", key);
  86. }else {
  87. logger.debug("任务【{}】启动失败!!!", key);
  88. }
  89. }
  90. public boolean stop(String key) {
  91. boolean result = false;
  92. if (!ObjectUtils.isEmpty(futureMap.get(key)) && !futureMap.get(key).isCancelled() && !futureMap.get(key).isDone()) {
  93. result = futureMap.get(key).cancel(false);
  94. futureMap.remove(key);
  95. runnableMap.remove(key);
  96. }
  97. return result;
  98. }
  99. public boolean contains(String key) {
  100. return futureMap.get(key) != null;
  101. }
  102. public Set<String> getAllKeys() {
  103. return futureMap.keySet();
  104. }
  105. public Runnable get(String key) {
  106. return runnableMap.get(key);
  107. }
  108. /**
  109. * 每五分钟检查失效的任务,并移除
  110. */
  111. @Scheduled(cron="0 0/5 * * * ?")
  112. public void execute(){
  113. if (futureMap.size() > 0) {
  114. for (String key : futureMap.keySet()) {
  115. if (futureMap.get(key).isDone() || futureMap.get(key).isCancelled()) {
  116. futureMap.remove(key);
  117. runnableMap.remove(key);
  118. }
  119. }
  120. }
  121. }
  122. public boolean isAlive(String key) {
  123. return futureMap.get(key) != null && !futureMap.get(key).isDone() && !futureMap.get(key).isCancelled();
  124. }
  125. }