瀏覽代碼

修复多平台推流无人观看redis通知

648540858 1 年之前
父節點
當前提交
3291c4b2e6

+ 74 - 42
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java

@@ -15,8 +15,11 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor
 import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
 import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
+import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
 import com.genersoft.iot.vmp.service.*;
 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
+import com.genersoft.iot.vmp.service.bean.RequestStopPushStreamMsg;
+import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import gov.nist.javax.sip.message.SIPRequest;
@@ -92,6 +95,12 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
 	@Autowired
 	private UserSetting userSetting;
 
+	@Autowired
+	private IStreamPushService pushService;
+
+	@Autowired
+	private RedisGbPlayMsgListener redisGbPlayMsgListener;
+
 	@Override
 	public void afterPropertiesSet() throws Exception {
 		// 添加消息处理的订阅
@@ -124,58 +133,81 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
 			param.put("stream",streamId);
 			param.put("ssrc",sendRtpItem.getSsrc());
 			logger.info("[收到bye] 停止推流:{}, 媒体节点: {}", streamId, sendRtpItem.getMediaServerId());
-			MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
-			redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),
-					callIdHeader.getCallId(), null);
-			zlmServerFactory.stopSendRtpStream(mediaInfo, param);
-			if (userSetting.getUseCustomSsrcForParentInvite()) {
-				mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc());
-			}
+
 			if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
-				ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
-				if (platform != null) {
-					MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
-							sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
-							sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
-					messageForPushChannel.setPlatFormIndex(platform.getId());
-					redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
+				// 查询这路流是否是本平台的
+				StreamPushItem push = pushService.getPush(sendRtpItem.getApp(), sendRtpItem.getStream());
+				if (push!= null && !push.isSelf()) {
+					// 不是本平台的就发送redis消息让其他wvp停止发流
+					ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
+					if (platform != null) {
+						RequestStopPushStreamMsg streamMsg = RequestStopPushStreamMsg.getInstance(sendRtpItem, platform.getName(), platform.getId());
+						redisGbPlayMsgListener.sendMsgForStopSendRtpStream(sendRtpItem.getServerId(), streamMsg);
+					}
 				}else {
-					logger.info("[上级平台停止观看] 未找到平台{}的信息,发送redis消息失败", sendRtpItem.getPlatformId());
-				}
-			}
+					MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
+					redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),
+							callIdHeader.getCallId(), null);
+					zlmServerFactory.stopSendRtpStream(mediaInfo, param);
+					if (userSetting.getUseCustomSsrcForParentInvite()) {
+						mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc());
+					}
 
-			AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
-			if (audioBroadcastCatch != null && audioBroadcastCatch.getSipTransactionInfo().getCallId().equals(callIdHeader.getCallId())) {
-				// 来自上级平台的停止对讲
-				logger.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
-				audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+					ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
+					if (platform != null) {
+						MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
+								sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
+								sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
+						messageForPushChannel.setPlatFormIndex(platform.getId());
+						redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
+					}else {
+						logger.info("[上级平台停止观看] 未找到平台{}的信息,发送redis消息失败", sendRtpItem.getPlatformId());
+					}
+				}
+			}else {
+				MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
+				redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),
+						callIdHeader.getCallId(), null);
+				zlmServerFactory.stopSendRtpStream(mediaInfo, param);
+				if (userSetting.getUseCustomSsrcForParentInvite()) {
+					mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc());
+				}
 			}
