Browse Source

Merge branch 'wvp-28181-2.0' into main-dev

# Conflicts:
#	src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
#	src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
#	src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
#	src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
648540858 2 years ago
parent
commit
c014a90cc6
23 changed files with 497 additions and 238 deletions
  1. 1 0
      doc/_sidebar.md
  2. 15 0
      src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
  3. 4 0
      src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
  4. 5 5
      src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
  5. 3 5
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java
  6. 0 1
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
  7. 2 0
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
  8. 6 0
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
  9. 94 180
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
  10. 24 21
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
  11. 2 2
      src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java
  12. 9 1
      src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
  13. 20 12
      src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
  14. 14 3
      src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java
  15. 2 3
      src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
  16. 2 2
      src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
  17. 33 1
      src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
  18. 120 0
      src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java
  19. 5 0
      src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
  20. 2 2
      src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java
  21. 14 0
      src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
  22. 3 0
      src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/ptz/PtzController.java
  23. 117 0
      src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java

+ 1 - 0
doc/_sidebar.md

@@ -1,6 +1,7 @@
 <!-- 侧边栏 -->
 
 * **编译与部署**
+  * [测试](_content/introduction/test.md)
   * [编译](_content/introduction/compile.md)
   * [配置](_content/introduction/config.md)
   * [部署](_content/introduction/deployment.md)

+ 15 - 0
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java

