소스 검색

增加上级推流和停止推流的通知

648540858 2 년 전
부모
커밋
def56793ba

+ 1 - 0
src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java

@@ -63,6 +63,7 @@ public class RedisMsgListenConfig {
 		container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE));
 		container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE));
 		container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE));
+//		container.addMessageListener(, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED));
         return container;
     }
 }

+ 13 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java

@@ -3,6 +3,8 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
 import com.alibaba.fastjson2.JSON;
 import com.alibaba.fastjson2.JSONObject;
 import com.genersoft.iot.vmp.conf.DynamicTask;
+import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType;
 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
 import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
 import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
@@ -14,6 +16,7 @@ import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
 import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.service.IMediaServerService;
+import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
 import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
 import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -57,6 +60,9 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
 	@Autowired
     private IRedisCatchStorage redisCatchStorage;
 
+	@Autowired
+    private UserSetting userSetting;
+
 	@Autowired
 	private IVideoManagerStorage storager;
 
@@ -155,6 +161,13 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
 		} else if (jsonObject.getInteger("code") == 0) {
 			logger.info("调用ZLM推流接口, 结果: {}",  jsonObject);
 			logger.info("RTP推流成功[ {}/{} ],{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port"));
+			if (sendRtpItem.getPlayType() == InviteStreamType.PUSH) {
+				MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStreamId(),
+						sendRtpItem.getChannelId(), parentPlatform.getServerGBId(), parentPlatform.getName(), userSetting.getServerId(),
+						sendRtpItem.getMediaServerId());
+				messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
+				redisCatchStorage.sendPlatformStartPlayMsg(messageForPushChannel);
+			}
 		} else {
 			logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"), JSON.toJSONString(param));
 			if (sendRtpItem.isOnlyAudio()) {

+ 31 - 11
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java

@@ -1,11 +1,9 @@
 package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
 
 import com.genersoft.iot.vmp.common.StreamInfo;
+import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
-import com.genersoft.iot.vmp.gb28181.bean.Device;
-import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType;
-import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
-import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
+import com.genersoft.iot.vmp.gb28181.bean.*;
 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
 import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
@@ -15,6 +13,7 @@ import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.service.IDeviceService;
 import com.genersoft.iot.vmp.service.IMediaServerService;
+import com.genersoft.iot.vmp.service.IPlatformService;
 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
@@ -25,7 +24,9 @@ import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import javax.sip.*;
+import javax.sip.InvalidArgumentException;
+import javax.sip.RequestEvent;
+import javax.sip.SipException;
 import javax.sip.address.SipURI;
 import javax.sip.header.CallIdHeader;
 import javax.sip.header.FromHeader;
@@ -51,6 +52,9 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
 	@Autowired
 	private IRedisCatchStorage redisCatchStorage;
 
+	@Autowired
+	private IPlatformService platformService;
+
 	@Autowired
 	private IDeviceService deviceService;
 
@@ -69,6 +73,9 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
 	@Autowired
 	private VideoStreamSessionManager streamSession;
 
+	@Autowired
+	private UserSetting userSetting;
+
 	@Override
 	public void afterPropertiesSet() throws Exception {
 		// 添加消息处理的订阅
@@ -103,6 +110,19 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
 				MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
 				redisCatchStorage.deleteSendRTPServer(platformGbId, channelId, callIdHeader.getCallId(), null);
 				zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
+				if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
+					ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
+					if (platform != null) {
+						MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
+								sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(),
+								sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
+						messageForPushChannel.setPlatFormIndex(platform.getId());
+						redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
+					}else {
+						logger.info("[上级平台停止观看] 未找到平台{}的信息,发送redis消息失败", sendRtpItem.getPlatformId());
+					}
+				}
+
 				int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId);
 				if (totalReaderCount <= 0) {
 					logger.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId);
@@ -119,12 +139,12 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
 							logger.error("[收到bye] {} 无其它观看者,通知设备停止推流, 发送BYE失败 {}",streamId, e.getMessage());
 						}
 					}
-					if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
-						MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
-								sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(),
-								sendRtpItem.getPlatformId(), null, null, sendRtpItem.getMediaServerId());
-						redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
-					}
+//					if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
+//						MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
+//								sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(),
+//								sendRtpItem.getPlatformId(), null, null, sendRtpItem.getMediaServerId());
+//						redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
+//					}
 				}
 			}
 			// 可能是设备主动停止

+ 8 - 0
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java

@@ -19,6 +19,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
 import com.genersoft.iot.vmp.media.zlm.dto.hook.*;
 import com.genersoft.iot.vmp.service.*;
