Explorar el Código

优化点播结束后关闭RTPServer

648540858 hace 3 años
padre
commit
5b3dc4d595

+ 6 - 0
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java

@@ -99,6 +99,12 @@ public class VideoManagerConstants {
 	 */
 	public static final String VM_MSG_STREAM_PUSH_REQUESTED = "VM_MSG_STREAM_PUSH_REQUESTED";
 
+
+	/**
+	 * redis 消息通知平台通知设备推流结果
+	 */
+	public static final String VM_MSG_STREAM_PUSH_RESPONSE = "VM_MSG_STREAM_PUSH_RESPONSE";
+
 	/**
 	 * redis 消息请求所有的在线通道
 	 */

+ 5 - 3
src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java

@@ -12,7 +12,6 @@ import org.springframework.data.redis.connection.RedisConnectionFactory;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.data.redis.listener.PatternTopic;
 import org.springframework.data.redis.listener.RedisMessageListenerContainer;
-import org.springframework.data.redis.serializer.RedisSerializer;
 import org.springframework.data.redis.serializer.StringRedisSerializer;
 
 import com.genersoft.iot.vmp.utils.redis.FastJsonRedisSerializer;
@@ -43,7 +42,10 @@ public class RedisConfig extends CachingConfigurerSupport {
 	private RedisPushStreamStatusMsgListener redisPushStreamStatusMsgListener;
 
 	@Autowired
-	private RedisPushStreamListMsgListener redisPushStreamListMsgListener;
+	private RedisPushStreamStatusListMsgListener redisPushStreamListMsgListener;
+
+	@Autowired
+	private RedisPushStreamResponseListener redisPushStreamResponseListener;
 
 	@Bean
 	public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
@@ -81,7 +83,7 @@ public class RedisConfig extends CachingConfigurerSupport {
 		container.addMessageListener(redisGbPlayMsgListener, new PatternTopic(RedisGbPlayMsgListener.WVP_PUSH_STREAM_KEY));
 		container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE));
 		container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE));
+		container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE));
         return container;
     }
-
 }

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java

@@ -694,7 +694,7 @@ public class SIPCommander implements ISIPCommander {
 				dialog = streamSession.getDialogByStream(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
 			}
 			mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc());
-			mediaServerService.closeRTPServer(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
+			mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getStream());
 			streamSession.remove(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
 
 			if (dialog == null) {

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java

@@ -121,7 +121,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
 					StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId);
 					if (streamInfo != null) {
 						redisCatchStorage.stopPlay(streamInfo);
-						mediaServerService.closeRTPServer(device.getDeviceId(), channelId, streamInfo.getStream());
+						mediaServerService.closeRTPServer(streamInfo.getMediaServerId(), streamInfo.getStream());
 					}
 					SsrcTransaction ssrcTransactionForPlay = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, "play", null);
 					if (ssrcTransactionForPlay != null){

+ 19 - 2
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java

@@ -24,6 +24,7 @@ import com.genersoft.iot.vmp.service.IStreamPushService;
 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
 import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener;
+import com.genersoft.iot.vmp.service.impl.RedisPushStreamResponseListener;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import com.genersoft.iot.vmp.utils.DateUtil;
@@ -74,7 +75,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
     private DynamicTask dynamicTask;
 
     @Autowired
-    private SIPCommander cmder;
+    private RedisPushStreamResponseListener redisPushStreamResponseListener;
 
     @Autowired
     private IPlayService playService;
@@ -556,7 +557,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
             otherWvpPushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                     mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
         }