@@ -101,6 +101,21 @@ public class VideoManagerConstants {
 	 */
 	public static final String VM_MSG_STREAM_PUSH_REQUESTED = "VM_MSG_STREAM_PUSH_REQUESTED";
 
+	/**
+	 * redis 消息通知上级平台开始观看流
+	 */
+	public static final String VM_MSG_STREAM_START_PLAY_NOTIFY = "VM_MSG_STREAM_START_PLAY_NOTIFY";
+
+	/**
+	 * redis 消息通知上级平台停止观看流
+	 */
+	public static final String VM_MSG_STREAM_STOP_PLAY_NOTIFY = "VM_MSG_STREAM_STOP_PLAY_NOTIFY";
+
+	/**
+	 * redis 消息接收关闭一个推流
+	 */
+	public static final String VM_MSG_STREAM_PUSH_CLOSE_REQUESTED = "VM_MSG_STREAM_PUSH_CLOSE_REQUESTED";
+
 
 	/**
 	 * redis 消息通知平台通知设备推流结果

+ 4 - 0
src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java

@@ -46,6 +46,9 @@ public class RedisMsgListenConfig {
 	@Autowired
 	private RedisCloseStreamMsgListener redisCloseStreamMsgListener;
 
+	@Autowired
+	private RedisPushStreamCloseResponseListener redisPushStreamCloseResponseListener;
+
 
 	/**
 	 * redis消息监听器容器 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
@@ -67,6 +70,7 @@ public class RedisMsgListenConfig {
 		container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE));
 		container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE));
 		container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE));
+		container.addMessageListener(redisPushStreamCloseResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED));
         return container;
     }
 }

+ 5 - 5
src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java

@@ -64,7 +64,7 @@ public class SipLayer implements CommandLineRunner {
 		try {
 			sipStack = (SipStackImpl)SipFactory.getInstance().createSipStack(DefaultProperties.getProperties(monitorIp, userSetting.getSipLog()));
 		} catch (PeerUnavailableException e) {
-			logger.error("[Sip Server] SIP服务启动失败, 监听地址{}失败,请检查ip是否正确", monitorIp);
+			logger.error("[SIP SERVER] SIP服务启动失败, 监听地址{}失败,请检查ip是否正确", monitorIp);
 			return;
 		}
 
@@ -76,12 +76,12 @@ public class SipLayer implements CommandLineRunner {
 			tcpSipProvider.addSipListener(sipProcessorObserver);
 			tcpSipProviderMap.put(monitorIp, tcpSipProvider);
 
-			logger.info("[Sip Server] tcp://{}:{} 启动成功", monitorIp, port);
+			logger.info("[SIP SERVER] tcp://{}:{} 启动成功", monitorIp, port);
 		} catch (TransportNotSupportedException
 				 | TooManyListenersException
 				 | ObjectInUseException
 				 | InvalidArgumentException e) {
-			logger.error("[Sip Server] tcp://{}:{} SIP服务启动失败,请检查端口是否被占用或者ip是否正确"
+			logger.error("[SIP SERVER] tcp://{}:{} SIP服务启动失败,请检查端口是否被占用或者ip是否正确"
 					, monitorIp, port);
 		}
 
@@ -93,12 +93,12 @@ public class SipLayer implements CommandLineRunner {
 
 			udpSipProviderMap.put(monitorIp, udpSipProvider);
 
-			logger.info("[Sip Server] udp://{}:{} 启动成功", monitorIp, port);
+			logger.info("[SIP SERVER] udp://{}:{} 启动成功", monitorIp, port);
 		} catch (TransportNotSupportedException
 				 | TooManyListenersException
 				 | ObjectInUseException
 				 | InvalidArgumentException e) {
-			logger.error("[Sip Server] udp://{}:{} SIP服务启动失败,请检查端口是否被占用或者ip是否正确"
+			logger.error("[SIP SERVER] udp://{}:{} SIP服务启动失败,请检查端口是否被占用或者ip是否正确"
 					, monitorIp, port);
 		}
 	}

+ 3 - 5
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java

@@ -284,7 +284,7 @@ public class SIPRequestHeaderPlarformProvider {
 		viaHeader.setRPort();
 		viaHeaders.add(viaHeader);
 		// from
-		SipURI fromSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(platform.getDeviceGBId(),
+		SipURI fromSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(sendRtpItem.getChannelId(),
 				platform.getDeviceIp() + ":" + platform.getDevicePort());
 		Address fromAddress = SipFactory.getInstance().createAddressFactory().createAddress(fromSipURI);
 		FromHeader fromHeader = SipFactory.getInstance().createHeaderFactory().createFromHeader(fromAddress, sendRtpItem.getToTag());
@@ -297,13 +297,10 @@ public class SIPRequestHeaderPlarformProvider {
 		MaxForwardsHeader maxForwards = SipFactory.getInstance().createHeaderFactory().createMaxForwardsHeader(70);
 		// ceq
 		CSeqHeader cSeqHeader = SipFactory.getInstance().createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(), Request.BYE);
-		MessageFactoryImpl messageFactory = (MessageFactoryImpl) SipFactory.getInstance().createMessageFactory();
-		// 设置编码, 防止中文乱码
-		messageFactory.setDefaultContentEncodingCharset("gb2312");
 
 		CallIdHeader callIdHeader = SipFactory.getInstance().createHeaderFactory().createCallIdHeader(sendRtpItem.getCallId());
 
-		request = (SIPRequest) messageFactory.createRequest(requestURI, Request.BYE, callIdHeader, cSeqHeader, fromHeader,
+		request = (SIPRequest) SipFactory.getInstance().createMessageFactory().createRequest(requestURI, Request.BYE, callIdHeader, cSeqHeader, fromHeader,
 				toHeader, viaHeaders, maxForwards);
 
 		request.addHeader(SipUtils.createUserAgentHeader(gitUtil));
@@ -311,6 +308,7 @@ public class SIPRequestHeaderPlarformProvider {
 		String sipAddress = platform.getDeviceIp() + ":" + platform.getDevicePort();
 		Address concatAddress = SipFactory.getInstance().createAddressFactory().createAddress(SipFactory.getInstance().createAddressFactory()
 				.createSipURI(platform.getDeviceGBId(), sipAddress));
+
 		request.addHeader(SipFactory.getInstance().createHeaderFactory().createContactHeader(concatAddress));
 
 		return request;

+ 0 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java

@@ -378,7 +378,6 @@ public class SIPCommander implements ISIPCommander {
             mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
             errorEvent.response(e);
         }), e -> {
-            // 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值
             ResponseEvent responseEvent = (ResponseEvent) e.event;
             SIPResponse response = (SIPResponse) responseEvent.getResponse();
             streamSession.put(device.getDeviceId(), channelId, "play", stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response,

+ 2 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java

@@ -236,6 +236,8 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
                     continue;
                 }else {
                     if (channel.getChannelId().length() != 20) {
+                        catalogXml.append("</Item>\r\n");
+                        logger.warn("[编号长度异常] {} 长度错误,请使用20位长度的国标编号,当前长度:{}", channel.getChannelId(), channel.getChannelId().length());
                         catalogXml.append("</Item>\r\n");
                         continue;
                     }

+ 6 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java

@@ -3,6 +3,8 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
 import com.alibaba.fastjson2.JSONObject;
 import com.genersoft.iot.vmp.conf.DynamicTask;
 import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType;
 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
 import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
 import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
@@ -14,6 +16,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.service.IMediaServerService;
+import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
 import com.genersoft.iot.vmp.service.IPlayService;
 import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
 import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
@@ -56,6 +59,9 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
 	@Autowired
     private IRedisCatchStorage redisCatchStorage;
 
+	@Autowired
+    private UserSetting userSetting;
+
 	@Autowired
 	private IVideoManagerStorage storager;
 

+ 94 - 180
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java

@@ -2,9 +2,11 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
 
 import com.genersoft.iot.vmp.common.InviteInfo;
 import com.genersoft.iot.vmp.common.InviteSessionType;
+import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
 import com.genersoft.iot.vmp.gb28181.bean.*;
 import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
+import com.genersoft.iot.vmp.gb28181.bean.*;
 import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
 import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
@@ -18,6 +20,7 @@ import com.genersoft.iot.vmp.service.IDeviceService;
 import com.genersoft.iot.vmp.service.IInviteStreamService;
 import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.IPlayService;
+import com.genersoft.iot.vmp.service.*;
 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
@@ -31,11 +34,7 @@ import org.springframework.stereotype.Component;
 import javax.sip.InvalidArgumentException;
 import javax.sip.RequestEvent;
 import javax.sip.SipException;
-import javax.sip.address.SipURI;
 import javax.sip.header.CallIdHeader;
-import javax.sip.header.FromHeader;
-import javax.sip.header.HeaderAddress;
-import javax.sip.header.ToHeader;
 import javax.sip.message.Response;
 import java.text.ParseException;
 import java.util.HashMap;
@@ -63,12 +62,18 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
 	@Autowired
 	private IInviteStreamService inviteStreamService;
 
+	@Autowired
+	private IPlatformService platformService;
+
 	@Autowired
 	private IDeviceService deviceService;
 
 	@Autowired
 	private AudioBroadcastManager audioBroadcastManager;
 
+	@Autowired
+	private IDeviceChannelService channelService;
+
 	@Autowired
 	private IVideoManagerStorage storager;
 
@@ -90,6 +95,9 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
 	@Autowired
 	private IPlayService playService;
 
+	@Autowired
+	private UserSetting userSetting;
+
 	@Override
 	public void afterPropertiesSet() throws Exception {
 		// 添加消息处理的订阅
@@ -102,201 +110,107 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
 	 */
 	@Override
 	public void process(RequestEvent evt) {
-
-		// TODO 此处需要重构
-		SIPRequest request =(SIPRequest) evt.getRequest();
+		SIPRequest request = (SIPRequest) evt.getRequest();
 		try {
 			responseAck(request, Response.OK);
 		} catch (SipException | InvalidArgumentException | ParseException e) {
 			logger.error("[回复BYE信息失败],{}", e.getMessage());
 		}
 		CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME);
-			String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
-			String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
-			SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(platformGbId, channelId, null, callIdHeader.getCallId());
-			logger.info("[收到bye] {}/{}", platformGbId, channelId);
-			if (sendRtpItem != null){
-				String streamId = sendRtpItem.getStream();
-				Map<String, Object> param = new HashMap<>();
-				param.put("vhost","__defaultVhost__");
-				param.put("app",sendRtpItem.getApp());
-				param.put("stream",streamId);
-				param.put("ssrc",sendRtpItem.getSsrc());
-				logger.info("[收到bye] 停止向上级推流:{}", streamId);
-				MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
-				redisCatchStorage.deleteSendRTPServer(platformGbId, channelId, callIdHeader.getCallId(), null);
-				ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
-				zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
-				int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId);
-				if (totalReaderCount <= 0) {
-					logger.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId);
-					if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) {
-						Device device = deviceService.getDevice(sendRtpItem.getDeviceId());
-						if (device == null) {
-							logger.info("[收到bye] {} 通知设备停止推流时未找到设备信息", streamId);
-						}
-						try {
-							logger.warn("[停止点播] {}/{}", sendRtpItem.getDeviceId(), channelId);
-							cmder.streamByeCmd(device, channelId, streamId, null);
-						} catch (InvalidArgumentException | ParseException | SipException |
-								 SsrcTransactionNotFoundException e) {
-							logger.error("[收到bye] {} 无其它观看者,通知设备停止推流, 发送BYE失败 {}",streamId, e.getMessage());
-						}
-					}
-					if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
-						MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
-								sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
-								sendRtpItem.getPlatformId(), null, null, sendRtpItem.getMediaServerId());
-						redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
-					}
+		SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId());
+
+		if (sendRtpItem != null){
+			logger.info("[收到bye] 来自平台{}, 停止通道:{}", sendRtpItem.getPlatformId(), sendRtpItem.getChannelId());
+			String streamId = sendRtpItem.getStream();
+			Map<String, Object> param = new HashMap<>();
+			param.put("vhost","__defaultVhost__");
+			param.put("app",sendRtpItem.getApp());
+			param.put("stream",streamId);
+			param.put("ssrc",sendRtpItem.getSsrc());
+			logger.info("[收到bye] 停止向上级推流:{}", streamId);
+			MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
+			redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),
+					callIdHeader.getCallId(), null);
+			zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
+			if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
+				ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
+				if (platform != null) {
+					MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
+							sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
+							sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
+					messageForPushChannel.setPlatFormIndex(platform.getId());
+					redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
+				}else {
+					logger.info("[上级平台停止观看] 未找到平台{}的信息,发送redis消息失败", sendRtpItem.getPlatformId());
 				}
 			}