+import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import com.genersoft.iot.vmp.vmanager.bean.DeferredResultEx;
@@ -463,6 +464,13 @@ public class ZLMHttpHookListener {
                             }
                             redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
                                     sendRtpItem.getCallId(), sendRtpItem.getStreamId());
+                            if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
+                                MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
+                                        sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(),
+                                        sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
+                                messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
+                                redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
+                            }
                         }
                     }
                 }

+ 3 - 3
src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java

@@ -34,7 +34,7 @@ public class MessageForPushChannel {
     /**
      * 请求的平台自增ID
      */
-    private String platFormIndex;
+    private int platFormIndex;
 
     /**
      * 请求平台名称
@@ -132,11 +132,11 @@ public class MessageForPushChannel {
         this.mediaServerId = mediaServerId;
     }
 
-    public String getPlatFormIndex() {
+    public int getPlatFormIndex() {
         return platFormIndex;
     }
 
-    public void setPlatFormIndex(String platFormIndex) {
+    public void setPlatFormIndex(int platFormIndex) {
         this.platFormIndex = platFormIndex;
     }
 }

+ 1 - 0
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java

@@ -129,6 +129,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
                                 case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
                                     RequestPushStreamMsg param = JSON.to(RequestPushStreamMsg.class, wvpRedisMsg.getContent());
                                     requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
+
                                     break;
                                 default:
                                     break;

+ 76 - 0
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java

@@ -0,0 +1,76 @@
+package com.genersoft.iot.vmp.service.redisMsg;
+
+import com.alibaba.fastjson2.JSON;
+import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+import org.springframework.util.ObjectUtils;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * 接收redis发送的结束推流请求
+ * @author lin
+ */
+@Component
+public class RedisPushStreamCloseResponseListener implements MessageListener {
+
+    private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamCloseResponseListener.class);
+
+    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+
+    @Qualifier("taskExecutor")
+    @Autowired
+    private ThreadPoolTaskExecutor taskExecutor;
+
+
+    private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>();
+
+    public interface PushStreamResponseEvent{
+        void run(MessageForPushChannelResponse response);
+    }
+
+    @Override
+    public void onMessage(Message message, byte[] bytes) {
+        logger.info("[REDIS消息-请求推流结果]: {}", new String(message.getBody()));
+        boolean isEmpty = taskQueue.isEmpty();
+        taskQueue.offer(message);
+        if (isEmpty) {
+            taskExecutor.execute(() -> {
+                while (!taskQueue.isEmpty()) {
+                    Message msg = taskQueue.poll();
+                    try {
+                        MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class);
+                        if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){
+                            logger.info("[REDIS消息-请求推流结果]:参数不全");
+                            continue;
+                        }
+                        // 查看正在等待的invite消息
+                        if (responseEvents.get(response.getApp() + response.getStream()) != null) {
+                            responseEvents.get(response.getApp() + response.getStream()).run(response);
+                        }
+                    }catch (Exception e) {
+                        logger.warn("[REDIS消息-请求推流结果] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
+                        logger.error("[REDIS消息-请求推流结果] 异常内容: ", e);
+                    }
+                }
+            });
+        }
+    }
+
+    public void addEvent(String app, String stream, PushStreamResponseEvent callback) {
+        responseEvents.put(app + stream, callback);
+    }
+
+    public void removeEvent(String app, String stream) {
+        responseEvents.remove(app + stream);
+    }
+}

+ 4 - 0
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java

@@ -263,4 +263,8 @@ public interface IRedisCatchStorage {
     void removeAllDevice();
 
     void sendDeviceOrChannelStatus(String deviceId, String channelId, boolean online);
+
+    void sendPlatformStartPlayMsg(MessageForPushChannel messageForPushChannel);
+
+    void sendPlatformStopPlayMsg(MessageForPushChannel messageForPushChannel);
 }

+ 14 - 0
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java

@@ -925,4 +925,18 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
         // 使用 RedisTemplate<Object, Object> 发送字符串消息会导致发送的消息多带了双引号
         stringRedisTemplate.convertAndSend(key, msg.toString());
     }
+
+    @Override
+    public void sendPlatformStartPlayMsg(MessageForPushChannel msg) {
+        String key = VideoManagerConstants.VM_MSG_STREAM_START_PLAY_NOTIFY;
+        logger.info("[redis发送通知] 推流被上级平台观看 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId());
+        redisTemplate.convertAndSend(key, JSON.toJSON(msg));
+    }
+
+    @Override
+    public void sendPlatformStopPlayMsg(MessageForPushChannel msg) {
+        String key = VideoManagerConstants.VM_MSG_STREAM_STOP_PLAY_NOTIFY;
+        logger.info("[redis发送通知] 上级平台停止观看 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId());
+        redisTemplate.convertAndSend(key, JSON.toJSON(msg));
+    }
 }