+			MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
+			if (mediaInfo != null) {
+				AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+				if (audioBroadcastCatch != null && audioBroadcastCatch.getSipTransactionInfo().getCallId().equals(callIdHeader.getCallId())) {
+					// 来自上级平台的停止对讲
+					logger.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+					audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+				}
 
-			int totalReaderCount = zlmServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId);
-			if (totalReaderCount <= 0) {
-				logger.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId);
-				if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) {
-					Device device = deviceService.getDevice(sendRtpItem.getDeviceId());
-					if (device == null) {
-						logger.info("[收到bye] {} 通知设备停止推流时未找到设备信息", streamId);
-					}
-					try {
-						logger.info("[停止点播] {}/{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
-						cmder.streamByeCmd(device, sendRtpItem.getChannelId(), streamId, null);
-					} catch (InvalidArgumentException | ParseException | SipException |
-							 SsrcTransactionNotFoundException e) {
-						logger.error("[收到bye] {} 无其它观看者,通知设备停止推流, 发送BYE失败 {}",streamId, e.getMessage());
+				int totalReaderCount = zlmServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId);
+				if (totalReaderCount <= 0) {
+					logger.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId);
+					if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) {
+						Device device = deviceService.getDevice(sendRtpItem.getDeviceId());
+						if (device == null) {
+							logger.info("[收到bye] {} 通知设备停止推流时未找到设备信息", streamId);
+						}
+						try {
+							logger.info("[停止点播] {}/{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+							cmder.streamByeCmd(device, sendRtpItem.getChannelId(), streamId, null);
+						} catch (InvalidArgumentException | ParseException | SipException |
+								 SsrcTransactionNotFoundException e) {
+							logger.error("[收到bye] {} 无其它观看者,通知设备停止推流, 发送BYE失败 {}",streamId, e.getMessage());
+						}
 					}
 				}
 			}
 		}
 