-			// 可能是设备主动停止
-			Device device = storager.queryVideoDeviceByChannelId(platformGbId);
-			if (device != null) {
-				storager.stopPlay(device.getDeviceId(), channelId);
-				SsrcTransaction ssrcTransactionForPlay = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, "play", null);
-				if (ssrcTransactionForPlay != null){
-					if (ssrcTransactionForPlay.getCallId().equals(callIdHeader.getCallId())){
-						// 释放ssrc
-						MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransactionForPlay.getMediaServerId());
-						if (mediaServerItem != null) {
-							mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransactionForPlay.getSsrc());
-						}
-						streamSession.remove(device.getDeviceId(), channelId, ssrcTransactionForPlay.getStream());
+
+			int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId);
+			if (totalReaderCount <= 0) {
+				logger.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId);
+				if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) {
+					Device device = deviceService.getDevice(sendRtpItem.getDeviceId());
+					if (device == null) {
+						logger.info("[收到bye] {} 通知设备停止推流时未找到设备信息", streamId);
 					}
-					InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
-					inviteStreamService.removeInviteInfo(inviteInfo);
-					if (inviteInfo != null) {
-						if (inviteInfo.getStreamInfo() != null) {
-							mediaServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServerId(), inviteInfo.getStream());
-						}
+					try {
+						logger.warn("[停止点播] {}/{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+						cmder.streamByeCmd(device, sendRtpItem.getChannelId(), streamId, null);
+					} catch (InvalidArgumentException | ParseException | SipException |
+							 SsrcTransactionNotFoundException e) {
+						logger.error("[收到bye] {} 无其它观看者,通知设备停止推流, 发送BYE失败 {}",streamId, e.getMessage());
 					}
 				}
-				SsrcTransaction ssrcTransactionForPlayBack = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, callIdHeader.getCallId(), null);
-				if (ssrcTransactionForPlayBack != null) {
-					// 释放ssrc
-					MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransactionForPlayBack.getMediaServerId());
-					if (mediaServerItem != null) {
-						mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransactionForPlayBack.getSsrc());
-					}
-					streamSession.remove(device.getDeviceId(), channelId, ssrcTransactionForPlayBack.getStream());
-					InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAYBACK, device.getDeviceId(), channelId);
+			}
+		}
 
-					if (inviteInfo != null) {
-						inviteStreamService.removeInviteInfo(inviteInfo);
-						if (inviteInfo.getStreamInfo() != null) {
-							mediaServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServerId(), inviteInfo.getStream());
-						}
-					}
-				}
+
+
+			// 可能是设备发送的停止
+			SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(null, null, callIdHeader.getCallId(), null);
+			if (ssrcTransaction == null) {
+				logger.info("[收到bye] 但是无法获取推流信息和发流信息,忽略此请求");
+				logger.info(request.toString());
+				return;
 			}
+			logger.info("[收到bye] 来自设备:{}, 通道已停止推流: {}", ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId());
 
