浏览代码

优化级联移动位置订阅位置更新

648540858 3 年之前
父节点
当前提交
efc4a7bc8e

+ 4 - 1
src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java

@@ -39,7 +39,7 @@ public class DynamicTask {
     public void startCron(String key, Runnable task, int cycleForCatalog) {
         stop(key);
         // scheduleWithFixedDelay 必须等待上一个任务结束才开始计时period, cycleForCatalog表示执行的间隔
-        ScheduledFuture future = threadPoolTaskScheduler.scheduleWithFixedDelay(task, cycleForCatalog * 1000L);
+        ScheduledFuture future = threadPoolTaskScheduler.scheduleAtFixedRate(task, cycleForCatalog * 1000L);
         futureMap.put(key, future);
         runnableMap.put(key, task);
     }
@@ -78,4 +78,7 @@ public class DynamicTask {
         return futureMap.keySet();
     }
 
+    public Runnable get(String key) {
+        return runnableMap.get(key);
+    }
 }

+ 3 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java

@@ -99,7 +99,10 @@ public class OnlineEventListener implements ApplicationListener<OnlineEvent> {
 		storager.updateDevice(device);
 		// 上线添加订阅
 		if (device.getSubscribeCycleForCatalog() > 0) {
+			// 查询在线设备那些开启了订阅,为设备开启定时的目录订阅
 			deviceService.addCatalogSubscribe(device);
+		}
+		if (device.getSubscribeCycleForMobilePosition() > 0) {
 			deviceService.addMobilePositionSubscribe(device);
 		}
 	}

+ 4 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java

@@ -1,5 +1,9 @@
 package com.genersoft.iot.vmp.gb28181.task;
 
+import javax.sip.DialogState;
+
 public interface ISubscribeTask extends Runnable{
     void stop();
+
+    DialogState getDialogState();
 }

+ 8 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java

@@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.scheduling.annotation.Async;
 
 import javax.sip.Dialog;
 import javax.sip.DialogState;
@@ -45,6 +46,7 @@ public class CatalogSubscribeTask implements ISubscribeTask {
         });
     }
 
