648540858 hace 1 año
padre
commit
94436de3e1

+ 2 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java

@@ -780,7 +780,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
             }
         }, userSetting.getPlatformPlayTimeout());
         //
-        redisRpcService.waitePushStreamOnline(sendRtpItem, (sendRtpItemKey) -> {
+        long key = redisRpcService.waitePushStreamOnline(sendRtpItem, (sendRtpItemKey) -> {
             dynamicTask.stop(sendRtpItem.getCallId());
             if (sendRtpItemKey == null) {
                 logger.warn("[级联点播] 等待推流得到结果未空: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
@@ -835,6 +835,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
             if (response.getCode() != 0) {
                 dynamicTask.stop(sendRtpItem.getCallId());
                 redisRpcService.stopWaitePushStreamOnline(sendRtpItem.getRedisKey(), sendRtpItem);
+                redisRpcService.removeCallback(key);
                 try {
                     responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg());
                 } catch (SipException | InvalidArgumentException | ParseException e) {

+ 2 - 1
src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java

@@ -12,10 +12,11 @@ public interface IRedisRpcService {
 
     WVPResult stopSendRtp(String sendRtpItemKey);
 
-    void waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<String> callback);
+    long waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<String> callback);
 
     void stopWaitePushStreamOnline(String sendRtpItemKey, SendRtpItem sendRtpItem);
 
     void rtpSendStopped(String sendRtpItemKey);
 
+    void removeCallback(long key);
 }

+ 7 - 2
src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java

@@ -84,7 +84,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
     }
 
     @Override
-    public void waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<String> callback) {
+    public long waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<String> callback) {
         logger.info("[请求所有WVP监听流上线] {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
         // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者
         HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
@@ -122,7 +122,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
             }
             hookSubscribe.removeSubscribe(hook);
         });
-
+        return request.getSn();
     }
 
     @Override
@@ -147,4 +147,9 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
         request.setToId(sendRtpItem.getServerId());
         redisRpcConfig.request(request, 10);
     }
+
+    @Override
+    public void removeCallback(long key) {
+        redisRpcConfig.removeCallback(key);
+    }
 }