浏览代码

Merge branch 'refs/heads/2.7.0' into 271-优化notify存储

# Conflicts:
#	src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java
#	src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java
#	src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java
648540858 1 年之前
父节点
当前提交
588b1da35a
共有 16 个文件被更改,包括 479 次插入559 次删除
  1. 1 1
      src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java
  2. 2 2
      src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java
  3. 3 1
      src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java
  4. 2 4
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
  5. 1 1
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java
  6. 176 189
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java
  7. 150 142
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java
  8. 16 189
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
  9. 1 1
      src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
  10. 5 0
      src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java
  11. 65 10
      src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java
  12. 2 1
      src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
  13. 14 6
      src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
  14. 17 0
      src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java
  15. 22 9
      src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java
  16. 2 3
      src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java

@@ -66,7 +66,7 @@ public class UserSetting {
 
     private List<String> allowedOrigins = new ArrayList<>();
 
-    private int maxNotifyCountQueue = 10000;
+    private int maxNotifyCountQueue = 100000;
 
     private int registerAgainAfterTime = 60;
 

+ 2 - 2
src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java

@@ -535,11 +535,11 @@ public class DeviceChannel {
 		this.subCount = subCount;
 	}
 
-	public boolean isHasAudio() {
+	public Boolean getHasAudio() {
 		return hasAudio;
 	}
 
-	public void setHasAudio(boolean hasAudio) {
+	public void setHasAudio(Boolean hasAudio) {
 		this.hasAudio = hasAudio;
 	}
 

+ 3 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java

@@ -49,6 +49,7 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
         ParentPlatform parentPlatform = null;
 
         Map<String, List<ParentPlatform>> parentPlatformMap = new HashMap<>();
+        Map<String, DeviceChannel> channelMap = new HashMap<>();
         if (!ObjectUtils.isEmpty(event.getPlatformId())) {
             subscribe = subscribeHolder.getCatalogSubscribe(event.getPlatformId());
             if (subscribe == null) {
@@ -67,6 +68,7 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
                     for (DeviceChannel deviceChannel : event.getDeviceChannels()) {
                         List<ParentPlatform> parentPlatformsForGB = storager.queryPlatFormListForGBWithGBId(deviceChannel.getChannelId(), platforms);
                         parentPlatformMap.put(deviceChannel.getChannelId(), parentPlatformsForGB);
+                        channelMap.put(deviceChannel.getChannelId(), deviceChannel);
                     }
                 }
             }else if (event.getGbStreams() != null) {
@@ -174,7 +176,7 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
                                 }
                                 logger.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), gbId);
                                 List<DeviceChannel> deviceChannelList = new ArrayList<>();
-                                DeviceChannel deviceChannel = storager.queryChannelInParentPlatform(platform.getServerGBId(), gbId);
+                                DeviceChannel deviceChannel = channelMap.get(gbId);
                                 deviceChannelList.add(deviceChannel);
                                 GbStream gbStream = storager.queryStreamInParentPlatform(platform.getServerGBId(), gbId);
                                 if(gbStream != null){

+ 2 - 4
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java

@@ -520,9 +520,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
         if (parentPlatform == null) {
             return;
         }
-        if (logger.isDebugEnabled()) {
-            logger.debug("[发送 移动位置订阅] {}/{}->{},{}", parentPlatform.getServerGBId(), gpsMsgInfo.getId(), gpsMsgInfo.getLng(), gpsMsgInfo.getLat());
-        }
+        logger.info("[发送 移动位置订阅] {}/{}->{},{}", parentPlatform.getServerGBId(), gpsMsgInfo.getId(), gpsMsgInfo.getLng(), gpsMsgInfo.getLat());
 
         String characterSet = parentPlatform.getCharacterSet();
         StringBuffer deviceStatusXml = new StringBuffer(600);
@@ -597,6 +595,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
         Integer finalIndex = index;
         String catalogXmlContent = getCatalogXmlContentForCatalogAddOrUpdate(parentPlatform, channels,
                 deviceChannels.size(), type, subscribeInfo);
+        System.out.println(catalogXmlContent);
         logger.info("[发送NOTIFY通知]类型: {},发送数量: {}", type, channels.size());
         sendNotify(parentPlatform, catalogXmlContent, subscribeInfo, eventResult -> {
             logger.error("发送NOTIFY通知消息失败。错误:{} {}", eventResult.statusCode, eventResult.msg);
@@ -626,7 +625,6 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
 
     private  String getCatalogXmlContentForCatalogAddOrUpdate(ParentPlatform parentPlatform, List<DeviceChannel> channels, int sumNum, String type, SubscribeInfo subscribeInfo) {
         StringBuffer catalogXml = new StringBuffer(600);
-
         String characterSet = parentPlatform.getCharacterSet();
         catalogXml.append("<?xml version=\"1.0\" encoding=\"" + characterSet + "\"?>\r\n")
                 .append("<Notify>\r\n")

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java

@@ -6,7 +6,6 @@ import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
 import com.google.common.primitives.Bytes;
 import gov.nist.javax.sip.message.SIPRequest;
 import gov.nist.javax.sip.message.SIPResponse;
-import org.apache.commons.lang3.ArrayUtils;
 import org.dom4j.Document;
 import org.dom4j.DocumentException;
 import org.dom4j.Element;
@@ -172,6 +171,7 @@ public abstract class SIPRequestProcessorParent {
 		return getRootElement(evt, "gb2312");
 	}
 	public Element getRootElement(RequestEvent evt, String charset) throws DocumentException {
+
 		if (charset == null) {
 			charset = "gb2312";
 		}

+ 176 - 189
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java

@@ -1,11 +1,11 @@
 package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
 
-import com.genersoft.iot.vmp.conf.CivilCodeFileConf;
 import com.genersoft.iot.vmp.conf.DynamicTask;
 import com.genersoft.iot.vmp.conf.SipConfig;
 import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.gb28181.bean.Device;
 import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
+import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData;
 import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
 import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
 import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
@@ -19,7 +19,9 @@ import org.dom4j.Element;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
 
 import javax.sip.RequestEvent;
 import javax.sip.header.FromHeader;
@@ -28,6 +30,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 /**
@@ -46,6 +49,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
 	private final Map<String, DeviceChannel> addChannelMap = new ConcurrentHashMap<>();
 	private final List<DeviceChannel> deleteChannelList = new CopyOnWriteArrayList<>();
 
+	private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
 
 	@Autowired
 	private UserSetting userSetting;
@@ -62,235 +66,215 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
 	@Autowired
 	private DynamicTask dynamicTask;
 
-	@Autowired
-	private CivilCodeFileConf civilCodeFileConf;
-
 	@Autowired
 	private SipConfig sipConfig;
 
-	private final static String talkKey = "notify-request-for-catalog-task";
-
+	@Transactional
 	public void process(RequestEvent evt) {
-		try {
-			long start = System.currentTimeMillis();
-			FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
-			String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
+		if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) {
+			logger.error("[notify-目录订阅] 待处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue());
+			return;
+		}
+		taskQueue.offer(new HandlerCatchData(evt, null, null));
+	}
 
-			Device device = redisCatchStorage.getDevice(deviceId);
-			if (device == null || !device.isOnLine()) {
-				logger.warn("[收到目录订阅]:{}, 但是设备已经离线", (device != null ? device.getDeviceId():"" ));
-				return;
-			}
-			Element rootElement = getRootElement(evt, device.getCharset());
-			if (rootElement == null) {
-				logger.warn("[ 收到目录订阅 ] content cannot be null, {}", evt.getRequest());
-				return;
-			}
-			Element deviceListElement = rootElement.element("DeviceList");
-			if (deviceListElement == null) {
-				return;
+	@Scheduled(fixedRate = 400)   //每400毫秒执行一次
+	public void executeTaskQueue(){
+		if (taskQueue.isEmpty()) {
+			return;
+		}
+		for (HandlerCatchData take : taskQueue) {
+			if (take == null) {
+				continue;
 			}
-			Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
-			if (deviceListIterator != null) {
-
-				// 遍历DeviceList
-				while (deviceListIterator.hasNext()) {
-					Element itemDevice = deviceListIterator.next();
-					Element channelDeviceElement = itemDevice.element("DeviceID");
-					if (channelDeviceElement == null) {
-						continue;
-					}
-					Element eventElement = itemDevice.element("Event");
-					String event;
-					if (eventElement == null) {
-						logger.warn("[收到目录订阅]:{}, 但是Event为空, 设为默认值 ADD", (device != null ? device.getDeviceId():"" ));
-						event = CatalogEvent.ADD;
-					}else {
-						event = eventElement.getText().toUpperCase();
-					}
-					DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event);
-					if (channel == null) {
-						logger.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent()));
-						continue;
-					}
-					if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) {
-						channel.setParentId(null);
-					}
-					channel.setDeviceId(device.getDeviceId());
-					logger.info("[收到目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId());
-					switch (event) {
-						case CatalogEvent.ON:
-							// 上线
-							logger.info("[收到通道上线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
-							updateChannelOnlineList.add(channel);
-							if (updateChannelOnlineList.size() > 300) {
-								executeSaveForOnline();
-							}
-							if (userSetting.getDeviceStatusNotify()) {
-								// 发送redis消息
-								redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true);
-							}
-
-							break;
-						case CatalogEvent.OFF :
-							// 离线
-							logger.info("[收到通道离线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
-							if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
-								logger.info("[收到通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
-							}else {
-								updateChannelOfflineList.add(channel);
-								if (updateChannelOfflineList.size() > 300) {
-									executeSaveForOffline();
-								}
-								if (userSetting.getDeviceStatusNotify()) {
-									// 发送redis消息
-									redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
-								}
-							}
-							break;
-						case CatalogEvent.VLOST:
-							// 视频丢失
-							logger.info("[收到通道视频丢失通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
-							if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
-								logger.info("[收到通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
-							}else {
-								updateChannelOfflineList.add(channel);
-								if (updateChannelOfflineList.size() > 300) {
-									executeSaveForOffline();
-								}
+			RequestEvent evt = take.getEvt();
+			try {
+				long start = System.currentTimeMillis();
+				FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
+				String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
+
+				Device device = redisCatchStorage.getDevice(deviceId);
+				if (device == null || !device.isOnLine()) {
+					logger.warn("[收到目录订阅]:{}, 但是设备已经离线", (device != null ? device.getDeviceId() : ""));
+					return;
+				}
+				Element rootElement = getRootElement(evt, device.getCharset());
+				if (rootElement == null) {
+					logger.warn("[ 收到目录订阅 ] content cannot be null, {}", evt.getRequest());
+					return;
+				}
+				Element deviceListElement = rootElement.element("DeviceList");
+				if (deviceListElement == null) {
+					return;
+				}
+				Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
+				if (deviceListIterator != null) {
+
+					// 遍历DeviceList
+					while (deviceListIterator.hasNext()) {
+						Element itemDevice = deviceListIterator.next();
+						Element eventElement = itemDevice.element("Event");
+						String event;
+						if (eventElement == null) {
+							logger.warn("[收到目录订阅]:{}, 但是Event为空, 设为默认值 ADD", (device != null ? device.getDeviceId() : ""));
+							event = CatalogEvent.ADD;
+						} else {
+							event = eventElement.getText().toUpperCase();
+						}
+						DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event);
+						if (channel == null) {
+							logger.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent()));
+							continue;
+						}
+						if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) {
+							channel.setParentId(null);
+						}
+						channel.setDeviceId(device.getDeviceId());
+						logger.info("[收到目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId());
+						switch (event) {
+							case CatalogEvent.ON:
+								// 上线
+								logger.info("[收到通道上线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
+								updateChannelOnlineList.add(channel);
 								if (userSetting.getDeviceStatusNotify()) {
 									// 发送redis消息
-									redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
+									redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true);
 								}
-							}
-							break;
-						case CatalogEvent.DEFECT:
-							// 故障
-							logger.info("[收到通道视频故障通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
-							if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
-								logger.info("[收到通道视频故障通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
-							}else {
-								updateChannelOfflineList.add(channel);
-								if (updateChannelOfflineList.size() > 300) {
-									executeSaveForOffline();
-								}
-								if (userSetting.getDeviceStatusNotify()) {
-									// 发送redis消息
-									redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
+								break;
+							case CatalogEvent.OFF:
+								// 离线
+								logger.info("[收到通道离线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
+								if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
+									logger.info("[收到通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
+								} else {
+									updateChannelOfflineList.add(channel);
+									if (userSetting.getDeviceStatusNotify()) {
+										// 发送redis消息
+										redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
+									}
 								}
-							}
-							break;
-						case CatalogEvent.ADD:
-							// 增加
-							logger.info("[收到增加通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
-							// 判断此通道是否存在
-							DeviceChannel deviceChannel = deviceChannelService.getOne(deviceId, channel.getChannelId());
-							if (deviceChannel != null) {
-								logger.info("[增加通道] 已存在,不发送通知只更新,设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
-								channel.setId(deviceChannel.getId());
-								updateChannelMap.put(channel.getChannelId(), channel);
-								if (updateChannelMap.keySet().size() > 300) {
-									executeSaveForUpdate();
+								break;
+							case CatalogEvent.VLOST:
+								// 视频丢失
+								logger.info("[收到通道视频丢失通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
+								if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
+									logger.info("[收到通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
+								} else {
+									updateChannelOfflineList.add(channel);
+									if (userSetting.getDeviceStatusNotify()) {
+										// 发送redis消息
+										redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
+									}
 								}
-							}else {
-								addChannelMap.put(channel.getChannelId(), channel);
-								if (userSetting.getDeviceStatusNotify()) {
-									// 发送redis消息
-									redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true);
+								break;
+							case CatalogEvent.DEFECT:
+								// 故障
+								logger.info("[收到通道视频故障通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
+								if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
+									logger.info("[收到通道视频故障通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
+								} else {
+									updateChannelOfflineList.add(channel);
+									if (userSetting.getDeviceStatusNotify()) {
+										// 发送redis消息
+										redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
+									}
 								}
-
-								if (addChannelMap.keySet().size() > 300) {
-									executeSaveForAdd();
+								break;
+							case CatalogEvent.ADD:
+								// 增加
+								logger.info("[收到增加通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
+								// 判断此通道是否存在
+								DeviceChannel deviceChannel = deviceChannelService.getOne(deviceId, channel.getChannelId());
+								if (deviceChannel != null) {
+									logger.info("[增加通道] 已存在,不发送通知只更新,设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
+									channel.setId(deviceChannel.getId());
+									channel.setHasAudio(null);
+									updateChannelMap.put(channel.getChannelId(), channel);
+								} else {
+									addChannelMap.put(channel.getChannelId(), channel);
+									if (userSetting.getDeviceStatusNotify()) {
+										// 发送redis消息
+										redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true);
+									}
 								}
-							}
 
-							break;
-						case CatalogEvent.DEL:
-							// 删除
-							logger.info("[收到删除通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
-							deleteChannelList.add(channel);
-							if (userSetting.getDeviceStatusNotify()) {
-								// 发送redis消息
-								redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false);
-							}
-							if (deleteChannelList.size() > 300) {
-								executeSaveForDelete();
-							}
-							break;
-						case CatalogEvent.UPDATE:
-							// 更新
-							logger.info("[收到更新通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
-							// 判断此通道是否存在
-							DeviceChannel deviceChannelForUpdate = deviceChannelService.getOne(deviceId, channel.getChannelId());
-							if (deviceChannelForUpdate != null) {
-								channel.setId(deviceChannelForUpdate.getId());
-								channel.setUpdateTime(DateUtil.getNow());
-								updateChannelMap.put(channel.getChannelId(), channel);
-								if (updateChannelMap.keySet().size() > 300) {
-									executeSaveForUpdate();
-								}
-							}else {
-								addChannelMap.put(channel.getChannelId(), channel);
-								if (addChannelMap.keySet().size() > 300) {
-									executeSaveForAdd();
-								}
+								break;
+							case CatalogEvent.DEL:
+								// 删除
+								logger.info("[收到删除通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
+								deleteChannelList.add(channel);
 								if (userSetting.getDeviceStatusNotify()) {
 									// 发送redis消息
-									redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true);
+									redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false);
 								}
-							}
-							break;
-						default:
-							logger.warn("[ NotifyCatalog ] event not found : {}", event );
-
-					}
-					// 转发变化信息
-					eventPublisher.catalogEventPublish(null, channel, event);
-
-					if (!updateChannelMap.keySet().isEmpty()
-							|| !addChannelMap.keySet().isEmpty()
-							|| !updateChannelOnlineList.isEmpty()
-							|| !updateChannelOfflineList.isEmpty()
-							|| !deleteChannelList.isEmpty()) {
+								break;
+							case CatalogEvent.UPDATE:
+								// 更新
+								logger.info("[收到更新通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
+								// 判断此通道是否存在
+								DeviceChannel deviceChannelForUpdate = deviceChannelService.getOne(deviceId, channel.getChannelId());
+								if (deviceChannelForUpdate != null) {
+									channel.setId(deviceChannelForUpdate.getId());
+									channel.setUpdateTime(DateUtil.getNow());
+									channel.setHasAudio(null);
+									updateChannelMap.put(channel.getChannelId(), channel);
+								} else {
+									addChannelMap.put(channel.getChannelId(), channel);
+									if (userSetting.getDeviceStatusNotify()) {
+										// 发送redis消息
+										redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true);
+									}
+								}
+								break;
+							default:
+								logger.warn("[ NotifyCatalog ] event not found : {}", event);
 
-						if (!dynamicTask.contains(talkKey)) {
-							dynamicTask.startDelay(talkKey, this::executeSave, 1000);
 						}
+						// 转发变化信息
+						eventPublisher.catalogEventPublish(null, channel, event);
 					}
 				}
+
+			} catch (DocumentException e) {
+				logger.error("未处理的异常 ", e);
 			}
-		} catch (DocumentException e) {
-			logger.error("未处理的异常 ", e);
+		}
+		taskQueue.clear();
+		if (!updateChannelMap.keySet().isEmpty()
+				|| !addChannelMap.keySet().isEmpty()
+				|| !updateChannelOnlineList.isEmpty()
+				|| !updateChannelOfflineList.isEmpty()
+				|| !deleteChannelList.isEmpty()) {
+			executeSave();
 		}
 	}
 
-	private void executeSave(){
+	public void executeSave(){
 		try {
 			executeSaveForAdd();
 		} catch (Exception e) {
 			logger.error("[存储收到的增加通道] 异常: ", e );
 		}
 		try {
-			executeSaveForUpdate();
+			executeSaveForOnline();
 		} catch (Exception e) {
-			logger.error("[存储收到的更新通道] 异常: ", e );
+			logger.error("[存储收到的通道上线] 异常: ", e );
 		}
 		try {
-			executeSaveForDelete();
+			executeSaveForOffline();
 		} catch (Exception e) {
-			logger.error("[存储收到的删除通道] 异常: ", e );
+			logger.error("[存储收到的通道离线] 异常: ", e );
 		}
 		try {
-			executeSaveForOnline();
+			executeSaveForUpdate();
 		} catch (Exception e) {
-			logger.error("[存储收到的通道上线] 异常: ", e );
+			logger.error("[存储收到的更新通道] 异常: ", e );
 		}
 		try {
-			executeSaveForOffline();
+			executeSaveForDelete();
 		} catch (Exception e) {
-			logger.error("[存储收到的通道离线] 异常: ", e );
+			logger.error("[存储收到的删除通道] 异常: ", e );
 		}
-		dynamicTask.stop(talkKey);
 	}
 
 	private void executeSaveForUpdate(){
@@ -300,7 +284,6 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
 			deviceChannelService.batchUpdateChannel(deviceChannels);
 			updateChannelMap.clear();
 		}
-
 	}
 
 	private void executeSaveForAdd(){
@@ -332,4 +315,8 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
 		}
 	}
 
+	@Scheduled(fixedRate = 10000)   //每1秒执行一次
+	public void execute(){
+		logger.info("[待处理Notify-目录订阅消息数量]: {}", taskQueue.size());
+	}
 }

+ 150 - 142
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java

@@ -1,17 +1,15 @@
 package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
 
-import com.genersoft.iot.vmp.conf.CivilCodeFileConf;
-import com.genersoft.iot.vmp.conf.DynamicTask;
-import com.genersoft.iot.vmp.conf.SipConfig;
+import com.alibaba.fastjson2.JSONObject;
 import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.gb28181.bean.Device;
 import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
+import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData;
 import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
 import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
 import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
 import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
 import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
-import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
 import com.genersoft.iot.vmp.service.IDeviceChannelService;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.utils.DateUtil;
@@ -20,7 +18,9 @@ import org.dom4j.Element;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
 import org.springframework.util.ObjectUtils;
 
 import javax.sip.RequestEvent;
@@ -29,7 +29,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 /**
  * SIP命令类型: NOTIFY请求中的移动位置请求处理
@@ -40,10 +40,7 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor
 
     private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForMobilePositionProcessor.class);
 
-	private final Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>();
-
-	private final List<MobilePosition> addMobilePositionList = new CopyOnWriteArrayList();
-
+	private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
 
 	@Autowired
 	private UserSetting userSetting;
@@ -57,158 +54,169 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor
 	@Autowired
 	private IDeviceChannelService deviceChannelService;
 
-	@Autowired
-	private DynamicTask dynamicTask;
-
-	@Autowired
-	private CivilCodeFileConf civilCodeFileConf;
-
-	@Autowired
-	private SipConfig sipConfig;
-
-	private final static String talkKey = "notify-request-for-mobile-position-task";
-
 	public void process(RequestEvent evt) {
-		try {
-			FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
-			String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
-
-			// 回复 200 OK
-			Element rootElement = getRootElement(evt);
-			if (rootElement == null) {
-				logger.error("处理MobilePosition移动位置Notify时未获取到消息体,{}", evt.getRequest());
-				return;
-			}
-
-			MobilePosition mobilePosition = new MobilePosition();
-			mobilePosition.setCreateTime(DateUtil.getNow());
 
-			Element deviceIdElement = rootElement.element("DeviceID");
-			String channelId = deviceIdElement.getTextTrim().toString();
-			Device device = redisCatchStorage.getDevice(deviceId);
+		if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) {
+			logger.error("[notify-移动位置] 待处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue());
+			return;
+		}
+		taskQueue.offer(new HandlerCatchData(evt, null, null));
+	}
 
-			if (device == null) {
-				device = redisCatchStorage.getDevice(channelId);
-				if (device == null) {
-					// 根据通道id查询设备Id
-					List<Device> deviceList = deviceChannelService.getDeviceByChannelId(channelId);
-					if (deviceList.size() > 0) {
-						device = deviceList.get(0);
-					}
-				}
-			}
-			if (device == null) {
-				logger.warn("[mobilePosition移动位置Notify] 未找到通道{}所属的设备", channelId);
-				return;
+	@Scheduled(fixedRate = 200) //每200毫秒执行一次
+	@Transactional
+	public void executeTaskQueue() {
+		if (taskQueue.isEmpty()) {
+			return;
+		}
+		Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>();
+		List<MobilePosition> addMobilePositionList = new ArrayList<>();
+		for (HandlerCatchData take : taskQueue) {
+			if (take == null) {
+				continue;
 			}
-			// 兼容设备部分设备上报是通道编号与设备编号一致的情况
-			if (deviceId.equals(channelId)) {
-				List<DeviceChannel> deviceChannels = deviceChannelService.queryChaneListByDeviceId(deviceId);
-				if (deviceChannels.size() == 1) {
-					channelId = deviceChannels.get(0).getChannelId();
+			RequestEvent evt = take.getEvt();
+			try {
+				FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
+				String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
+				long startTime = System.currentTimeMillis();
+				// 回复 200 OK
+				Element rootElement = getRootElement(evt);
+				if (rootElement == null) {
+					logger.error("处理MobilePosition移动位置Notify时未获取到消息体,{}", evt.getRequest());
+					return;
 				}
-			}
-			if (!ObjectUtils.isEmpty(device.getName())) {
+				Device device = redisCatchStorage.getDevice(deviceId);
+				if (device == null) {
+					logger.error("处理MobilePosition移动位置Notify时未获取到device,{}", deviceId);
+					return;
+				}
+				MobilePosition mobilePosition = new MobilePosition();
+				mobilePosition.setDeviceId(device.getDeviceId());
 				mobilePosition.setDeviceName(device.getName());
-			}
-			mobilePosition.setDeviceName(device.getName());
-			mobilePosition.setDeviceId(device.getDeviceId());
-			mobilePosition.setChannelId(channelId);
-			String time = XmlUtil.getText(rootElement, "Time");
-			if (ObjectUtils.isEmpty(time)) {
-				mobilePosition.setTime(DateUtil.getNow());
-			} else {
-				mobilePosition.setTime(SipUtils.parseTime(time));
-			}
+				mobilePosition.setCreateTime(DateUtil.getNow());
+				List<Element> elements = rootElement.elements();
+				for (Element element : elements) {
+					switch (element.getName()){
+						case "DeviceID":
+							String channelId = element.getStringValue();
+							if (!deviceId.equals(channelId)) {
+								mobilePosition.setChannelId(channelId);
+							}
+							continue;
+						case "Time":
+							String timeVal = element.getStringValue();
+							if (ObjectUtils.isEmpty(timeVal)) {
+								mobilePosition.setTime(DateUtil.getNow());
+							} else {
+								mobilePosition.setTime(SipUtils.parseTime(timeVal));
+							}
+							continue;
+						case "Longitude":
+							mobilePosition.setLongitude(Double.parseDouble(element.getStringValue()));
+							continue;
+						case "Latitude":
+							mobilePosition.setLatitude(Double.parseDouble(element.getStringValue()));
+							continue;
+						case "Speed":
+							String speedVal = element.getStringValue();
+							if (NumericUtil.isDouble(speedVal)) {
+								mobilePosition.setSpeed(Double.parseDouble(speedVal));
+							} else {
+								mobilePosition.setSpeed(0.0);
+							}
+							continue;
+						case "Direction":
+							String directionVal = element.getStringValue();
+							if (NumericUtil.isDouble(directionVal)) {
+								mobilePosition.setDirection(Double.parseDouble(directionVal));
+							} else {
+								mobilePosition.setDirection(0.0);
+							}
+							continue;
+						case "Altitude":
+							String altitudeVal = element.getStringValue();
+							if (NumericUtil.isDouble(altitudeVal)) {
+								mobilePosition.setAltitude(Double.parseDouble(altitudeVal));
+							} else {
+								mobilePosition.setAltitude(0.0);
+							}
+							continue;
 
-			mobilePosition.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude")));
-			mobilePosition.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude")));
-			if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Speed"))) {
-				mobilePosition.setSpeed(Double.parseDouble(XmlUtil.getText(rootElement, "Speed")));
-			} else {
-				mobilePosition.setSpeed(0.0);
-			}
-			if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Direction"))) {
-				mobilePosition.setDirection(Double.parseDouble(XmlUtil.getText(rootElement, "Direction")));
-			} else {
-				mobilePosition.setDirection(0.0);
-			}
-			if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Altitude"))) {
-				mobilePosition.setAltitude(Double.parseDouble(XmlUtil.getText(rootElement, "Altitude")));
-			} else {
-				mobilePosition.setAltitude(0.0);
-			}
-			logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
-					mobilePosition.getLongitude(), mobilePosition.getLatitude());
-			mobilePosition.setReportSource("Mobile Position");
-
-			// 更新device channel 的经纬度
-			DeviceChannel deviceChannel = new DeviceChannel();
-			deviceChannel.setDeviceId(device.getDeviceId());
-			deviceChannel.setChannelId(channelId);
-			deviceChannel.setLongitude(mobilePosition.getLongitude());
-			deviceChannel.setLatitude(mobilePosition.getLatitude());
-			deviceChannel.setGpsTime(mobilePosition.getTime());
-			updateChannelMap.put(channelId, deviceChannel);
-			addMobilePositionList.add(mobilePosition);
-			if(updateChannelMap.size() > 300) {
-				executeSaveChannel();
-			}
-			if (userSetting.isSavePositionHistory()) {
-				if(addMobilePositionList.size() > 300) {
-					executeSaveMobilePosition();
+					}
 				}
-			}
 
-//			deviceChannel = deviceChannelService.updateGps(deviceChannel, device);
-//
-//			mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
-//			mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
-//			mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
-//			mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
-
-//			deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
-
-			if (!dynamicTask.contains(talkKey)) {
-				dynamicTask.startDelay(talkKey, this::executeSave, 1000);
+//			logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}, 时间: {}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
+//					mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime);
+				mobilePosition.setReportSource("Mobile Position");
+
+				// 更新device channel 的经纬度
+				DeviceChannel deviceChannel = new DeviceChannel();
+				deviceChannel.setDeviceId(device.getDeviceId());
+				deviceChannel.setLongitude(mobilePosition.getLongitude());
+				deviceChannel.setLatitude(mobilePosition.getLatitude());
+				deviceChannel.setGpsTime(mobilePosition.getTime());
+				updateChannelMap.put(deviceId + mobilePosition.getChannelId(), deviceChannel);
+				addMobilePositionList.add(mobilePosition);
+
+
+				// 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息
+				try {
+					eventPublisher.mobilePositionEventPublish(mobilePosition);
+				}catch (Exception e) {
+					logger.error("[向上级转发移动位置失败] ", e);
+				}
+				if (mobilePosition.getChannelId() == null || mobilePosition.getChannelId().equals(mobilePosition.getDeviceId()) ) {
+					List<DeviceChannel> channels = deviceChannelService.queryChaneListByDeviceId(mobilePosition.getDeviceId());
+					channels.forEach(channel -> {
+						// 发送redis消息。 通知位置信息的变化
+						JSONObject jsonObject = new JSONObject();
+						jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime()));
+						jsonObject.put("serial", channel.getDeviceId());
+						jsonObject.put("code", channel.getChannelId());
+						jsonObject.put("longitude", mobilePosition.getLongitude());
+						jsonObject.put("latitude", mobilePosition.getLatitude());
+						jsonObject.put("altitude", mobilePosition.getAltitude());
+						jsonObject.put("direction", mobilePosition.getDirection());
+						jsonObject.put("speed", mobilePosition.getSpeed());
+						redisCatchStorage.sendMobilePositionMsg(jsonObject);
+					});
+				}else {
+					// 发送redis消息。 通知位置信息的变化
+					JSONObject jsonObject = new JSONObject();
+					jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime()));
+					jsonObject.put("serial", mobilePosition.getDeviceId());
+					jsonObject.put("code", mobilePosition.getChannelId());
+					jsonObject.put("longitude", mobilePosition.getLongitude());
+					jsonObject.put("latitude", mobilePosition.getLatitude());
+					jsonObject.put("altitude", mobilePosition.getAltitude());
+					jsonObject.put("direction", mobilePosition.getDirection());
+					jsonObject.put("speed", mobilePosition.getSpeed());
+					redisCatchStorage.sendMobilePositionMsg(jsonObject);
+				}
+			} catch (DocumentException e) {
+				logger.error("未处理的异常 ", e);
 			}
-
-		} catch (DocumentException e) {
-			logger.error("未处理的异常 ", e);
 		}
-
-
-	}
-
-	private void executeSave(){
-		executeSaveChannel();
-		executeSaveMobilePosition();
-		dynamicTask.stop(talkKey);
-	}
-
-	private void executeSaveChannel(){
-		try {
-			logger.info("[移动位置订阅]更新通道位置: {}", updateChannelMap.size());
-			ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values());
-			deviceChannelService.batchUpdateChannelGPS(deviceChannels);
+		taskQueue.clear();
+		if(!updateChannelMap.isEmpty()) {
+			List<DeviceChannel>  channels = new ArrayList<>(updateChannelMap.values());
+			logger.info("[移动位置订阅]更新通道位置: {}", channels.size());
+			deviceChannelService.batchUpdateChannel(channels);
 			updateChannelMap.clear();
-		}catch (Exception e) {
-
 		}
-	}
-
-	private void executeSaveMobilePosition(){
-		if (userSetting.isSavePositionHistory()) {
+		if (userSetting.isSavePositionHistory() && !addMobilePositionList.isEmpty()) {
 			try {
 				logger.info("[移动位置订阅] 添加通道轨迹点位: {}", addMobilePositionList.size());
 				deviceChannelService.batchAddMobilePosition(addMobilePositionList);
-				addMobilePositionList.clear();
 			}catch (Exception e) {
 				logger.info("[移动位置订阅] b添加通道轨迹点位保存失败: {}", addMobilePositionList.size());
 			}
+			addMobilePositionList.clear();
 		}
 	}
-
-
-
+	@Scheduled(fixedRate = 10000)
+	public void execute(){
+		logger.info("[待处理Notify-移动位置订阅消息数量]: {}", taskQueue.size());
+	}
 }

+ 16 - 189
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java

@@ -1,12 +1,9 @@
 package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
 
 import com.genersoft.iot.vmp.conf.SipConfig;
-import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.gb28181.bean.*;
 import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
 import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
-import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
-import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
 import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
 import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
 import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
@@ -14,9 +11,7 @@ import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
 import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
 import com.genersoft.iot.vmp.service.IDeviceChannelService;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
-import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import com.genersoft.iot.vmp.utils.DateUtil;
-import com.genersoft.iot.vmp.utils.redis.RedisUtil;
 import gov.nist.javax.sip.message.SIPRequest;
 import org.dom4j.DocumentException;
 import org.dom4j.Element;
@@ -24,10 +19,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
-import org.springframework.util.ObjectUtils;
 
 import javax.sip.InvalidArgumentException;
 import javax.sip.RequestEvent;
@@ -35,8 +27,6 @@ import javax.sip.SipException;
 import javax.sip.header.FromHeader;
 import javax.sip.message.Response;
 import java.text.ParseException;
-import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
 
 /**
  * SIP命令类型: NOTIFY请求,这是作为上级发送订阅请求后,设备才会响应的
@@ -47,15 +37,6 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
 
     private final static Logger logger = LoggerFactory.getLogger(NotifyRequestProcessor.class);
 
-	@Autowired
-	private UserSetting userSetting;
-
-	@Autowired
-	private IVideoManagerStorage storager;
-
-	@Autowired
-	private EventPublisher eventPublisher;
-
 	@Autowired
 	private SipConfig sipConfig;
 
@@ -76,13 +57,8 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
 	@Autowired
 	private NotifyRequestForCatalogProcessor notifyRequestForCatalogProcessor;
 
-	private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
-
-	@Qualifier("taskExecutor")
 	@Autowired
-	private ThreadPoolTaskExecutor taskExecutor;
-
-	private int maxQueueCount = 30000;
+	private NotifyRequestForMobilePositionProcessor notifyRequestForMobilePositionProcessor;
 
 	@Override
 	public void afterPropertiesSet() throws Exception {
@@ -93,159 +69,33 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
 	@Override
 	public void process(RequestEvent evt) {
 		try {
-			if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) {
-				responseAck((SIPRequest) evt.getRequest(), Response.BUSY_HERE, null, null);
-				logger.error("[notify] 待处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue());
-				return;
-			}else {
-				responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null);
-			}
-
-		}catch (SipException | InvalidArgumentException | ParseException e) {
-			logger.error("未处理的异常 ", e);
-		}
-		boolean runed = !taskQueue.isEmpty();
-		logger.info("[notify] 待处理消息数量: {}", taskQueue.size());
-		taskQueue.offer(new HandlerCatchData(evt, null, null));
-		if (!runed) {
-			taskExecutor.execute(()-> {
-				while (!taskQueue.isEmpty()) {
-					try {
-						HandlerCatchData take = taskQueue.poll();
-						if (take == null) {
-							continue;
-						}
-						Element rootElement = getRootElement(take.getEvt());
-						if (rootElement == null) {
-							logger.error("处理NOTIFY消息时未获取到消息体,{}", take.getEvt().getRequest());
-							continue;
-						}
-						String cmd = XmlUtil.getText(rootElement, "CmdType");
-
-						if (CmdType.CATALOG.equals(cmd)) {
-							logger.info("接收到Catalog通知");
-							notifyRequestForCatalogProcessor.process(take.getEvt());
-						} else if (CmdType.ALARM.equals(cmd)) {
-							logger.info("接收到Alarm通知");
-							processNotifyAlarm(take.getEvt());
-						} else if (CmdType.MOBILE_POSITION.equals(cmd)) {
-							logger.info("接收到MobilePosition通知");
-							processNotifyMobilePosition(take.getEvt());
-						} else {
-							logger.info("接收到消息:" + cmd);
-						}
-					} catch (DocumentException e) {
-						logger.error("处理NOTIFY消息时错误", e);
-					}
-				}
-			});
-		}
-	}
-
-	/**
-	 * 处理MobilePosition移动位置Notify
-	 *
-	 * @param evt
-	 */
-	private void processNotifyMobilePosition(RequestEvent evt) {
-		try {
-			FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
-			String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
-
-			// 回复 200 OK
+			responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null);
 			Element rootElement = getRootElement(evt);
 			if (rootElement == null) {
-				logger.error("处理MobilePosition移动位置Notify时未获取到消息体,{}", evt.getRequest());
-				return;
-			}
-
-			MobilePosition mobilePosition = new MobilePosition();
-			mobilePosition.setCreateTime(DateUtil.getNow());
-
-			Element deviceIdElement = rootElement.element("DeviceID");
-			String channelId = deviceIdElement.getTextTrim().toString();
-			Device device = redisCatchStorage.getDevice(deviceId);
-
-			if (device == null) {
-				device = redisCatchStorage.getDevice(channelId);
-				if (device == null) {
-					// 根据通道id查询设备Id
-					List<Device> deviceList = deviceChannelService.getDeviceByChannelId(channelId);
-					if (deviceList.size() > 0) {
-						device = deviceList.get(0);
-					}
-				}
-			}
-			if (device == null) {
-				logger.warn("[mobilePosition移动位置Notify] 未找到通道{}所属的设备", channelId);
+				logger.error("处理NOTIFY消息时未获取到消息体,{}", evt.getRequest());
+				responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null);
 				return;
 			}
-			// 兼容设备部分设备上报是通道编号与设备编号一致的情况
-			if(deviceId.equals(channelId)) {
-				List<DeviceChannel> deviceChannels = deviceChannelService.queryChaneListByDeviceId(deviceId);
-				if (deviceChannels.size() == 1) {
-					channelId = deviceChannels.get(0).getChannelId();
-				}
-			}
-			if (!ObjectUtils.isEmpty(device.getName())) {
-				mobilePosition.setDeviceName(device.getName());
-			}
-
-			mobilePosition.setDeviceId(device.getDeviceId());
-			mobilePosition.setChannelId(channelId);
-			String time = XmlUtil.getText(rootElement, "Time");
-			if (ObjectUtils.isEmpty(time)){
-				mobilePosition.setTime(DateUtil.getNow());
-			}else {
-				mobilePosition.setTime(SipUtils.parseTime(time));
-			}
+			String cmd = XmlUtil.getText(rootElement, "CmdType");
 
-			mobilePosition.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude")));
-			mobilePosition.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude")));
-			if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Speed"))) {
-				mobilePosition.setSpeed(Double.parseDouble(XmlUtil.getText(rootElement, "Speed")));
+			if (CmdType.CATALOG.equals(cmd)) {
+				notifyRequestForCatalogProcessor.process(evt);
+			} else if (CmdType.ALARM.equals(cmd)) {
+				processNotifyAlarm(evt);
+			} else if (CmdType.MOBILE_POSITION.equals(cmd)) {
+				notifyRequestForMobilePositionProcessor.process(evt);
 			} else {
-				mobilePosition.setSpeed(0.0);
+				logger.info("接收到消息:" + cmd);
 			}
-			if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Direction"))) {
-				mobilePosition.setDirection(Double.parseDouble(XmlUtil.getText(rootElement, "Direction")));
-			} else {
-				mobilePosition.setDirection(0.0);
-			}
-			if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Altitude"))) {
-				mobilePosition.setAltitude(Double.parseDouble(XmlUtil.getText(rootElement, "Altitude")));
-			} else {
-				mobilePosition.setAltitude(0.0);
-			}
-			logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
-					mobilePosition.getLongitude(), mobilePosition.getLatitude());
-			mobilePosition.setReportSource("Mobile Position");
-
-			// 更新device channel 的经纬度
-			DeviceChannel deviceChannel = new DeviceChannel();
-			deviceChannel.setDeviceId(device.getDeviceId());
-			deviceChannel.setChannelId(channelId);
-			deviceChannel.setLongitude(mobilePosition.getLongitude());
-			deviceChannel.setLatitude(mobilePosition.getLatitude());
-			deviceChannel.setGpsTime(mobilePosition.getTime());
-//			deviceChannel = deviceChannelService.updateGps(deviceChannel, device);
-//
-//			mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
-//			mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
-//			mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
-//			mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
-
-			deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
-
-		} catch (DocumentException  e) {
+		} catch (SipException | InvalidArgumentException | ParseException e) {
 			logger.error("未处理的异常 ", e);
+		} catch (DocumentException e) {
+			throw new RuntimeException(e);
 		}
-	}
 
+	}
 	/***
 	 * 处理alarm设备报警Notify
-	 *
-	 * @param evt
 	 */
 	private void processNotifyAlarm(RequestEvent evt) {
 		if (!sipConfig.isAlarm()) {
@@ -336,28 +186,5 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
 		}
 	}
 