-
     }
     /**
      * 通知流上线
@@ -639,6 +639,23 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                             mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                 }
             });
+
+            // 添加回复的拒绝或者错误的通知
+            redisPushStreamResponseListener.addEvent(gbStream.getApp(), gbStream.getStream(), response -> {
+                if (response.getCode() != 0) {
+                    dynamicTask.stop(callIdHeader.getCallId());
+                    mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream());
+                    try {
+                        responseAck(evt, Response.TEMPORARILY_UNAVAILABLE, response.getMsg());
+                    } catch (SipException e) {
+                        throw new RuntimeException(e);
+                    } catch (InvalidArgumentException e) {
+                        throw new RuntimeException(e);
+                    } catch (ParseException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            });
         }
     }
 

+ 2 - 2
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/CatalogQueryMessageHandler.java

@@ -79,8 +79,8 @@ public class CatalogQueryMessageHandler extends SIPRequestProcessorParent implem
             List<DeviceChannel> allChannels = new ArrayList<>();
 
             // 回复平台
-            DeviceChannel deviceChannel = getChannelForPlatform(parentPlatform);
-            allChannels.add(deviceChannel);
+//            DeviceChannel deviceChannel = getChannelForPlatform(parentPlatform);
+//            allChannels.add(deviceChannel);
 
             // 回复目录
             if (catalogs.size() > 0) {

+ 1 - 0
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java

@@ -139,6 +139,7 @@ public class ZLMRTPServerFactory {
             param.put("stream_id", streamId);
             JSONObject jsonObject = zlmresTfulUtils.closeRtpServer(serverItem, param);
             if (jsonObject != null ) {
+                System.out.println(jsonObject);
                 if (jsonObject.getInteger("code") == 0) {
                     result = jsonObject.getInteger("hit") == 1;
                 }else {

+ 3 - 1
src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java

@@ -50,7 +50,9 @@ public interface IMediaServerService {
 
     SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, String ssrc, boolean ssrcCheck, boolean isPlayback, Integer port);
 
-    void closeRTPServer(String deviceId, String channelId, String ssrc);
+    void closeRTPServer(MediaServerItem mediaServerItem, String streamId);
+
+    void closeRTPServer(String mediaServerId, String streamId);
 
     void clearRTPServer(MediaServerItem mediaServerItem);
 

+ 2 - 0
src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java

@@ -48,6 +48,8 @@ public class MessageForPushChannel {
      */
     private String mediaServerId;
 
+
+
     public static MessageForPushChannel getInstance(int type, String app, String stream, String gbId,
                                                     String platFormId, String platFormName, String serverId,
                                                     String mediaServerId){

+ 71 - 0
src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannelResponse.java

@@ -0,0 +1,71 @@
+package com.genersoft.iot.vmp.service.bean;
+
+/**
+ * 当redis回复推流结果上级平台
+ * @author lin
+ */
+public class MessageForPushChannelResponse {
+    /**
+     * 错误玛
+     * 0 成功 1 失败
+     */
+    private int code;
+    /**
+     * 错误内容
+     */
+    private String msg;
+
+    /**
+     * 流应用名
+     */
+    private String app;
+
+    /**
+     * 流Id
+     */
+    private String stream;
+
+
+
+    public static MessageForPushChannelResponse getInstance(int code, String msg, String app, String stream){
+        MessageForPushChannelResponse messageForPushChannel = new MessageForPushChannelResponse();
+        messageForPushChannel.setCode(code);
+        messageForPushChannel.setMsg(msg);
+        messageForPushChannel.setApp(app);
+        messageForPushChannel.setStream(stream);
+        return messageForPushChannel;
+    }
+
+
+    public int getCode() {
+        return code;
+    }
+
+    public void setCode(int code) {
+        this.code = code;
+    }
+
+    public String getApp() {
+        return app;
+    }
+
+    public void setApp(String app) {
+        this.app = app;
+    }
+
+    public String getStream() {
+        return stream;
+    }
+
+    public void setStream(String stream) {
+        this.stream = stream;
+    }
+
+    public String getMsg() {
+        return msg;
+    }
+
+    public void setMsg(String msg) {
+        this.msg = msg;
+    }
+}

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java

@@ -145,7 +145,7 @@ public class DeviceServiceImpl implements IDeviceService {
         if (ssrcTransactions != null && ssrcTransactions.size() > 0) {
             for (SsrcTransaction ssrcTransaction : ssrcTransactions) {
                 mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc());
-                mediaServerService.closeRTPServer(deviceId, ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
+                mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getStream());
                 streamSession.remove(deviceId, ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
             }
         }

+ 11 - 9
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java

@@ -164,16 +164,18 @@ public class MediaServerServiceImpl implements IMediaServerService {
     }
 
     @Override
-    public void closeRTPServer(String deviceId, String channelId, String stream) {
-        String mediaServerId = streamSession.getMediaServerId(deviceId, channelId, stream);
-        String ssrc = streamSession.getSSRC(deviceId, channelId, stream);
-        MediaServerItem mediaServerItem = this.getOne(mediaServerId);
-        if (mediaServerItem != null) {
-            String streamId = String.format("%s_%s", deviceId, channelId);
-            zlmrtpServerFactory.closeRTPServer(mediaServerItem, streamId);
-            releaseSsrc(mediaServerItem.getId(), ssrc);
+    public void closeRTPServer(MediaServerItem mediaServerItem, String streamId) {
+        if (mediaServerItem == null) {
+            return;
         }
-        streamSession.remove(deviceId, channelId, stream);
+        zlmrtpServerFactory.closeRTPServer(mediaServerItem, streamId);
+        releaseSsrc(mediaServerItem.getId(), streamId);
+    }
+
+    @Override
+    public void closeRTPServer(String mediaServerId, String streamId) {
+        MediaServerItem mediaServerItem = this.getOne(mediaServerId);
+        closeRTPServer(mediaServerItem, streamId);
     }
 
     @Override

+ 6 - 6
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java

@@ -270,7 +270,7 @@ public class PlayServiceImpl implements IPlayService {
                 logger.info("[点播超时] 消息未响应 deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
                 timeoutCallback.run(0, "点播超时");
                 mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
-                mediaServerService.closeRTPServer(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
+                mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream());
                 streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
             }
         }, userSetting.getPlayTimeout());
@@ -333,7 +333,7 @@ public class PlayServiceImpl implements IPlayService {
                                 });
                     }
                     // 关闭rtp server
-                    mediaServerService.closeRTPServer(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
+                    mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream());
                     // 重新开启ssrc server
                     mediaServerService.openRTPServer(mediaServerItem, finalSsrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), false, finalSsrcInfo.getPort());
 
