Browse Source

优化报警推送心跳定时任务

648540858 1 year ago
parent
commit
ace7b78498

+ 0 - 14
src/main/java/com/genersoft/iot/vmp/gb28181/controller/SseController.java

@@ -1,6 +1,5 @@
 package com.genersoft.iot.vmp.gb28181.controller;
 
-import com.genersoft.iot.vmp.conf.DynamicTask;
 import com.genersoft.iot.vmp.gb28181.event.alarm.AlarmEventListener;
 import io.swagger.v3.oas.annotations.tags.Tag;
 import org.springframework.web.bind.annotation.GetMapping;
@@ -29,9 +28,6 @@ public class SseController {
     @Resource
     private AlarmEventListener alarmEventListener;
 
-    @Resource
-    private DynamicTask dynamicTask;
-
     /**
      * SSE 推送.
      *
@@ -45,17 +41,7 @@ public class SseController {
     public void emit(HttpServletResponse response, @RequestParam String browserId) throws IOException, InterruptedException {
         response.setContentType("text/event-stream");
         response.setCharacterEncoding("utf-8");
-
         PrintWriter writer = response.getWriter();
         alarmEventListener.addSseEmitter(browserId, writer);
-
-        dynamicTask.startCron("sse-key", new Runnable() {
-            @Override
-            public void run() {
-                writer.write(":keep alive\n\n");
-                writer.flush();
-            }
-        }, 1000);
-        alarmEventListener.removeSseEmitter(browserId, writer);
     }
 }

+ 18 - 19
src/main/java/com/genersoft/iot/vmp/gb28181/event/alarm/AlarmEventListener.java

@@ -22,16 +22,27 @@ import java.util.concurrent.ConcurrentHashMap;
 @Component
 public class AlarmEventListener implements ApplicationListener<AlarmEvent> {
 
-    private static final Map<String, PrintWriter> SSE_CACHE = new ConcurrentHashMap<>();
+    private static final Map<String, PrintWriter> sseChannelMap = new ConcurrentHashMap<>();
+
+    public void addSseEmitter(String browserId, PrintWriter writer) throws InterruptedException {
+        sseChannelMap.put(browserId, writer);
+        log.info("[SSE推送] 连接已建立, 浏览器 ID: {}, 当前在线数: {}", browserId, sseChannelMap.size());
+        while (!writer.checkError()) {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+            writer.write(":keep alive\n\n");
+            writer.flush();
+        }
+        removeSseEmitter(browserId, writer);
 
-    public void addSseEmitter(String browserId, PrintWriter writer) {
-        SSE_CACHE.put(browserId, writer);
-        log.info("[SSE推送] 连接已建立, 浏览器 ID: {}, 当前在线数: {}", browserId, SSE_CACHE.size());
     }
 
     public void removeSseEmitter(String browserId, PrintWriter writer) {
-        SSE_CACHE.remove(browserId, writer);
-        log.info("[SSE推送] 连接已断开, 浏览器 ID: {}, 当前在线数: {}", browserId, SSE_CACHE.size());
+        sseChannelMap.remove(browserId, writer);
+        log.info("[SSE推送] 连接已断开, 浏览器 ID: {}, 当前在线数: {}", browserId, sseChannelMap.size());
     }
 
     @Override
@@ -42,13 +53,7 @@ public class AlarmEventListener implements ApplicationListener<AlarmEvent> {
 
         log.info("设备报警事件触发, deviceId: {}, {}", event.getAlarmInfo().getDeviceId(), event.getAlarmInfo().getAlarmDescription());
 
-
-        String msg = "<strong>设备:</strong> <i>" + event.getAlarmInfo().getDeviceId() + "</i>"
-                + "<br><strong>通道编号:</strong> <i>" + event.getAlarmInfo().getChannelId() + "</i>"
-                + "<br><strong>报警描述:</strong> <i>" + event.getAlarmInfo().getAlarmDescription() + "</i>"
-                + "<br><strong>报警时间:</strong> <i>" + event.getAlarmInfo().getAlarmTime() + "</i>";
-
-        for (Iterator<Map.Entry<String, PrintWriter>> it = SSE_CACHE.entrySet().iterator(); it.hasNext(); ) {
+        for (Iterator<Map.Entry<String, PrintWriter>> it = sseChannelMap.entrySet().iterator(); it.hasNext(); ) {
             Map.Entry<String, PrintWriter> response = it.next();
 
             try {
@@ -59,12 +64,6 @@ public class AlarmEventListener implements ApplicationListener<AlarmEvent> {
                     continue;
                 }
 
-                String sseMsg = "event:message\n" +
-                        "data:" + msg + "\n" +
-                        "\n";
-                System.out.println(
-                        SSEMessage.getInstance("message", event.getAlarmInfo()).ecode()
-                );
                 writer.write(SSEMessage.getInstance("message", event.getAlarmInfo()).ecode());
                 writer.flush();
             } catch (Exception e) {