-	public void setCmder(SIPCommander cmder) {
-	}
-
-	public void setStorager(IVideoManagerStorage storager) {
-		this.storager = storager;
-	}
-
-	public void setPublisher(EventPublisher publisher) {
-		this.publisher = publisher;
-	}
 
-	public void setRedis(RedisUtil redis) {
-	}
-
-	public void setDeferredResultHolder(DeferredResultHolder deferredResultHolder) {
-	}
-
-	public IRedisCatchStorage getRedisCatchStorage() {
-		return redisCatchStorage;
-	}
-
-	public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) {
-		this.redisCatchStorage = redisCatchStorage;
-	}
 }

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java

@@ -289,7 +289,7 @@ public class ZLMHttpHookListener {
                 String channelId = ssrcTransactionForAll.get(0).getChannelId();
                 DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
                 if (deviceChannel != null) {
-                    result.setEnable_audio(deviceChannel.isHasAudio());
+                    result.setEnable_audio(deviceChannel.getHasAudio());
                 }
                 // 如果是录像下载就设置视频间隔十秒
                 if (ssrcTransactionForAll.get(0).getType() == InviteSessionType.DOWNLOAD) {

+ 5 - 0
src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java

@@ -102,4 +102,9 @@ public interface IDeviceChannelService {
 
     void batchAddMobilePosition(List<MobilePosition> addMobilePositionList);
 
+    void online(DeviceChannel channel);
+
+    void offline(DeviceChannel channel);
+
+    void delete(DeviceChannel channel);
 }

+ 65 - 10
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java

@@ -23,6 +23,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
 import org.springframework.util.ObjectUtils;
 
 import java.util.ArrayList;
@@ -247,26 +248,50 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
         return channelMapper.batchOnline(channels);
     }
 
+    @Override
+    public void online(DeviceChannel channel) {
+        channelMapper.online(channel.getDeviceId(), channel.getChannelId());
+    }
+
     @Override
     public int channelsOffline(List<DeviceChannel> channels) {
         return channelMapper.batchOffline(channels);
     }
 
+
+    @Override
+    public void offline(DeviceChannel channel) {
+        channelMapper.offline(channel.getDeviceId(), channel.getChannelId());
+    }
+
+    @Override
+    public void delete(DeviceChannel channel) {
+        channelMapper.del(channel.getDeviceId(), channel.getChannelId());
+    }
+
     @Override
     public DeviceChannel getOne(String deviceId, String channelId){
         return channelMapper.queryChannel(deviceId, channelId);
     }
 
     @Override
-    public void batchUpdateChannel(List<DeviceChannel> channels) {
+    public synchronized void batchUpdateChannel(List<DeviceChannel> channels) {
         String now = DateUtil.getNow();
         for (DeviceChannel channel : channels) {
             channel.setUpdateTime(now);
         }
-        channelMapper.batchUpdate(channels);
-        for (DeviceChannel channel : channels) {
-            if (channel.getParentId() != null) {
-                channelMapper.updateChannelSubCount(channel.getDeviceId(), channel.getParentId());
+        int limitCount = 1000;
+        if (!channels.isEmpty()) {
+            if (channels.size() > limitCount) {
+                for (int i = 0; i < channels.size(); i += limitCount) {
+                    int toIndex = i + limitCount;
+                    if (i + limitCount > channels.size()) {
+                        toIndex = channels.size();
+                    }
+                    channelMapper.batchUpdate(channels.subList(i, toIndex));
+                }
+            }else {
+                channelMapper.batchUpdate(channels);
             }
         }
     }
@@ -355,15 +380,45 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
     }
 
     @Override
+    @Transactional
     public void batchUpdateChannelGPS(List<DeviceChannel> channelList) {
-        channelMapper.batchUpdatePosition(channelList);
+        for (DeviceChannel deviceChannel : channelList) {
+            deviceChannel.setUpdateTime(DateUtil.getNow());
+            if (deviceChannel.getGpsTime() == null) {
+                deviceChannel.setGpsTime(DateUtil.getNow());
+            }
+        }
+        int count = 1000;
+        if (channelList.size() > count) {
+            for (int i = 0; i < channelList.size(); i+=count) {
+                int toIndex = i+count;
+                if ( i + count > channelList.size()) {
+                    toIndex = channelList.size();
+                }
+                List<DeviceChannel> channels = channelList.subList(i, toIndex);
+                channelMapper.batchUpdatePosition(channels);
+            }
+        }else {
+            channelMapper.batchUpdatePosition(channelList);
+        }
     }
 
     @Override
+    @Transactional
     public void batchAddMobilePosition(List<MobilePosition> mobilePositions) {
-        deviceMobilePositionMapper.batchInsert(mobilePositions);
+//        int count = 500;
+//        if (mobilePositions.size() > count) {
+//            for (int i = 0; i < mobilePositions.size(); i+=count) {
+//                int toIndex = i+count;
+//                if ( i + count > mobilePositions.size()) {
+//                    toIndex = mobilePositions.size();
+//                }
+//                List<MobilePosition> mobilePositionsSub = mobilePositions.subList(i, toIndex);
+//                deviceMobilePositionMapper.batchadd(mobilePositionsSub);
+//            }
+//        }else {
+//            deviceMobilePositionMapper.batchadd(mobilePositions);
+//        }
+        deviceMobilePositionMapper.batchadd(mobilePositions);
     }
-
-
-
 }

+ 2 - 1
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java

@@ -149,7 +149,6 @@ public class PlayServiceImpl implements IPlayService {
             logger.warn("[点播] 未找到可用的zlm deviceId: {},channelId:{}", deviceId, channelId);
             throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm");
         }
-
         Device device = redisCatchStorage.getDevice(deviceId);
         if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE") && !mediaServerItem.isRtpEnable()) {
             logger.warn("[点播] 单端口收流时不支持TCP主动方式收流 deviceId: {},channelId:{}", deviceId, channelId);
@@ -163,6 +162,8 @@ public class PlayServiceImpl implements IPlayService {
         InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
         if (inviteInfo != null ) {
             if (inviteInfo.getStreamInfo() == null) {
+                // 释放生成的ssrc,使用上一次申请的
+                ssrcFactory.releaseSsrc(mediaServerItem.getId(), ssrc);
                 // 点播发起了但是尚未成功, 仅注册回调等待结果即可
                 inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId, null, callback);
                 logger.info("[点播开始] 已经请求中,等待结果, deviceId: {}, channelId: {}", device.getDeviceId(), channelId);

+ 14 - 6
src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java

@@ -170,15 +170,19 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
             callback.run(ErrorCode.ERROR100.getCode(), "保存失败", null);
             return;
         }
-        HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaInfo.getId());
-        hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> {
-            StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
-                    mediaInfo, param.getApp(), param.getStream(), null, null);
-            callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
-        });
+
         if (param.isEnable()) {
             String talkKey = UUID.randomUUID().toString();
             String delayTalkKey = UUID.randomUUID().toString();
+            HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaInfo.getId());
+            hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> {
+                dynamicTask.stop(delayTalkKey);
+                hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
+                StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
+                        mediaInfo, param.getApp(), param.getStream(), null, null);
+                callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
+            });
+
             dynamicTask.startDelay(delayTalkKey, ()->{
                 StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaInfo.getId(), false);
                 if (streamInfo != null) {
@@ -324,6 +328,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
             zlmresTfulUtils.closeStreams(mediaServerItem, param.getApp(), param.getStream());
         }
         if ("ffmpeg".equalsIgnoreCase(param.getType())){
+            if (param.getTimeoutMs() == 0) {
+                param.setTimeoutMs(15);
+            }
             result = zlmresTfulUtils.addFFmpegSource(mediaServerItem, param.getSrcUrl().trim(), param.getDstUrl(),
                     param.getTimeoutMs() + "", param.isEnableAudio(), param.isEnableMp4(),
                     param.getFfmpegCmdKey());
@@ -379,6 +386,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
             gbStreamMapper.del(app, stream);
             videoManagerStorager.deleteStreamProxy(app, stream);
             redisCatchStorage.removeStream(streamProxyItem.getMediaServerId(), "PULL", app, stream);
+            redisCatchStorage.removeStream(streamProxyItem.getMediaServerId(), "PUSH", app, stream);
             JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyItem);
             if (jsonObject != null && jsonObject.getInteger("code") == 0) {
                 logger.info("[移除代理]: 代理: {}/{}, 从zlm移除成功", app, stream);

+ 17 - 0
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java

@@ -401,6 +401,23 @@ public interface DeviceChannelMapper {
             " </script>"})
     int updatePosition(DeviceChannel deviceChannel);
 
+    @Update({"<script>" +
+            "<foreach collection='deviceChannelList' item='item' separator=';'>" +
+            " UPDATE" +
+            " wvp_device_channel" +
+            " SET gps_time=#{item.gpsTime}" +
+            "<if test='item.longitude != null'>, longitude=#{item.longitude}</if>" +
+            "<if test='item.latitude != null'>, latitude=#{item.latitude}</if>" +
+            "<if test='item.longitudeGcj02 != null'>, longitude_gcj02=#{item.longitudeGcj02}</if>" +
+            "<if test='item.latitudeGcj02 != null'>, latitude_gcj02=#{item.latitudeGcj02}</if>" +
+            "<if test='item.longitudeWgs84 != null'>, longitude_wgs84=#{item.longitudeWgs84}</if>" +
+            "<if test='item.latitudeWgs84 != null'>, latitude_wgs84=#{item.latitudeWgs84}</if>" +
+            "WHERE device_id=#{item.deviceId} " +
+            " <if test='item.channelId != null' > AND channel_id=#{item.channelId}</if>" +
+            "</foreach>" +
+            "</script>"})
+    int batchUpdatePosition(List<DeviceChannel> deviceChannelList);
+
     @Select("SELECT * FROM wvp_device_channel WHERE length(trim(stream_id)) > 0")
     List<DeviceChannel> getAllChannelInPlay();
 

+ 22 - 9
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java

@@ -13,8 +13,7 @@ import java.util.List;
 public interface DeviceMobilePositionMapper {
 
     @Insert("INSERT INTO wvp_device_mobile_position (device_id,channel_id, device_name,time,longitude,latitude,altitude,speed,direction,report_source,longitude_gcj02,latitude_gcj02,longitude_wgs84,latitude_wgs84,create_time)"+
-            "VALUES (#{deviceId}, #{channelId}, #{deviceName}, #{time}, #{longitude}, #{latitude}, #{altitude}, " +
-            "#{speed}, #{direction}, #{reportSource}, #{longitudeGcj02}, #{latitudeGcj02}, #{longitudeWgs84}, #{latitudeWgs84}, #{createTime})")
+            "VALUES (#{deviceId}, #{channelId}, #{deviceName}, #{time}, #{longitude}, #{latitude}, #{altitude}, #{speed}, #{direction}, #{reportSource}, #{longitudeGcj02}, #{latitudeGcj02}, #{longitudeWgs84}, #{latitudeWgs84}, #{createTime})")
     int insertNewPosition(MobilePosition mobilePosition);
 
     @Select(value = {" <script>" +
@@ -34,19 +33,33 @@ public interface DeviceMobilePositionMapper {
     @Delete("DELETE FROM wvp_device_mobile_position WHERE device_id = #{deviceId}")
     int clearMobilePositionsByDeviceId(String deviceId);
 
+
     @Insert("<script> " +
             "insert into wvp_device_mobile_position " +
-            "(device_id,channel_id, device_name,time,longitude," +
-            "latitude,altitude,speed,direction," +
-            "report_source,longitude_gcj02,latitude_gcj02," +
-            "longitude_wgs84,latitude_wgs84,create_time)"+
+            "(device_id,channel_id, device_name,time,longitude,latitude,altitude,speed,direction,report_source," +
+            "longitude_gcj02,latitude_gcj02,longitude_wgs84,latitude_wgs84,create_time)"+
             "values " +
             "<foreach collection='mobilePositions' index='index' item='item' separator=','> " +
             "(#{item.deviceId}, #{item.channelId}, #{item.deviceName}, #{item.time}, #{item.longitude}, " +
             "#{item.latitude}, #{item.altitude}, #{item.speed},#{item.direction}," +
-            "#{item.reportSource}, #{item.longitudeGcj02}, #{item.latitudeGcj02}, " +
-            "#{item.longitudeWgs84}, #{item.latitudeWgs84}, #{item.createTime}) " +
+            "#{item.reportSource}, #{item.longitudeGcj02}, #{item.latitudeGcj02}, #{item.longitudeWgs84}, #{item.latitudeWgs84}, " +
+            "#{item.createTime}) " +
             "</foreach> " +
             "</script>")
-    void batchInsert(List<MobilePosition> mobilePositions);
+    void batchadd2(List<MobilePosition> mobilePositions);
+
+    @Insert("<script> " +
+            "<foreach collection='mobilePositions' index='index' item='item' separator=','> " +
+            "insert into wvp_device_mobile_position " +
+            "(device_id,channel_id, device_name,time,longitude,latitude,altitude,speed,direction,report_source," +
+            "longitude_gcj02,latitude_gcj02,longitude_wgs84,latitude_wgs84,create_time)"+
+            "values " +
+            "(#{item.deviceId}, #{item.channelId}, #{item.deviceName}, #{item.time}, #{item.longitude}, " +
+            "#{item.latitude}, #{item.altitude}, #{item.speed},#{item.direction}," +
+            "#{item.reportSource}, #{item.longitudeGcj02}, #{item.latitudeGcj02}, #{item.longitudeWgs84}, #{item.latitudeWgs84}, " +
+            "#{item.createTime}); " +
+            "</foreach> " +
+            "</script>")
+    void batchadd(List<MobilePosition> mobilePositions);
+
 }

+ 2 - 3
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java

@@ -7,7 +7,6 @@ import com.genersoft.iot.vmp.gb28181.bean.*;
 import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
 import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
-import com.genersoft.iot.vmp.service.IGbStreamService;
 import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
@@ -139,7 +138,7 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage {
 			gbIdSet.add(deviceChannel.getChannelId());
 			if (allChannelMap.containsKey(deviceChannel.getChannelId())) {
 				deviceChannel.setStreamId(allChannelMap.get(deviceChannel.getChannelId()).getStreamId());
-				deviceChannel.setHasAudio(allChannelMap.get(deviceChannel.getChannelId()).isHasAudio());
+				deviceChannel.setHasAudio(allChannelMap.get(deviceChannel.getChannelId()).getHasAudio());
 				if (allChannelMap.get(deviceChannel.getChannelId()).isStatus() !=deviceChannel.isStatus()){
 					List<String> strings = platformChannelMapper.queryParentPlatformByChannelId(deviceChannel.getChannelId());
 					if (!CollectionUtils.isEmpty(strings)){
@@ -275,7 +274,7 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage {
 					deviceChannel.setUpdateTime(DateUtil.getNow());
 					if (allChannelMap.containsKey(deviceChannel.getChannelId())) {
 						deviceChannel.setStreamId(allChannelMap.get(deviceChannel.getChannelId()).getStreamId());
-						deviceChannel.setHasAudio(allChannelMap.get(deviceChannel.getChannelId()).isHasAudio());
+						deviceChannel.setHasAudio(allChannelMap.get(deviceChannel.getChannelId()).getHasAudio());
 						if (allChannelMap.get(deviceChannel.getChannelId()).isStatus() !=deviceChannel.isStatus()){
 							List<String> strings = platformChannelMapper.queryParentPlatformByChannelId(deviceChannel.getChannelId());
 							if (!CollectionUtils.isEmpty(strings)){