-			// 可能是设备发送的停止
-			SsrcTransaction ssrcTransaction = streamSession.getSsrcTransactionByCallId(callIdHeader.getCallId());
-			if (ssrcTransaction == null) {
-				return;
-			}
-			logger.info("[收到bye] 来自设备:{}, 通道已停止推流: {}", ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId());
+		// 可能是设备发送的停止
+		SsrcTransaction ssrcTransaction = streamSession.getSsrcTransactionByCallId(callIdHeader.getCallId());
+		if (ssrcTransaction == null) {
+			return;
+		}
+		logger.info("[收到bye] 来自设备:{}, 通道已停止推流: {}", ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId());
 
 		ParentPlatform platform = platformService.queryPlatformByServerGBId(ssrcTransaction.getDeviceId());
 		if (platform != null ) {

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java

@@ -579,7 +579,7 @@ public class ZLMHttpHookListener {
                 }
                 // 收到无人观看说明流也没有在往上级推送
                 if (redisCatchStorage.isChannelSendingRTP(inviteInfo.getChannelId())) {
-                    List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId(
+                    List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId(
                             inviteInfo.getChannelId());
                     if (!sendRtpItems.isEmpty()) {
                         for (SendRtpItem sendRtpItem : sendRtpItems) {

+ 49 - 0
src/main/java/com/genersoft/iot/vmp/service/bean/RequestStopPushStreamMsg.java

@@ -0,0 +1,49 @@
+package com.genersoft.iot.vmp.service.bean;
+
+import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+
+
+public class RequestStopPushStreamMsg {
+
+
+    private SendRtpItem sendRtpItem;
+
+
+    private String platformName;
+
+
+    private int platFormIndex;
+
+    public SendRtpItem getSendRtpItem() {
+        return sendRtpItem;
+    }
+
+    public void setSendRtpItem(SendRtpItem sendRtpItem) {
+        this.sendRtpItem = sendRtpItem;
+    }
+
+    public String getPlatformName() {
+        return platformName;
+    }
+
+    public void setPlatformName(String platformName) {
+        this.platformName = platformName;
+    }
+
+
+    public int getPlatFormIndex() {
+        return platFormIndex;
+    }
+
+    public void setPlatFormIndex(int platFormIndex) {
+        this.platFormIndex = platFormIndex;
+    }
+
+    public static RequestStopPushStreamMsg getInstance(SendRtpItem sendRtpItem, String platformName, int platFormIndex) {
+        RequestStopPushStreamMsg streamMsg = new RequestStopPushStreamMsg();
+        streamMsg.setSendRtpItem(sendRtpItem);
+        streamMsg.setPlatformName(platformName);
+        streamMsg.setPlatFormIndex(platFormIndex);
+        return streamMsg;
+    }
+}

+ 10 - 0
src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsgCmd.java

@@ -6,7 +6,17 @@ package com.genersoft.iot.vmp.service.bean;
 
 public class WvpRedisMsgCmd {
 
+    /**
+     * 请求获取推流信息
+     */
     public static final String GET_SEND_ITEM = "GetSendItem";
+    /**
+     * 请求推流的请求
+     */
     public static final String REQUEST_PUSH_STREAM = "RequestPushStream";
+    /**
+     * 停止推流的请求
+     */
+    public static final String REQUEST_STOP_PUSH_STREAM = "RequestStopPushStream";
 
 }

+ 49 - 1
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java

@@ -133,7 +133,10 @@ public class RedisGbPlayMsgListener implements MessageListener {
                                 case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
                                     RequestPushStreamMsg param = JSON.to(RequestPushStreamMsg.class, wvpRedisMsg.getContent());
                                     requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
-
+                                    break;
+                                case WvpRedisMsgCmd.REQUEST_STOP_PUSH_STREAM:
+                                    RequestStopPushStreamMsg streamMsg = JSON.to(RequestStopPushStreamMsg.class, wvpRedisMsg.getContent());
+                                    requestStopPushStreamMsgHand(streamMsg, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
                                     break;
                                 default:
                                     break;
@@ -397,6 +400,19 @@ public class RedisGbPlayMsgListener implements MessageListener {
         redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
     }
 
+    /**
+     * 发送请求推流的消息
+     */
+    public void sendMsgForStopSendRtpStream(String serverId, RequestStopPushStreamMsg streamMsg) {
+        String key = UUID.randomUUID().toString();
+        WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId,
+                WvpRedisMsgCmd.REQUEST_STOP_PUSH_STREAM, key, JSON.toJSONString(streamMsg));
+
+        JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg);
+        logger.info("[REDIS 请求其他平台停止推流] {}: {}", serverId, jsonObject);
+        redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
+    }
+
     private SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId) {
         if (platformGbId == null) {
             platformGbId = "*";
@@ -423,4 +439,36 @@ public class RedisGbPlayMsgListener implements MessageListener {
             return null;
         }
     }
+
+    /**
+     * 处理收到的请求推流的请求
+     */
+    private void requestStopPushStreamMsgHand(RequestStopPushStreamMsg streamMsg, String fromId, String serial) {
+        SendRtpItem sendRtpItem = streamMsg.getSendRtpItem();
+        if (sendRtpItem == null) {
+            logger.info("[REDIS 执行其他平台的请求停止推流] 失败: sendRtpItem为NULL");
+            return;
+        }
+        MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
+        if (mediaInfo == null) {
+            // TODO 回复错误
+            return;
+        }
+        Map<String, Object> param = new HashMap<>();
+        param.put("vhost","__defaultVhost__");
+        param.put("app",sendRtpItem.getApp());
+        param.put("stream",sendRtpItem.getStream());
+        param.put("ssrc", sendRtpItem.getSsrc());
+
+        if (zlmServerFactory.stopSendRtpStream(mediaInfo, param)) {
+            logger.info("[REDIS 执行其他平台的请求停止推流] 成功: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
+            // 发送redis消息
+            MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
+                    sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
+                    sendRtpItem.getPlatformId(), streamMsg.getPlatformName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
+            messageForPushChannel.setPlatFormIndex(streamMsg.getPlatFormIndex());
+            redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
+        }
+
+    }
 }

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java

@@ -73,7 +73,7 @@ public class RedisPushStreamCloseResponseListener implements MessageListener {
         MessageForPushChannel pushChannel = JSON.parseObject(message.getBody(), MessageForPushChannel.class);
         StreamPushItem push = streamPushService.getPush(pushChannel.getApp(), pushChannel.getStream());
         if (push != null) {
-            List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId(
+            List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId(
                     push.getGbId());
             if (!sendRtpItems.isEmpty()) {
                 for (SendRtpItem sendRtpItem : sendRtpItems) {

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java

@@ -181,7 +181,7 @@ public interface IRedisCatchStorage {
      */
     void sendStreamPushRequestedMsgForStatus();
 
-    List<SendRtpItem> querySendRTPServerByChnnelId(String channelId);
+    List<SendRtpItem> querySendRTPServerByChannelId(String channelId);
 
     List<SendRtpItem> querySendRTPServerByStream(String stream);
 

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java

@@ -184,7 +184,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
     }
 
     @Override
-    public List<SendRtpItem> querySendRTPServerByChnnelId(String channelId) {
+    public List<SendRtpItem> querySendRTPServerByChannelId(String channelId) {
         if (channelId == null) {
             return null;
         }