-		SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(null, null, request.getCallIdHeader().getCallId(), null);
-		if (ssrcTransaction != null) {
+			Device device = deviceService.getDevice(ssrcTransaction.getDeviceId());
+			if (device == null) {
+				logger.info("[收到bye] 未找到设备:{} ", ssrcTransaction.getDeviceId());
+				return;
+			}
+			DeviceChannel channel = channelService.getOne(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId());
+			if (channel == null) {
+				logger.info("[收到bye] 未找到通道,设备:{}, 通道:{}", ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId());
+				return;
+			}
+			storager.stopPlay(device.getDeviceId(), channel.getChannelId());
+			InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId());
+			if (inviteInfo != null) {
+				inviteStreamService.removeInviteInfo(inviteInfo);
+				if (inviteInfo.getStreamInfo() != null) {
+					mediaServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServerId(), inviteInfo.getStreamInfo().getStream());
+				}
+			}
 			// 释放ssrc
 			MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransaction.getMediaServerId());
 			if (mediaServerItem != null) {
 				mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransaction.getSsrc());
 			}
-
-			switch (ssrcTransaction.getType()) {
-//					case play:
-//						break;
-//					case talk:
-//						break;
-//					case playback:
-//						break;
-//					case download:
-//						break;
-				case BROADCAST:
-					String channelId1 = ssrcTransaction.getChannelId();
-
-					Device deviceFromTransaction = storager.queryVideoDevice(ssrcTransaction.getDeviceId());
-					if (deviceFromTransaction == null) {
-						ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(ssrcTransaction.getDeviceId());
-						if (parentPlatform != null) {
-							// 来自上级平台的停止对讲
-							logger.info("[停止对讲] 来自上级,平台:{}, 通道:{}", ssrcTransaction.getDeviceId(), channelId1);
-							// 释放ssrc
-							streamSession.remove(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
-							if (mediaServerItem != null) {
-								zlmrtpServerFactory.closeRtpServer(mediaServerItem, ssrcTransaction.getStream());
-							}
-							// 查找来源的对讲设备,发送停止
-							Device sourceDevice = storager.queryVideoDeviceByPlatformIdAndChannelId(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId());
-							if (sourceDevice != null) {
-								playService.stopAudioBroadcast(sourceDevice.getDeviceId(), channelId);
-							}
-						}
-					}else {
-						// 来自设备的停止对讲
-
-						// 如果是来自设备,则听停止推流。 来自上级则停止收流
-						AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(ssrcTransaction.getDeviceId(), channelId1);
-						if (audioBroadcastCatch != null) {
-							//
-							SendRtpItem sendRtpItemForBroadcast =  redisCatchStorage.querySendRTPServer(ssrcTransaction.getDeviceId(), channelId1,
-									audioBroadcastCatch.getStream(), audioBroadcastCatch.getSipTransactionInfo().getCallId());
-							if (sendRtpItemForBroadcast != null) {
-								MediaServerItem mediaServerItemForBroadcast = mediaServerService.getOne(sendRtpItem.getMediaServerId());
-								if (mediaServerItemForBroadcast == null) {
-									return;
-								}
-
-								Boolean ready = zlmrtpServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), audioBroadcastCatch.getStream());
-								if (ready) {
-									Map<String, Object> param = new HashMap<>();
-									param.put("vhost","__defaultVhost__");
-									param.put("app",sendRtpItem.getApp());
-									param.put("stream",audioBroadcastCatch.getStream());
-									param.put("ssrc",sendRtpItem.getSsrc());
-									logger.info("[收到bye] 停止推流:{}", audioBroadcastCatch.getStream());
-									MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
-									redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), request.getCallIdHeader().getCallId(), null);
-									zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
-								}
-								if (audioBroadcastCatch.isFromPlatform()) {
-									// 上级也正在点播。 向上级回复bye
-									List<SsrcTransaction> ssrcTransactions = streamSession.getSsrcTransactionForAll(null, channelId1, null, null);
-									if (ssrcTransactions.size() > 0) {
-										for (SsrcTransaction transaction : ssrcTransactions) {
-											if (transaction.getType().equals(InviteSessionType.BROADCAST)) {
-												ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(transaction.getDeviceId());
-												if (parentPlatform != null) {
-													try {
-														commanderForPlatform.streamByeCmd(parentPlatform, channelId1, transaction.getStream(), transaction.getCallId(), eventResult -> {
-															streamSession.remove(transaction.getDeviceId(), transaction.getChannelId(), transaction.getStream());
-														});
-														audioBroadcastManager.del(transaction.getDeviceId(), channelId1);
-													} catch (InvalidArgumentException | SipException | ParseException |
-															 SsrcTransactionNotFoundException e) {
-														logger.error("[命令发送失败] 向{}发送bye失败", transaction.getDeviceId());
-													}
-													// 释放ssrc
-													MediaServerItem mediaServerItemFromTransaction = mediaServerService.getOne(transaction.getMediaServerId());
-													if (mediaServerItemFromTransaction != null) {
-														mediaServerService.releaseSsrc(mediaServerItemFromTransaction.getId(), transaction.getSsrc());
-													}
-													streamSession.remove(transaction.getDeviceId(), transaction.getChannelId(), transaction.getStream());
-												}
-											}
-										}
-									}
-
-								}
-								redisCatchStorage.deleteSendRTPServer(ssrcTransaction.getDeviceId(), channelId1,
-										audioBroadcastCatch.getStream(), audioBroadcastCatch.getSipTransactionInfo().getCallId());
-
-							}
-						}
-					}
-					audioBroadcastManager.del(ssrcTransaction.getDeviceId(), channelId1);
-					break;
-				default:
-					break;
+			streamSession.remove(device.getDeviceId(), channel.getChannelId(), ssrcTransaction.getStream());
+			if (ssrcTransaction.getType() == InviteSessionType.BROADCAST) {
+				// 查找来源的对讲设备,发送停止
+				Device sourceDevice = storager.queryVideoDeviceByPlatformIdAndChannelId(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId());
+				if (sourceDevice != null) {
+					playService.stopAudioBroadcast(sourceDevice.getDeviceId(), channel.getChannelId());
+				}
+			}
+			AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(ssrcTransaction.getDeviceId(), channel.getChannelId());
+			if (audioBroadcastCatch != null) {
+				// 来自上级平台的停止对讲
+				logger.info("[停止对讲] 来自上级,平台:{}, 通道:{}", ssrcTransaction.getDeviceId(), channel.getChannelId());
+				audioBroadcastManager.del(ssrcTransaction.getDeviceId(), channel.getChannelId());
 			}
