Browse Source

修复转发国标notify-update时信息错误的问题

648540858 1 year ago
parent
commit
901dee2bf4

+ 3 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java

@@ -49,6 +49,7 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
         ParentPlatform parentPlatform = null;
 
         Map<String, List<ParentPlatform>> parentPlatformMap = new HashMap<>();
+        Map<String, DeviceChannel> channelMap = new HashMap<>();
         if (!ObjectUtils.isEmpty(event.getPlatformId())) {
             subscribe = subscribeHolder.getCatalogSubscribe(event.getPlatformId());
             if (subscribe == null) {
@@ -67,6 +68,7 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
                     for (DeviceChannel deviceChannel : event.getDeviceChannels()) {
                         List<ParentPlatform> parentPlatformsForGB = storager.queryPlatFormListForGBWithGBId(deviceChannel.getChannelId(), platforms);
                         parentPlatformMap.put(deviceChannel.getChannelId(), parentPlatformsForGB);
+                        channelMap.put(deviceChannel.getChannelId(), deviceChannel);
                     }
                 }
             }else if (event.getGbStreams() != null) {
@@ -174,7 +176,7 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
                                 }
                                 logger.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), gbId);
                                 List<DeviceChannel> deviceChannelList = new ArrayList<>();
-                                DeviceChannel deviceChannel = storager.queryChannelInParentPlatform(platform.getServerGBId(), gbId);
+                                DeviceChannel deviceChannel = channelMap.get(gbId);
                                 deviceChannelList.add(deviceChannel);
                                 GbStream gbStream = storager.queryStreamInParentPlatform(platform.getServerGBId(), gbId);
                                 if(gbStream != null){

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java

@@ -597,6 +597,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
         Integer finalIndex = index;
         String catalogXmlContent = getCatalogXmlContentForCatalogAddOrUpdate(parentPlatform, channels,
                 deviceChannels.size(), type, subscribeInfo);
+        System.out.println(catalogXmlContent);
         logger.info("[发送NOTIFY通知]类型: {},发送数量: {}", type, channels.size());
         sendNotify(parentPlatform, catalogXmlContent, subscribeInfo, eventResult -> {
             logger.error("发送NOTIFY通知消息失败。错误:{} {}", eventResult.statusCode, eventResult.msg);
@@ -626,7 +627,6 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
 
     private  String getCatalogXmlContentForCatalogAddOrUpdate(ParentPlatform parentPlatform, List<DeviceChannel> channels, int sumNum, String type, SubscribeInfo subscribeInfo) {
         StringBuffer catalogXml = new StringBuffer(600);
-
         String characterSet = parentPlatform.getCharacterSet();
         catalogXml.append("<?xml version=\"1.0\" encoding=\"" + characterSet + "\"?>\r\n")
                 .append("<Notify>\r\n")

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java

@@ -6,7 +6,6 @@ import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
 import com.google.common.primitives.Bytes;
 import gov.nist.javax.sip.message.SIPRequest;
 import gov.nist.javax.sip.message.SIPResponse;
-import org.apache.commons.lang3.ArrayUtils;
 import org.dom4j.Document;
 import org.dom4j.DocumentException;
 import org.dom4j.Element;
@@ -172,6 +171,7 @@ public abstract class SIPRequestProcessorParent {
 		return getRootElement(evt, "gb2312");
 	}
 	public Element getRootElement(RequestEvent evt, String charset) throws DocumentException {
+
 		if (charset == null) {
 			charset = "gb2312";
 		}

+ 8 - 7
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java

@@ -25,6 +25,7 @@ import org.springframework.util.ObjectUtils;
 
 import javax.sip.RequestEvent;
 import javax.sip.header.FromHeader;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -67,7 +68,7 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor
 
 	private final static String talkKey = "notify-request-for-mobile-position-task";
 
-//	@Async("taskExecutor")
+	@Async("taskExecutor")
 	public void process(RequestEvent evt) {
 		try {
 			FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
@@ -172,11 +173,11 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor
 			deviceChannel.setGpsTime(mobilePosition.getTime());
 			updateChannelMap.put(deviceId + channelId, deviceChannel);
 			addMobilePositionList.add(mobilePosition);
-			if(updateChannelMap.size() > 100) {
+			if(updateChannelMap.size() > 2000) {
 				executeSaveChannel();
 			}
 			if (userSetting.isSavePositionHistory()) {
-				if(addMobilePositionList.size() > 100) {
+				if(addMobilePositionList.size() > 2000) {
 					executeSaveMobilePosition();
 				}
 			}
@@ -212,8 +213,8 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor
 		dynamicTask.execute();
 		try {
 			logger.info("[移动位置订阅]更新通道位置: {}", updateChannelMap.size());
-//			ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values());
-//			deviceChannelService.batchUpdateChannelGPS(deviceChannels);
+			ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values());
+			deviceChannelService.batchUpdateChannelGPS(deviceChannels);
 			updateChannelMap.clear();
 		}catch (Exception e) {
 
@@ -223,8 +224,8 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor
 	public void executeSaveMobilePosition(){
 		if (userSetting.isSavePositionHistory()) {
 			try {
-//				logger.info("[移动位置订阅] 添加通道轨迹点位: {}", addMobilePositionList.size());
-//				deviceChannelService.batchAddMobilePosition(addMobilePositionList);
+				logger.info("[移动位置订阅] 添加通道轨迹点位: {}", addMobilePositionList.size());
+				deviceChannelService.batchAddMobilePosition(addMobilePositionList);
 				addMobilePositionList.clear();
 			}catch (Exception e) {
 				logger.info("[移动位置订阅] b添加通道轨迹点位保存失败: {}", addMobilePositionList.size());

+ 6 - 6
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java

@@ -136,9 +136,9 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
 						} else if (CmdType.MOBILE_POSITION.equals(cmd)) {
 //							logger.info("接收到MobilePosition通知");
 //							processNotifyMobilePosition(take.getEvt());
-							taskExecutor.execute(() -> {
+//							taskExecutor.execute(() -> {
 								notifyRequestForMobilePositionProcessor.process(take.getEvt());
-							});
+//							});
 
 						} else {
 							logger.info("接收到消息:" + cmd);
@@ -226,8 +226,8 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
 			} else {
 				mobilePosition.setAltitude(0.0);
 			}
-			logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
-					mobilePosition.getLongitude(), mobilePosition.getLatitude());
+//			logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
+//					mobilePosition.getLongitude(), mobilePosition.getLatitude());
 			mobilePosition.setReportSource("Mobile Position");
 
 			// 更新device channel 的经纬度
@@ -370,8 +370,8 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
 		this.redisCatchStorage = redisCatchStorage;
 	}
 
-	@Scheduled(fixedRate = 1000)   //每1秒执行一次
+	@Scheduled(fixedRate = 10000)   //每1秒执行一次
 	public void execute(){
-		System.out.println("待处理消息数量: " + taskQueue.size());
+		logger.info("[待处理Notify消息数量]: {}", taskQueue.size());
 	}
 }

+ 2 - 1
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java

@@ -149,7 +149,6 @@ public class PlayServiceImpl implements IPlayService {
             logger.warn("[点播] 未找到可用的zlm deviceId: {},channelId:{}", deviceId, channelId);
             throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm");
         }
-
         Device device = redisCatchStorage.getDevice(deviceId);
         if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE") && !mediaServerItem.isRtpEnable()) {
             logger.warn("[点播] 单端口收流时不支持TCP主动方式收流 deviceId: {},channelId:{}", deviceId, channelId);
@@ -163,6 +162,8 @@ public class PlayServiceImpl implements IPlayService {
         InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
         if (inviteInfo != null ) {
             if (inviteInfo.getStreamInfo() == null) {
+                // 释放生成的ssrc,使用上一次申请的
+                ssrcFactory.releaseSsrc(mediaServerItem.getId(), ssrc);
                 // 点播发起了但是尚未成功, 仅注册回调等待结果即可
                 inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId, null, callback);
                 logger.info("[点播开始] 已经请求中,等待结果, deviceId: {}, channelId: {}", device.getDeviceId(), channelId);