@@ -341,7 +341,7 @@ public class PlayServiceImpl implements IPlayService {
             }
         }, (event) -> {
             dynamicTask.stop(timeOutTaskKey);
-            mediaServerService.closeRTPServer(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
+            mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream());
             // 释放ssrc
             mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
 
@@ -445,7 +445,7 @@ public class PlayServiceImpl implements IPlayService {
                 cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
             }else {
                 mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
-                mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream());
+                mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
                 streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
             }
             cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
@@ -533,7 +533,7 @@ public class PlayServiceImpl implements IPlayService {
                                     });
                                 }
                                 // 关闭rtp server
-                                mediaServerService.closeRTPServer(device.getDeviceId(), channelId, ssrcInfo.getStream());
+                                mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
                                 // 重新开启ssrc server
                                 mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), true, ssrcInfo.getPort());
                             }
@@ -593,7 +593,7 @@ public class PlayServiceImpl implements IPlayService {
                 cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
             }else {
                 mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
-                mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream());
+                mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
                 streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
             }
             cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);

+ 62 - 0
src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamResponseListener.java

@@ -0,0 +1,62 @@
+package com.genersoft.iot.vmp.service.impl;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.genersoft.iot.vmp.media.zlm.dto.ChannelOnlineEvent;
+import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
+import com.genersoft.iot.vmp.service.IGbStreamService;
+import com.genersoft.iot.vmp.service.IMediaServerService;
+import com.genersoft.iot.vmp.service.IStreamPushService;
+import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse;
+import com.genersoft.iot.vmp.utils.DateUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.stereotype.Component;
+import org.springframework.util.ObjectUtils;
+
+import javax.annotation.Resource;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * 接收redis返回的推流结果
+ * @author lin
+ */
+@Component
+public class RedisPushStreamResponseListener implements MessageListener {
+
+    private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamResponseListener.class);
+
+    private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>();
+
+    public interface PushStreamResponseEvent{
+        void run(MessageForPushChannelResponse response);
+    }
+
+    @Override
+    public void onMessage(Message message, byte[] bytes) {
+        //
+        logger.warn("[REDIS消息-请求推流结果]: {}", new String(message.getBody()));
+        MessageForPushChannelResponse response = JSON.parseObject(new String(message.getBody()), MessageForPushChannelResponse.class);
+        if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){
+            logger.info("[REDIS消息-请求推流结果]:参数不全");
+            return;
+        }
+        // 查看正在等待的invite消息
+        if (responseEvents.get(response.getApp() + response.getStream()) != null) {
+            responseEvents.get(response.getApp() + response.getStream()).run(response);
+        }
+    }
+
+    public void addEvent(String app, String stream, PushStreamResponseEvent callback) {
+        responseEvents.put(app + stream, callback);
+    }
+
+    public void removeEvent(String app, String stream) {
+        responseEvents.remove(app + stream);
+    }
+}

+ 2 - 3
src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamListMsgListener.java → src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusListMsgListener.java

@@ -9,7 +9,6 @@ import com.genersoft.iot.vmp.service.IStreamPushService;
 import com.genersoft.iot.vmp.utils.DateUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.data.redis.connection.Message;
 import org.springframework.data.redis.connection.MessageListener;
 import org.springframework.stereotype.Component;
@@ -23,9 +22,9 @@ import java.util.*;
  * @Description: 接收redis发送的推流设备列表更新通知
  */
 @Component
-public class RedisPushStreamListMsgListener implements MessageListener {
+public class RedisPushStreamStatusListMsgListener implements MessageListener {
 
-    private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamListMsgListener.class);
+    private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamStatusListMsgListener.class);
     @Resource
     private IMediaServerService mediaServerService;