-
-		}
 	}
 }

+ 24 - 21
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java

@@ -210,16 +210,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                             return;
                         } else {
                             streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream());
-                            if (streamPushItem == null || streamPushItem.getServerId().equals(userSetting.getServerId())) {
-                                logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId);
-                                try {
-                                    responseAck(request, Response.GONE);
-                                } catch (SipException | InvalidArgumentException | ParseException e) {
-                                    logger.error("[命令发送失败] invite GONE: {}", e.getMessage());
-                                }
-                                return;
-                            }else {
-                                 // TODO 可能漏回复消息
+                            if (streamPushItem != null) {
+                                mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId());
+                            }
+                            if (mediaServerItem == null) {
+                                mediaServerItem = mediaServerService.getDefaultMediaServer();
                             }
                         }
                     } else {
@@ -380,7 +375,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                     }
                     logger.info("[上级Invite] {}, 平台:{}, 通道:{}, 收流地址:{}:{},收流方式:{}, ssrc:{}", sessionName, username, channelId, addressStr, port, streamTypeStr, ssrc);
                     SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
-                            device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp());
+                            device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback -> {
+                                return redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null;
+                            });
 
                     if (tcpActive != null) {
                         sendRtpItem.setTcpActive(tcpActive);
@@ -584,14 +581,16 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
      * 安排推流
      */
     private void pushProxyStream(RequestEvent evt, SIPRequest request, GbStream gbStream, ParentPlatform platform,
-                                 CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
-                                 int port, Boolean tcpActive, boolean mediaTransmissionTCP,
-                                 String channelId, String addressStr, String ssrc, String requesterId) {
-        Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
-        if (streamReady != null && streamReady) {
-            // 自平台内容
-            SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
-                    gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
+                            CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
+                            int port, Boolean tcpActive, boolean mediaTransmissionTCP,
+                            String channelId, String addressStr, String ssrc, String requesterId) {
+            Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
+            if (streamReady != null && streamReady) {
+                // 自平台内容
+                SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
+                        gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback ->{
+                            return  redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null;
+                        });
 
             if (sendRtpItem == null) {
                 logger.warn("服务器端口资源不足");
@@ -631,7 +630,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
             if (streamReady != null && streamReady) {
                 // 自平台内容
                 SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
-                        gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
+                        gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback ->{
+                            return  redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null;
+                        });
 
                 if (sendRtpItem == null) {
                     logger.warn("服务器端口资源不足");
@@ -747,7 +748,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                 dynamicTask.stop(callIdHeader.getCallId());
                 if (serverId.equals(userSetting.getServerId())) {
                     SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId,
-                            app, stream, channelId, mediaTransmissionTCP, platform.isRtcp());
+                            app, stream, channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback -> {
+                                return redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null;
+                            });
 
                     if (sendRtpItem == null) {
                         logger.warn("上级点时创建sendRTPItem失败,可能是服务器端口资源不足");

+ 2 - 2
src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java

@@ -125,7 +125,7 @@ public class SipUtils {
         strTmp = String.format("%02X", moveSpeed);
         builder.append(strTmp, 0, 2);
         builder.append(strTmp, 0, 2);
-        
+
         //优化zoom低倍速下的变倍速率
         if ((zoomSpeed > 0) && (zoomSpeed <16))
         {
@@ -283,4 +283,4 @@ public class SipUtils {
         }
         return localDateTime.format(DateUtil.formatterISO8601);
     }
-}
+}

+ 9 - 1
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java

@@ -23,6 +23,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
 import com.genersoft.iot.vmp.media.zlm.dto.hook.*;
 import com.genersoft.iot.vmp.service.*;
+import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
@@ -543,6 +544,13 @@ public class ZLMHttpHookListener {
                             }
                             redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
                                     sendRtpItem.getCallId(), sendRtpItem.getStream());
+                            if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
+                                MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
+                                        sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
+                                        sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
+                                messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
+                                redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
+                            }
                         }
                     }
                 }
@@ -595,7 +603,7 @@ public class ZLMHttpHookListener {
                 }
                 return ret;
             }
-            // 推流具有主动性,暂时不做处理
+            // TODO 推流具有主动性,暂时不做处理
 //			StreamPushItem streamPushItem = streamPushService.getPush(app, streamId);
 //			if (streamPushItem != null) {
 //				// TODO 发送停止

+ 20 - 12
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java