+    @Async
     @Override
     public void stop() {
         /**
@@ -72,4 +74,10 @@ public class CatalogSubscribeTask implements ISubscribeTask {
             });
         }
     }
+
+    @Override
+    public DialogState getDialogState() {
+        if (dialog == null) return null;
+        return dialog.getState();
+    }
 }

+ 38 - 27
src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java

@@ -1,16 +1,16 @@
 package com.genersoft.iot.vmp.gb28181.task.impl;
 
-import com.genersoft.iot.vmp.gb28181.bean.GbStream;
-import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
-import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
-import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
+import com.genersoft.iot.vmp.gb28181.bean.*;
 import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
 import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.scheduling.annotation.Async;
 
-import java.text.SimpleDateFormat;
+import javax.sip.DialogState;
 import java.util.List;
 
 /**
@@ -18,6 +18,8 @@ import java.util.List;
  */
 public class MobilePositionSubscribeHandlerTask implements ISubscribeTask {
 
+    private Logger logger = LoggerFactory.getLogger(MobilePositionSubscribeHandlerTask.class);
+
     private IRedisCatchStorage redisCatchStorage;
     private IVideoManagerStorage storager;
     private ISIPCommanderForPlatform sipCommanderForPlatform;
@@ -26,8 +28,6 @@ public class MobilePositionSubscribeHandlerTask implements ISubscribeTask {
     private String sn;
     private String key;
 
-    private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-
     public MobilePositionSubscribeHandlerTask(IRedisCatchStorage redisCatchStorage, ISIPCommanderForPlatform sipCommanderForPlatform, IVideoManagerStorage storager, String platformId, String sn, String key, SubscribeHolder subscribeInfo) {
         this.redisCatchStorage = redisCatchStorage;
         this.storager = storager;
@@ -38,40 +38,51 @@ public class MobilePositionSubscribeHandlerTask implements ISubscribeTask {
         this.subscribeHolder = subscribeInfo;
     }
 
+    @Async
     @Override
     public void run() {
 
+        logger.info("执行MobilePositionSubscribeHandlerTask");
         SubscribeInfo subscribe = subscribeHolder.getMobilePositionSubscribe(platformId);
-
         if (subscribe != null) {
             ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId);
-            if (parentPlatform == null || parentPlatform.isStatus()) {
-                // TODO 暂时只处理视频流的回复,后续增加对国标设备的支持
-                List<GbStream> gbStreams = storager.queryGbStreamListInPlatform(platformId);
-                if (gbStreams.size() > 0) {
-                    for (GbStream gbStream : gbStreams) {
-                        String gbId = gbStream.getGbId();
-                        GPSMsgInfo gpsMsgInfo = redisCatchStorage.getGpsMsgInfo(gbId);
-                        if (gpsMsgInfo != null) {
-                            // 发送GPS消息
-                            sipCommanderForPlatform.sendNotifyMobilePosition(parentPlatform, gpsMsgInfo, subscribe);
-                        }else {
-                            // 没有在redis找到新的消息就使用数据库的消息
-                            gpsMsgInfo = new GPSMsgInfo();
-                            gpsMsgInfo.setId(gbId);
-                            gpsMsgInfo.setLat(gbStream.getLongitude());
-                            gpsMsgInfo.setLng(gbStream.getLongitude());
-                            // 发送GPS消息
-                            sipCommanderForPlatform.sendNotifyMobilePosition(parentPlatform, gpsMsgInfo, subscribe);
-                        }
+            if (parentPlatform == null ) {
+                logger.info("发送订阅时未找到平台信息:{}", platformId);
+                return;
+            }
+            if (!parentPlatform.isStatus()) {
+                logger.info("发送订阅时发现平台已经离线:{}", platformId);
+                return;
+            }
+            // TODO 暂时只处理视频流的回复,后续增加对国标设备的支持
+            List<GbStream> gbStreams = storager.queryGbStreamListInPlatform(platformId);
+            if (gbStreams.size() == 0) {
+                logger.info("发送订阅时发现平台已经没有关联的直播流:{}", platformId);
+                return;
+            }
+            for (GbStream gbStream : gbStreams) {
+                String gbId = gbStream.getGbId();
+                GPSMsgInfo gpsMsgInfo = redisCatchStorage.getGpsMsgInfo(gbId);
+                if (gpsMsgInfo != null) { // 无最新位置不发送
+                    // 经纬度都为0不发送
+                    if (gpsMsgInfo.getLng() == 0 && gpsMsgInfo.getLat() == 0) {
+                        continue;
                     }
+                    // 发送GPS消息
+                    sipCommanderForPlatform.sendNotifyMobilePosition(parentPlatform, gpsMsgInfo, subscribe);
                 }
             }
         }
+        logger.info("结束执行MobilePositionSubscribeHandlerTask");
     }
 
     @Override
     public void stop() {
 
     }
+
+    @Override
+    public DialogState getDialogState() {
+        return null;
+    }
 }

+ 7 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java

@@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
 import org.dom4j.Element;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.scheduling.annotation.Async;
 
 import javax.sip.Dialog;
 import javax.sip.DialogState;
@@ -25,6 +26,7 @@ public class MobilePositionSubscribeTask implements ISubscribeTask {
         this.sipCommander = sipCommander;
     }
 
+    @Async
     @Override
     public void run() {
         sipCommander.mobilePositionSubscribe(device, dialog, eventResult -> {
@@ -74,4 +76,9 @@ public class MobilePositionSubscribeTask implements ISubscribeTask {
             });
         }
     }
+    @Override
+    public DialogState getDialogState() {
+        if (dialog == null) return null;
+        return dialog.getState();
+    }
 }

+ 19 - 8
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java

@@ -1566,17 +1566,28 @@ public class SIPCommander implements ISIPCommander {
 			cmdXml.append("<DeviceID>" + device.getDeviceId() + "</DeviceID>\r\n");
 			cmdXml.append("</Query>\r\n");
 
-			String tm = Long.toString(System.currentTimeMillis());
 
-			CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
-					: udpSipProvider.getNewCallId();
+			Request request;
+			if (dialog != null) {
+				logger.info("发送目录订阅消息时 dialog的状态为: {}", dialog.getState());
+				request = dialog.createRequest(Request.SUBSCRIBE);
+				ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml");
+				request.setContent(cmdXml.toString(), contentTypeHeader);
+				ExpiresHeader expireHeader = sipFactory.createHeaderFactory().createExpiresHeader(device.getSubscribeCycleForMobilePosition());
+				request.addHeader(expireHeader);
+			}else {
+				String tm = Long.toString(System.currentTimeMillis());
 
-			// 有效时间默认为60秒以上
-			Request request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "z9hG4bK-viaPos-" + tm,
-					"fromTagPos" + tm, null, device.getSubscribeCycleForCatalog(), "Catalog" ,
-					callIdHeader);
-			transmitRequest(device, request, errorEvent, okEvent);
+				CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
+						: udpSipProvider.getNewCallId();
+
+				// 有效时间默认为60秒以上
+				request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "z9hG4bK-viaPos-" + tm,
+						"fromTagPos" + tm, null, device.getSubscribeCycleForCatalog(), "Catalog" ,
+						callIdHeader);
 
+			}
+			transmitRequest(device, request, errorEvent, okEvent);
 			return true;
 
 		} catch ( NumberFormatException | ParseException | InvalidArgumentException	| SipException e) {

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java

@@ -405,7 +405,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
             CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
                     : udpSipProvider.getNewCallId();
             callIdHeader.setCallId(subscribeInfo.getCallId());
-            logger.info("[发送Notify-MobilePosition] {}/{}->{},{}", parentPlatform.getServerGBId(), gpsMsgInfo.getId(), gpsMsgInfo.getLng(), gpsMsgInfo.getLat());
+            logger.info("[发送 移动位置订阅] {}/{}->{},{}", parentPlatform.getServerGBId(), gpsMsgInfo.getId(), gpsMsgInfo.getLng(), gpsMsgInfo.getLat());
             sendNotify(parentPlatform, deviceStatusXml.toString(), subscribeInfo, eventResult -> {
                 logger.error("发送NOTIFY通知消息失败。错误:{} {}", eventResult.statusCode, eventResult.msg);
             }, null);

+ 2 - 2
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java

@@ -146,7 +146,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
 			} else {
 				mobilePosition.setAltitude(0.0);
 			}
-			logger.info("[收到Notify-MobilePosition]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
+			logger.info("[收到 移动位置订阅]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
 					mobilePosition.getLongitude(), mobilePosition.getLatitude());
 			mobilePosition.setReportSource("Mobile Position");
 			BaiduPoint bp = GpsUtil.Wgs84ToBd09(String.valueOf(mobilePosition.getLongitude()), String.valueOf(mobilePosition.getLatitude()));
@@ -281,7 +281,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
 					Element eventElement = itemDevice.element("Event");
 					DeviceChannel channel = XmlUtil.channelContentHander(itemDevice);
 					channel.setDeviceId(device.getDeviceId());
-					logger.info("[收到Notify-Catalog]:{}/{}", device.getDeviceId(), channel.getChannelId());
+					logger.info("[收到 目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId());
 					switch (eventElement.getText().toUpperCase()) {
 						case CatalogEvent.ON: // 上线
 							logger.info("收到来自设备【{}】的通道【{}】上线通知", device.getDeviceId(), channel.getChannelId());

+ 16 - 7
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java

@@ -150,7 +150,7 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
 		}
 		String sn = XmlUtil.getText(rootElement, "SN");
 		String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() +  "_MobilePosition_" + platformId;
-		logger.info("[notify-MobilePosition]: {}", platformId);
+		logger.info("[回复 移动位置订阅]: {}", platformId);
 		StringBuilder resultXml = new StringBuilder(200);
 		resultXml.append("<?xml version=\"1.0\" ?>\r\n")
 				.append("<Response>\r\n")
@@ -161,12 +161,21 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
 				.append("</Response>\r\n");
 
 		if (subscribeInfo.getExpires() > 0) {
-			if (subscribeHolder.getMobilePositionSubscribe(platformId) != null) {
-				dynamicTask.stop(key);
+
+			if (subscribeHolder.getMobilePositionSubscribe(platformId) == null ) {
+				String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔
+				subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo);
+				dynamicTask.startCron(key, new MobilePositionSubscribeHandlerTask(redisCatchStorage, sipCommanderForPlatform, storager,  platformId, sn, key, subscribeHolder), Integer.parseInt(interval));
+			}else {
+				if (subscribeHolder.getMobilePositionSubscribe(platformId).getDialog() != null
+						&& subscribeHolder.getMobilePositionSubscribe(platformId).getDialog().getState() != null
+						&& !subscribeHolder.getMobilePositionSubscribe(platformId).getDialog().getState().equals(DialogState.CONFIRMED)) {
+					dynamicTask.stop(key);
+					String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔
+					subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo);
+					dynamicTask.startCron(key, new MobilePositionSubscribeHandlerTask(redisCatchStorage, sipCommanderForPlatform, storager,  platformId, sn, key, subscribeHolder), Integer.parseInt(interval));
+				}
 			}
-			String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔
-			dynamicTask.startCron(key, new MobilePositionSubscribeHandlerTask(redisCatchStorage, sipCommanderForPlatform, storager,  platformId, sn, key, subscribeHolder), Integer.parseInt(interval) -1 );
-			subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo);
 		}else if (subscribeInfo.getExpires() == 0) {
 			dynamicTask.stop(key);
 			subscribeHolder.removeMobilePositionSubscribe(platformId);
@@ -203,7 +212,7 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
 		}
 		String sn = XmlUtil.getText(rootElement, "SN");
 		String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() +  "_Catalog_" + platformId;
-		logger.info("[notify-Catalog]: {}", platformId);
+		logger.info("[回复 目录订阅]: {}/{}", platformId, deviceID);
 		StringBuilder resultXml = new StringBuilder(200);
 		resultXml.append("<?xml version=\"1.0\" ?>\r\n")
 				.append("<Response>\r\n")

+ 35 - 3
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java

@@ -4,8 +4,13 @@ import com.alibaba.fastjson.JSONObject;
 import com.genersoft.iot.vmp.conf.DynamicTask;
 import com.genersoft.iot.vmp.gb28181.bean.Device;
 import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
+import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
 import com.genersoft.iot.vmp.gb28181.bean.SyncStatus;
 import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector;
+import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
+import com.genersoft.iot.vmp.gb28181.task.impl.CatalogSubscribeTask;
+import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeHandlerTask;
+import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeTask;
 import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
 import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
@@ -29,9 +34,8 @@ import org.springframework.util.StringUtils;
 import org.springframework.web.bind.annotation.*;
 import org.springframework.web.context.request.async.DeferredResult;
 
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
+import javax.sip.DialogState;
+import java.util.*;
 
 @Api(tags = "国标设备查询", value = "国标设备查询")
 @SuppressWarnings("rawtypes")
@@ -63,6 +67,9 @@ public class DeviceQuery {
 	@Autowired
 	private DynamicTask dynamicTask;
 
+	@Autowired
+	private SubscribeHolder subscribeHolder;
+
 	/**
 	 * 使用ID查询国标设备
 	 * @param deviceId 国标ID
@@ -469,4 +476,29 @@ public class DeviceQuery {
 		}
 		return wvpResult;
 	}
+
+	@GetMapping("/{deviceId}/subscribe_info")
+	@ApiOperation(value = "获取设备的订阅状态", notes = "获取设备的订阅状态")
+	public WVPResult<Map<String, String>> getSubscribeInfo(@PathVariable String deviceId) {
+		Set<String> allKeys = dynamicTask.getAllKeys();
+		Map<String, String> dialogStateMap = new HashMap<>();
+		for (String key : allKeys) {
+			if (key.startsWith(deviceId)) {
+				ISubscribeTask subscribeTask = (ISubscribeTask)dynamicTask.get(key);
+				DialogState dialogState = subscribeTask.getDialogState();
+				if (dialogState == null) {
+					continue;
+				}
+				if (subscribeTask instanceof CatalogSubscribeTask) {
+					dialogStateMap.put("catalog", dialogState.toString());
+				}else if (subscribeTask instanceof MobilePositionSubscribeTask) {
+					dialogStateMap.put("mobilePosition", dialogState.toString());
+				}
+			}
+		}
+		WVPResult<Map<String, String>> wvpResult = new WVPResult<>();
+		wvpResult.setCode(0);
+		wvpResult.setData(dialogStateMap);
+		return wvpResult;
+	}
 }