648540858 1 рік тому
батько
коміт
5f09692198

+ 86 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/session/CommonSessionManager.java

@@ -0,0 +1,86 @@
+package com.genersoft.iot.vmp.gb28181.session;
+
+import com.genersoft.iot.vmp.common.CommonCallback;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import java.util.Calendar;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * 通用回调管理
+ */
+@Component
+public class CommonSessionManager {
+
+    public static Map<String, CommonSession> callbackMap = new ConcurrentHashMap<>();
+
+    /**
+     * 存储回调相关的信息
+     */
+    class CommonSession{
+        public String session;
+        public long createTime;
+        public int timeout;
+
+        public CommonCallback<Object> callback;
+        public CommonCallback<String> timeoutCallback;
+    }
+
+    /**
+     * 添加回调
+     * @param sessionId 唯一标识
+     * @param callback 回调
+     * @param timeout 超时时间, 单位分钟
+     */
+    public void add(String sessionId, CommonCallback<Object> callback, CommonCallback<String> timeoutCallback,
+                    Integer timeout) {
+        CommonSession commonSession = new CommonSession();
+        commonSession.session = sessionId;
+        commonSession.callback = callback;
+        commonSession.createTime = System.currentTimeMillis();
+        if (timeoutCallback != null) {
+            commonSession.timeoutCallback = timeoutCallback;
+        }
+        if (timeout != null) {
+            commonSession.timeout = timeout;
+        }
+        callbackMap.put(sessionId, commonSession);
+    }
+
+    public void add(String sessionId, CommonCallback<Object> callback) {
+        add(sessionId, callback, null, 1);
+    }
+
+    public CommonCallback<Object> get(String sessionId, boolean destroy) {
+        CommonSession commonSession = callbackMap.get(sessionId);
+        if (destroy) {
+            callbackMap.remove(sessionId);
+        }
+        return commonSession.callback;
+    }
+
+    public CommonCallback<Object> get(String sessionId) {
+        return get(sessionId, false);
+    }
+
+    public void delete(String sessionID) {
+        callbackMap.remove(sessionID);
+    }
+
+    @Scheduled(fixedRate= 60)   //每分钟执行一次
+    public void execute(){
+        Calendar cal = Calendar.getInstance();
+        cal.add(Calendar.MINUTE, -1);
+        for (String session : callbackMap.keySet()) {
+            if (callbackMap.get(session).createTime < cal.getTimeInMillis()) {
+                // 超时
+                if (callbackMap.get(session).timeoutCallback != null) {
+                    callbackMap.get(session).timeoutCallback.run("timeout");
+                }
+                callbackMap.remove(session);
+            }
+        }
+    }
+}

+ 2 - 3
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java

@@ -6,7 +6,6 @@ import com.genersoft.iot.vmp.conf.SipConfig;
 import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.gb28181.bean.*;
 import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
-import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
 import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
 import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
@@ -38,7 +37,6 @@ import javax.sip.SipException;
 import javax.sip.header.FromHeader;
 import javax.sip.message.Response;
 import java.text.ParseException;
-import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
@@ -222,7 +220,6 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
 					mobilePosition.getLongitude(), mobilePosition.getLatitude());
 			mobilePosition.setReportSource("Mobile Position");
 
-
 			// 更新device channel 的经纬度
 			DeviceChannel deviceChannel = new DeviceChannel();
 			deviceChannel.setDeviceId(device.getDeviceId());
@@ -242,6 +239,8 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
 			}
 
 			storager.updateChannelPosition(deviceChannel);
+			// 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息
+
 
 			// 发送redis消息。 通知位置信息的变化
 			JSONObject jsonObject = new JSONObject();