@@ -227,13 +227,14 @@ public class ZLMRTPServerFactory {
      * @param tcp 是否为tcp
      * @return SendRtpItem
      */
-    public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, String deviceId, String channelId, boolean tcp, boolean rtcp){
+    public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId,
+                                         String deviceId, String channelId, boolean tcp, boolean rtcp, KeepPortCallback callback){
 
         // 默认为随机端口
         int localPort = 0;
         if (userSetting.getGbSendStreamStrict()) {
             if (userSetting.getGbSendStreamStrict()) {
-                localPort = keepPort(serverItem, ssrc, localPort);
+                localPort = keepPort(serverItem, ssrc, localPort, callback);
                 if (localPort == 0) {
                     return null;
                 }
@@ -265,11 +266,12 @@ public class ZLMRTPServerFactory {
      * @param tcp 是否为tcp
      * @return SendRtpItem
      */
-    public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, String app, String stream, String channelId, boolean tcp, boolean rtcp){
+    public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId,
+                                         String app, String stream, String channelId, boolean tcp, boolean rtcp, KeepPortCallback callback){
         // 默认为随机端口
         int localPort = 0;
         if (userSetting.getGbSendStreamStrict()) {
-            localPort = keepPort(serverItem, ssrc, localPort);
+            localPort = keepPort(serverItem, ssrc, localPort, callback);
             if (localPort == 0) {
                 return null;
             }
@@ -290,10 +292,14 @@ public class ZLMRTPServerFactory {
         return sendRtpItem;
     }
 
+    public interface KeepPortCallback{
+        Boolean keep(String ssrc);
+    }
+
     /**
      * 保持端口,直到需要需要发流时再释放
      */
-    public int keepPort(MediaServerItem serverItem, String ssrc, Integer localPort) {
+    public int keepPort(MediaServerItem serverItem, String ssrc, int localPort, KeepPortCallback keepPortCallback) {
         Map<String, Object> param = new HashMap<>(3);
         param.put("port", localPort);
         param.put("enable_tcp", 1);
@@ -302,18 +308,20 @@ public class ZLMRTPServerFactory {
         if (jsonObject.getInteger("code") == 0) {
             localPort = jsonObject.getInteger("port");
             HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, serverItem.getId());
+            // 订阅 zlm启动事件, 新的zlm也会从这里进入系统
             Integer finalLocalPort = localPort;
             hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout,
                     (MediaServerItem mediaServerItem, HookParam hookParam)->{
                         logger.info("[上级点播] {}->监听端口到期继续保持监听: {}", ssrc, finalLocalPort);
                         OnRtpServerTimeoutHookParam rtpServerTimeoutHookParam = (OnRtpServerTimeoutHookParam) hookParam;
-                        if (!ssrc.equals(rtpServerTimeoutHookParam.getSsrc())) {
-                            return;
-                        }
-                        int port = keepPort(serverItem, ssrc, finalLocalPort);
-                        if (port == 0) {
-                            logger.info("[上级点播] {}->监听端口失败,移除监听", ssrc);
-                            hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout);
+                        if (ssrc.equals(rtpServerTimeoutHookParam.getSsrc())) {
+                            if (keepPortCallback.keep(ssrc)) {
+                                logger.info("[上级点播] {}->监听端口到期继续保持监听", ssrc);
+                                keepPort(serverItem, ssrc, finalLocalPort, keepPortCallback);
+                            }else {
+                                logger.info("[上级点播] {}->发送取消,无需继续监听", ssrc);
+                                releasePort(serverItem, ssrc);
+                            }
                         }
                     });
             logger.info("[上级点播] {}->监听端口: {}", ssrc, localPort);

+ 14 - 3
src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java

@@ -1,7 +1,5 @@
 package com.genersoft.iot.vmp.service.bean;
 
-import java.util.stream.Stream;
-
 /**
  * 当上级平台
  * @author lin
@@ -29,10 +27,15 @@ public class MessageForPushChannel {
     private String gbId;
 
     /**
-     * 请求的平台ID
+     * 请求的平台国标编号
      */
     private String platFormId;
 
+    /**
+     * 请求的平台自增ID
+     */
+    private int platFormIndex;
+
     /**
      * 请求平台名称
      */
@@ -128,4 +131,12 @@ public class MessageForPushChannel {
     public void setMediaServerId(String mediaServerId) {
         this.mediaServerId = mediaServerId;
     }
+
+    public int getPlatFormIndex() {
+        return platFormIndex;
+    }
+
+    public void setPlatFormIndex(int platFormIndex) {
+        this.platFormIndex = platFormIndex;
+    }
 }

+ 2 - 3
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java

@@ -1,6 +1,5 @@
 package com.genersoft.iot.vmp.service.impl;
 
-import com.genersoft.iot.vmp.common.InviteSessionType;
 import com.genersoft.iot.vmp.common.VideoManagerConstants;
 import com.genersoft.iot.vmp.conf.DynamicTask;
 import com.genersoft.iot.vmp.conf.UserSetting;
@@ -415,8 +414,8 @@ public class DeviceServiceImpl implements IDeviceService {
         if (device == null) {
             return null;
         }
-        if (ObjectUtils.isEmpty(parentId) || parentId.equals(deviceId)) {
-            parentId = null;
+        if (ObjectUtils.isEmpty(parentId) ) {
+            parentId = deviceId;
         }
         List<DeviceChannel> rootNodes = deviceChannelMapper.getSubChannelsByDeviceId(deviceId, parentId, onlyCatalog);
         return transportChannelsToTree(rootNodes, "");

+ 2 - 2
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java

@@ -362,7 +362,7 @@ public class PlayServiceImpl implements IPlayService {
                     null);
             return;
         }
-        logger.info("[点播开始] deviceId: {}, channelId: {},码流类型:{},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}",
+        logger.info("[点播开始] deviceId: {}, channelId: {},码流类型:{}, 收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}",
                 device.getDeviceId(), channelId, device.isSwitchPrimarySubStream() ? "辅码流" : "主码流", ssrcInfo.getPort(),
                 device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
         //端口获取失败的ssrcInfo 没有必要发送点播指令
@@ -445,7 +445,7 @@ public class PlayServiceImpl implements IPlayService {
                         InviteErrorCode.SUCCESS.getCode(),
                         InviteErrorCode.SUCCESS.getMsg(),
                         streamInfo);
-                logger.info("[点播成功] deviceId: {}, channelId: {},码流类型:{}", device.getDeviceId(),
+                logger.info("[点播成功] deviceId: {}, channelId:{}, 码流类型:{}", device.getDeviceId(),
                         device.isSwitchPrimarySubStream() ? "辅码流" : "主码流");
                 String streamUrl;
                 if (mediaServerItemInuse.getRtspPort() != 0) {

+ 33 - 1
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java

@@ -13,6 +13,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.bean.*;
+import com.genersoft.iot.vmp.utils.redis.RedisUtil;
 import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -26,6 +27,7 @@ import org.springframework.stereotype.Component;
 
 import java.text.ParseException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -127,6 +129,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
                                 case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
                                     RequestPushStreamMsg param = JSON.to(RequestPushStreamMsg.class, wvpRedisMsg.getContent());
                                     requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
+
                                     break;
                                 default:
                                     break;
@@ -311,7 +314,9 @@ public class RedisGbPlayMsgListener implements MessageListener {
         SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, content.getIp(),
                 content.getPort(), content.getSsrc(), content.getPlatformId(),
                 content.getApp(), content.getStream(), content.getChannelId(),
-                content.getTcp(), content.getRtcp());
+                content.getTcp(), content.getRtcp(), ssrcFromCallback -> {
+                    return querySendRTPServer(content.getPlatformId(), content.getChannelId(), content.getStream(), null) != null;
+                });
 
         WVPResult<ResponseSendItemMsg> result = new WVPResult<>();
         result.setCode(0);
@@ -388,4 +393,31 @@ public class RedisGbPlayMsgListener implements MessageListener {
         });
         redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
     }
+
+    private SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId) {
+        if (platformGbId == null) {
+            platformGbId = "*";
+        }
+        if (channelId == null) {
+            channelId = "*";
+        }
+        if (streamId == null) {
+            streamId = "*";
+        }
+        if (callId == null) {
+            callId = "*";
+        }
+        String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX
+                + userSetting.getServerId() + "_*_"
+                + platformGbId + "_"
+                + channelId + "_"
+                + streamId + "_"
+                + callId;
+        List<Object> scan = RedisUtil.scan(redisTemplate, key);
+        if (scan.size() > 0) {
+            return (SendRtpItem)redisTemplate.opsForValue().get(scan.get(0));
+        }else {
+            return null;
+        }
+    }
 }

+ 120 - 0
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java

@@ -0,0 +1,120 @@
+package com.genersoft.iot.vmp.service.redisMsg;
+
+import com.alibaba.fastjson2.JSON;
+import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType;
+import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
+import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
+import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
+import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
+import com.genersoft.iot.vmp.service.IMediaServerService;
+import com.genersoft.iot.vmp.service.IStreamPushService;
+import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
+import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse;
+import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
+import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.stereotype.Component;
+
+import javax.sip.InvalidArgumentException;
+import javax.sip.SipException;
+import java.text.ParseException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * 接收redis发送的结束推流请求
+ * @author lin
+ */
+@Component
+public class RedisPushStreamCloseResponseListener implements MessageListener {
+
+    private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamCloseResponseListener.class);
+
+    @Autowired
+    private IStreamPushService streamPushService;
+
+    @Autowired
+    private IRedisCatchStorage redisCatchStorage;
+
+    @Autowired
+    private IVideoManagerStorage storager;
+
+    @Autowired
+    private ISIPCommanderForPlatform commanderFroPlatform;
+
+    @Autowired
+    private UserSetting userSetting;
+
+    @Autowired
+    private IMediaServerService mediaServerService;
+
+    @Autowired
+    private ZLMRTPServerFactory zlmrtpServerFactory;
+
+
+    private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>();
+
+    public interface PushStreamResponseEvent{
+        void run(MessageForPushChannelResponse response);
+    }
+
+    @Override
+    public void onMessage(Message message, byte[] bytes) {
+        logger.info("[REDIS消息-推流结束]: {}", new String(message.getBody()));
+        MessageForPushChannel pushChannel = JSON.parseObject(message.getBody(), MessageForPushChannel.class);
+        StreamPushItem push = streamPushService.getPush(pushChannel.getApp(), pushChannel.getStream());
+        if (push != null) {
+            if (redisCatchStorage.isChannelSendingRTP(push.getGbId())) {
+                List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId(
+                        push.getGbId());
+                if (sendRtpItems.size() > 0) {
+                    for (SendRtpItem sendRtpItem : sendRtpItems) {
+                        ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
+                        // 停止向上级推流
+                        String streamId = sendRtpItem.getStreamId();
+                        Map<String, Object> param = new HashMap<>();
+                        param.put("vhost","__defaultVhost__");
+                        param.put("app",sendRtpItem.getApp());
+                        param.put("stream",streamId);
+                        param.put("ssrc",sendRtpItem.getSsrc());
+                        logger.info("[REDIS消息-推流结束] 停止向上级推流:{}", streamId);
+                        MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
+                        redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStreamId());
+                        zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
+
+                        try {
+                            commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem);
+                        } catch (SipException | InvalidArgumentException | ParseException e) {
+                            logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
+                        }
+                        if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
+                            MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
+                                    sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(),
+                                    sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
+                            messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
+                            redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
+                        }
+                    }
+                }
+            }
+        }
+
+    }
+
+    public void addEvent(String app, String stream, PushStreamResponseEvent callback) {
+        responseEvents.put(app + stream, callback);
+    }
+
+    public void removeEvent(String app, String stream) {
+        responseEvents.remove(app + stream);
+    }
+}

+ 5 - 0
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java

@@ -202,5 +202,10 @@ public interface IRedisCatchStorage {
     void removeAllDevice();
 
     void sendDeviceOrChannelStatus(String deviceId, String channelId, boolean online);
+
     void sendChannelAddOrDelete(String deviceId, String channelId, boolean add);
+
+    void sendPlatformStartPlayMsg(MessageForPushChannel messageForPushChannel);
+
+    void sendPlatformStopPlayMsg(MessageForPushChannel messageForPushChannel);
 }

+ 2 - 2
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java

@@ -459,8 +459,8 @@ public interface DeviceChannelMapper {
             "select * " +
             "from wvp_device_channel " +
             "where device_id=#{deviceId}" +
-            " <if test='parentId != null '> and parent_id = #{parentId} </if>" +
-            " <if test='parentId == null '> and parent_id is null </if>" +
+            " <if test='parentId != null and parentId != deviceId'> and parent_id = #{parentId} </if>" +
+            " <if test='parentId == null or parentId == deviceId'> and parent_id is null or parent_id = #{deviceId}</if>" +
             " <if test='onlyCatalog == true '> and parental = 1 </if>" +
             " </script>"})
     List<DeviceChannel> getSubChannelsByDeviceId(String deviceId, String parentId, boolean onlyCatalog);

+ 14 - 0
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java

@@ -622,4 +622,18 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
         // 使用 RedisTemplate<Object, Object> 发送字符串消息会导致发送的消息多带了双引号
         stringRedisTemplate.convertAndSend(key, msg.toString());
     }
+
+    @Override
+    public void sendPlatformStartPlayMsg(MessageForPushChannel msg) {
+        String key = VideoManagerConstants.VM_MSG_STREAM_START_PLAY_NOTIFY;
+        logger.info("[redis发送通知] 推流被上级平台观看 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId());
+        redisTemplate.convertAndSend(key, JSON.toJSON(msg));
+    }
+
+    @Override
+    public void sendPlatformStopPlayMsg(MessageForPushChannel msg) {
+        String key = VideoManagerConstants.VM_MSG_STREAM_STOP_PLAY_NOTIFY;
+        logger.info("[redis发送通知] 上级平台停止观看 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId());
+        redisTemplate.convertAndSend(key, JSON.toJSON(msg));
+    }
 }

+ 3 - 0
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/ptz/PtzController.java

@@ -97,6 +97,9 @@ public class PtzController {
 				cmdCode = 32;
 				break;
 			case "stop":
+				horizonSpeed = 0;
+				verticalSpeed = 0;
+				zoomSpeed = 0;
 				break;
 			default:
 				break;

+ 117 - 0
src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java

@@ -0,0 +1,117 @@
+package com.genersoft.iot.vmp.vmanager.rtp;
+
+import com.genersoft.iot.vmp.conf.SipConfig;
+import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.conf.VersionInfo;
+import com.genersoft.iot.vmp.conf.exception.ControllerException;
+import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
+import com.genersoft.iot.vmp.service.*;
+import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
+import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
+import io.swagger.v3.oas.annotations.Operation;
+import io.swagger.v3.oas.annotations.Parameter;
+import io.swagger.v3.oas.annotations.tags.Tag;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.ResponseBody;
+import org.springframework.web.bind.annotation.RestController;
+
+@SuppressWarnings("rawtypes")
+@Tag(name = "第三方服务对接")
+
+@RestController
+@RequestMapping("/api/rtp")
+public class RtpController {
+
+    @Autowired
+    private ZlmHttpHookSubscribe zlmHttpHookSubscribe;
+
+    @Autowired
+    private IMediaServerService mediaServerService;
+
+    @Autowired
+    private VersionInfo versionInfo;
+
+    @Autowired
+    private SipConfig sipConfig;
+
+    @Autowired
+    private UserSetting userSetting;
+
+    @Autowired
+    private IDeviceService deviceService;
+
+    @Autowired
+    private IDeviceChannelService channelService;
+
+    @Autowired
+    private IStreamPushService pushService;
+
+
+    @Autowired
+    private IStreamProxyService proxyService;
+
+
+    @Value("${server.port}")
+    private int serverPort;
+
+
+    @Autowired
+    private IRedisCatchStorage redisCatchStorage;
+
+
+    @GetMapping(value = "/receive/open")
+    @ResponseBody
+    @Operation(summary = "开启收流和获取发流信息")
+    @Parameter(name = "isSend", description = "是否发送,false时只开启收流, true同时返回推流信息", required = true)
+    @Parameter(name = "callId", description = "整个过程的唯一标识,为了与后续接口关联", required = true)
+    @Parameter(name = "ssrc", description = "来源流的SSRC,不传则不校验来源ssrc", required = false)
+    @Parameter(name = "stream", description = "形成的流的ID", required = true)
+    @Parameter(name = "tcpMode", description = "收流模式, 0为UDP, 1为TCP被动", required = true)
+    @Parameter(name = "callBack", description = "回调地址,如果收流超时会通道回调通知,回调为get请求,参数为callId", required = true)
+    public SendRtpItem openRtpServer(Boolean isSend, String ssrc, String callId, String stream, Integer tcpMode, String callBack) {
+        MediaServerItem mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null);
+        if (mediaServerItem == null) {
+            throw new ControllerException(ErrorCode.ERROR100.getCode(),"没有可用的MediaServer");
+        }
+        return null;
+    }
+
+    @GetMapping(value = "/receive/close")
+    @ResponseBody
+    @Operation(summary = "关闭收流")
+    @Parameter(name = "stream", description = "流的ID", required = true)
+    public void closeRtpServer(String stream) {
+
+    }
+
+    @GetMapping(value = "/send/start")
+    @ResponseBody
+    @Operation(summary = "发送流")
+    @Parameter(name = "ssrc", description = "发送流的SSRC", required = true)
+    @Parameter(name = "ip", description = "目标IP", required = true)
+    @Parameter(name = "port", description = "目标端口", required = true)
+    @Parameter(name = "app", description = "待发送应用名", required = true)
+    @Parameter(name = "stream", description = "待发送流Id", required = true)
+    @Parameter(name = "callId", description = "整个过程的唯一标识,不传则使用随机端口发流", required = true)
+    @Parameter(name = "onlyAudio", description = "是否只有音频", required = true)
+    @Parameter(name = "streamType", description = "流类型,1为es流,2为ps流, 默认es流", required = false)
+    public void sendRTP(String ssrc, String ip, Integer port, String app, String stream, String callId, Boolean onlyAudio, Integer streamType) {
+
+    }
+
+
+
+    @GetMapping(value = "/send/stop")
+    @ResponseBody
+    @Operation(summary = "关闭发送流")
+    @Parameter(name = "callId", description = "整个过程的唯一标识,不传则使用随机端口发流", required = true)
+    public void closeSendRTP(String callId) {
+
+    }
+
+}