소스 검색

临时提交

648540858 1 년 전
부모
커밋
2e90344c89
16개의 변경된 파일271개의 추가작업 그리고 265개의 파일을 삭제
  1. 13 7
      src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpInfo.java
  2. 5 7
      src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java
  3. 27 26
      src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java
  4. 102 156
      src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java
  5. 3 4
      src/main/java/com/genersoft/iot/vmp/gb28181/task/SipRunner.java
  6. 6 7
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
  7. 0 2
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
  8. 1 1
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java
  9. 3 3
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java
  10. 16 8
      src/main/java/com/genersoft/iot/vmp/service/ISendRtpServerService.java
  11. 10 7
      src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java
  12. 65 17
      src/main/java/com/genersoft/iot/vmp/service/impl/SendRtpServerServiceImpl.java
  13. 1 1
      src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java
  14. 13 9
      src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java
  15. 3 3
      src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
  16. 3 7
      src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java

+ 13 - 7
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpInfo.java

@@ -2,8 +2,6 @@ package com.genersoft.iot.vmp.gb28181.bean;
 
 import com.genersoft.iot.vmp.media.bean.MediaServer;
 import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
-
-import com.genersoft.iot.vmp.common.VideoManagerConstants;
 import lombok.Data;
 
 @Data
@@ -34,6 +32,11 @@ public class SendRtpInfo {
      */
     private String targetName;
 
+    /**
+     * 是否是发送给上级平台
+     */
+    private boolean sendToPlatform;
+
     /**
      * 直播流的应用名
      */
@@ -182,8 +185,13 @@ public class SendRtpInfo {
         sendRtpItem.setIp(ip);
         sendRtpItem.setPort(port);
         sendRtpItem.setSsrc(ssrc);
-        sendRtpItem.setDeviceId(deviceId);
-        sendRtpItem.setPlatformId(platformId);
+        if (deviceId != null) {
+            sendRtpItem.setTargetId(deviceId);
+            sendRtpItem.setSendToPlatform(false);
+        }else {
+            sendRtpItem.setTargetId(platformId);
+            sendRtpItem.setSendToPlatform(true);
+        }
         sendRtpItem.setChannelId(channelId);
         sendRtpItem.setTcp(isTcp);
         sendRtpItem.setRtcp(rtcp);
@@ -200,9 +208,7 @@ public class SendRtpInfo {
                 "ip='" + ip + '\'' +
                 ", port=" + port +
                 ", ssrc='" + ssrc + '\'' +
-                ", platformId='" + platformId + '\'' +
-                ", platformName='" + platformName + '\'' +
-                ", deviceId='" + deviceId + '\'' +
+                ", targetId='" + targetId + '\'' +
                 ", app='" + app + '\'' +
                 ", channelId='" + channelId + '\'' +
                 ", status=" + status +

+ 5 - 7
src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java

@@ -10,6 +10,9 @@ import com.genersoft.iot.vmp.gb28181.bean.*;
 import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper;
 import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper;
 import com.genersoft.iot.vmp.gb28181.dao.PlatformChannelMapper;
+import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
+import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
+import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
 import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
 import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
 import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
@@ -20,9 +23,6 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
 import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler;
 import com.genersoft.iot.vmp.media.bean.MediaServer;
 import com.genersoft.iot.vmp.media.service.IMediaServerService;
-import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
-import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
-import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
 import com.genersoft.iot.vmp.service.ISendRtpServerService;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.utils.DateUtil;
@@ -32,9 +32,7 @@ import com.github.pagehelper.PageHelper;
 import com.github.pagehelper.PageInfo;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.jdbc.datasource.DataSourceTransactionManager;
 import org.springframework.stereotype.Service;
-import org.springframework.transaction.TransactionDefinition;
 import org.springframework.transaction.annotation.Transactional;
 import org.springframework.util.ObjectUtils;
 
@@ -234,10 +232,10 @@ public class DeviceServiceImpl implements IDeviceService {
         removeMobilePositionSubscribe(device, null);
 
         List<AudioBroadcastCatch> audioBroadcastCatches = audioBroadcastManager.getByDeviceId(deviceId);
-        if (audioBroadcastCatches.size() > 0) {
+        if (!audioBroadcastCatches.isEmpty()) {
             for (AudioBroadcastCatch audioBroadcastCatch : audioBroadcastCatches) {
 
-                SendRtpInfo sendRtpItem = sendRtpServerService.queryByChannelId(audioBroadcastCatch.getChannelId());
+                SendRtpInfo sendRtpItem = sendRtpServerService.queryByChannelId(audioBroadcastCatch.getChannelId(), deviceId);
                 if (sendRtpItem != null) {
                     sendRtpServerService.delete(sendRtpItem);
                     MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());

+ 27 - 26
src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java

@@ -9,7 +9,6 @@ import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
 import com.genersoft.iot.vmp.gb28181.bean.*;
 import com.genersoft.iot.vmp.gb28181.dao.PlatformChannelMapper;
 import com.genersoft.iot.vmp.gb28181.dao.PlatformMapper;
-import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
 import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
 import com.genersoft.iot.vmp.gb28181.service.IGbChannelService;
 import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
@@ -111,20 +110,21 @@ public class PlatformServiceImpl implements IPlatformService {
     @Async("taskExecutor")
     @EventListener
     public void onApplicationEvent(MediaDepartureEvent event) {
-        SendRtpInfo sendRtpItems = sendRtpServerService.queryByStream(event.getStream());
-        if (sendRtpItems != null) {
-            if (sendRtpItem != null && sendRtpItem.getApp().equals(event.getApp())) {
-                String platformId = sendRtpItem.getPlatformId();
-                Platform platform = platformMapper.getParentPlatByServerGBId(platformId);
-                CommonGBChannel channel = channelService.getOne(sendRtpItem.getChannelId());
-                try {
-                    if (platform != null && channel != null) {
-                        commanderForPlatform.streamByeCmd(platform, sendRtpItem, channel);
-                        redisCatchStorage.deleteSendRTPServer(platformId, channel.getGbDeviceId(),
-                                sendRtpItem.getCallId(), sendRtpItem.getStream());
+        List<SendRtpInfo> sendRtpItems = sendRtpServerService.queryByStream(event.getStream());
+        if (!sendRtpItems.isEmpty()) {
+            for (SendRtpInfo sendRtpItem : sendRtpItems) {
+                if (sendRtpItem != null && sendRtpItem.getApp().equals(event.getApp()) && sendRtpItem.isSendToPlatform()) {
+                    String platformId = sendRtpItem.getTargetId();
+                    Platform platform = platformMapper.getParentPlatByServerGBId(platformId);
+                    CommonGBChannel channel = channelService.getOne(sendRtpItem.getChannelId());
+                    try {
+                        if (platform != null && channel != null) {
+                            commanderForPlatform.streamByeCmd(platform, sendRtpItem, channel);
+                            sendRtpServerService.delete(sendRtpItem);
+                        }
+                    } catch (SipException | InvalidArgumentException | ParseException e) {
+                        log.error("[命令发送失败] 发送BYE: {}", e.getMessage());
                     }
-                } catch (SipException | InvalidArgumentException | ParseException e) {
-                    log.error("[命令发送失败] 发送BYE: {}", e.getMessage());
                 }
             }
         }
@@ -137,19 +137,20 @@ public class PlatformServiceImpl implements IPlatformService {
     @Async("taskExecutor")
     @EventListener
     public void onApplicationEvent(MediaSendRtpStoppedEvent event) {
-        List<SendRtpInfo> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(event.getStream());
+        List<SendRtpInfo> sendRtpItems = sendRtpServerService.queryByStream(event.getStream());
         if (sendRtpItems != null && !sendRtpItems.isEmpty()) {
             for (SendRtpInfo sendRtpItem : sendRtpItems) {
-                Platform platform = platformMapper.getParentPlatByServerGBId(sendRtpItem.getPlatformId());
-                CommonGBChannel channel = channelService.getOne(sendRtpItem.getChannelId());
-                ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
-                try {
-                    commanderForPlatform.streamByeCmd(platform, sendRtpItem, channel);
-                } catch (SipException | InvalidArgumentException | ParseException e) {
-                    log.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
+                if (sendRtpItem != null && sendRtpItem.getApp().equals(event.getApp()) && sendRtpItem.isSendToPlatform()) {
+                    Platform platform = platformMapper.getParentPlatByServerGBId(sendRtpItem.getTargetId());
+                    CommonGBChannel channel = channelService.getOne(sendRtpItem.getChannelId());
+                    ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
+                    try {
+                        commanderForPlatform.streamByeCmd(platform, sendRtpItem, channel);
+                    } catch (SipException | InvalidArgumentException | ParseException e) {
+                        log.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
+                    }
+                    sendRtpServerService.delete(sendRtpItem);
                 }
-                redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), sendRtpItem.getChannelId(),
-                        sendRtpItem.getCallId(), sendRtpItem.getStream());
             }
         }
     }
@@ -419,11 +420,11 @@ public class PlatformServiceImpl implements IPlatformService {
     }
 
     private void stopAllPush(String platformId) {
-        List<SendRtpInfo> sendRtpItems = redisCatchStorage.querySendRTPServer(platformId);
+        List<SendRtpInfo> sendRtpItems = sendRtpServerService.queryForPlatform(platformId);
         if (sendRtpItems != null && sendRtpItems.size() > 0) {
             for (SendRtpInfo sendRtpItem : sendRtpItems) {
                 ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
-                redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), null, null);
+                sendRtpServerService.delete(sendRtpItem);
                 MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                 mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), null);
             }

+ 102 - 156
src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java

@@ -29,6 +29,7 @@ import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
 import com.genersoft.iot.vmp.media.event.media.MediaNotFoundEvent;
 import com.genersoft.iot.vmp.media.service.IMediaServerService;
 import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
+import com.genersoft.iot.vmp.service.ISendRtpServerService;
 import com.genersoft.iot.vmp.service.bean.DownloadFileInfo;
 import com.genersoft.iot.vmp.service.bean.ErrorCallback;
 import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
@@ -126,6 +127,9 @@ public class PlayServiceImpl implements IPlayService {
     @Autowired
     private IGbChannelService channelService;
 
+    @Autowired
+    private ISendRtpServerService sendRtpServerService;
+
     /**
      * 流到来的处理
      */
@@ -180,23 +184,23 @@ public class PlayServiceImpl implements IPlayService {
     @Async("taskExecutor")
     @EventListener
     public void onApplicationEvent(MediaDepartureEvent event) {
-        List<SendRtpInfo> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(event.getStream());
-        if (!sendRtpItems.isEmpty()) {
-            for (SendRtpInfo sendRtpItem : sendRtpItems) {
-                if (sendRtpItem != null && sendRtpItem.getApp().equals(event.getApp())) {
-                    String platformId = sendRtpItem.getPlatformId();
+        List<SendRtpInfo> sendRtpInfos = sendRtpServerService.queryByStream(event.getStream());
+        if (!sendRtpInfos.isEmpty()) {
+            for (SendRtpInfo sendRtpInfo : sendRtpInfos) {
+                if (sendRtpInfo != null && sendRtpInfo.isSendToPlatform() && sendRtpInfo.getApp().equals(event.getApp())) {
+                    String platformId = sendRtpInfo.getTargetId();
                     Device device = deviceService.getDeviceByDeviceId(platformId);
-                    DeviceChannel channel = deviceChannelService.getOneById(sendRtpItem.getChannelId());
+                    DeviceChannel channel = deviceChannelService.getOneById(sendRtpInfo.getChannelId());
                     try {
                         if (device != null && channel != null) {
-                            cmder.streamByeCmd(device, channel.getDeviceId(), event.getStream(), sendRtpItem.getCallId());
-                            if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST)
-                                    || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) {
+                            cmder.streamByeCmd(device, channel.getDeviceId(), event.getStream(), sendRtpInfo.getCallId());
+                            if (sendRtpInfo.getPlayType().equals(InviteStreamType.BROADCAST)
+                                    || sendRtpInfo.getPlayType().equals(InviteStreamType.TALK)) {
                                 AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(channel.getId());
                                 if (audioBroadcastCatch != null) {
                                     // 来自上级平台的停止对讲
-                                    log.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
-                                    audioBroadcastManager.del(sendRtpItem.getChannelId());
+                                    log.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpInfo.getTargetId(), sendRtpInfo.getChannelId());
+                                    audioBroadcastManager.del(sendRtpInfo.getChannelId());
                                 }
                             }
                         }
@@ -362,57 +366,47 @@ public class PlayServiceImpl implements IPlayService {
             audioEvent.call("ssrc已经用尽");
             return;
         }
-        SendRtpInfo sendRtpItem = new SendRtpInfo();
-        sendRtpItem.setApp("talk");
-        sendRtpItem.setStream(stream);
-        sendRtpItem.setSsrc(playSsrc);
-        sendRtpItem.setDeviceId(device.getDeviceId());
-        sendRtpItem.setPlatformId(device.getDeviceId());
-        sendRtpItem.setChannelId(channel.getId());
-        sendRtpItem.setRtcp(false);
-        sendRtpItem.setMediaServerId(mediaServerItem.getId());
-        sendRtpItem.setOnlyAudio(true);
-        sendRtpItem.setPlayType(InviteStreamType.TALK);
-        sendRtpItem.setPt(8);
-        sendRtpItem.setStatus(1);
-        sendRtpItem.setTcpActive(false);
-        sendRtpItem.setTcp(true);
-        sendRtpItem.setUsePs(false);
-        sendRtpItem.setReceiveStream(stream + "_talk");
-
-        String callId = SipUtils.getNewCallId();
-        int port = sendRtpPortManager.getNextPort(mediaServerItem);
-        //端口获取失败的ssrcInfo 没有必要发送点播指令
-        if (port <= 0) {
-            log.info("[语音对讲] 端口分配异常,deviceId={},channelId={}", device.getDeviceId(), channel.getDeviceId());
-            audioEvent.call("端口分配异常");
+        SendRtpInfo sendRtpInfo;
+        try {
+            sendRtpInfo = sendRtpServerService.createSendRtpInfo(mediaServerItem, null, null, playSsrc, device.getDeviceId(), "talk", stream,
+                    channel.getId(), true, false);
+        }catch (PlayException e) {
+            log.info("[语音对讲]开始 获取发流端口失败 deviceId: {}, channelId: {},", device.getDeviceId(), channel.getDeviceId());
             return;
         }
-        sendRtpItem.setLocalPort(port);
-        sendRtpItem.setPort(port);
-        log.info("[语音对讲]开始 deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channel.getDeviceId(), sendRtpItem.getLocalPort(), device.getStreamMode(), sendRtpItem.getSsrc(), false);
+       
+        
+        sendRtpInfo.setOnlyAudio(true);
+        sendRtpInfo.setPt(8);
+        sendRtpInfo.setStatus(1);
+        sendRtpInfo.setTcpActive(false);
+        sendRtpInfo.setUsePs(false);
+        sendRtpInfo.setReceiveStream(stream + "_talk");
+
+        String callId = SipUtils.getNewCallId();
+        log.info("[语音对讲]开始 deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channel.getDeviceId(), sendRtpInfo.getLocalPort(), device.getStreamMode(), sendRtpInfo.getSsrc(), false);
         // 超时处理
         String timeOutTaskKey = UUID.randomUUID().toString();
         dynamicTask.startDelay(timeOutTaskKey, () -> {
 
-            log.info("[语音对讲] 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", device.getDeviceId(), channel.getDeviceId(), sendRtpItem.getPort(), sendRtpItem.getSsrc());
+            log.info("[语音对讲] 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", device.getDeviceId(), channel.getDeviceId(), sendRtpInfo.getPort(), sendRtpInfo.getSsrc());
             timeoutCallback.run();
             // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
             try {
-                cmder.streamByeCmd(device, channel.getDeviceId(), sendRtpItem.getStream(), null);
+                cmder.streamByeCmd(device, channel.getDeviceId(), stream, null);
             } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
                 log.error("[语音对讲]超时, 发送BYE失败 {}", e.getMessage());
             } finally {
                 timeoutCallback.run();
-                mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
-                sessionManager.removeByStream(sendRtpItem.getStream());
+                mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrc());
+                sessionManager.removeByStream(sendRtpInfo.getStream());
             }
         }, userSetting.getPlayTimeout());
 
         try {
-            mediaServerService.startSendRtpPassive(mediaServerItem, sendRtpItem, userSetting.getPlayTimeout() * 1000);
+            mediaServerService.startSendRtpPassive(mediaServerItem, sendRtpInfo, userSetting.getPlayTimeout() * 1000);
         }catch (ControllerException e) {
-            mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
+            mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrc());
             log.info("[语音对讲]失败 deviceId: {}, channelId: {}", device.getDeviceId(), channel.getDeviceId());
             audioEvent.call("失败, " + e.getMessage());
             // 查看是否已经建立了通道,存在则发送bye
@@ -422,7 +416,7 @@ public class PlayServiceImpl implements IPlayService {
 
         // 查看设备是否已经在推流
         try {
-            cmder.talkStreamCmd(mediaServerItem, sendRtpItem, device, channel, callId, (hookData) -> {
+            cmder.talkStreamCmd(mediaServerItem, sendRtpInfo, device, channel, callId, (hookData) -> {
                 log.info("[语音对讲] 流已生成, 开始推流: " + hookData);
                 dynamicTask.stop(timeOutTaskKey);
                 // TODO 暂不做处理
@@ -437,13 +431,13 @@ public class PlayServiceImpl implements IPlayService {
                     ResponseEvent responseEvent = (ResponseEvent) event.event;
                     if (responseEvent.getResponse() instanceof SIPResponse) {
                         SIPResponse response = (SIPResponse) responseEvent.getResponse();
-                        sendRtpItem.setFromTag(response.getFromTag());
-                        sendRtpItem.setToTag(response.getToTag());
-                        sendRtpItem.setCallId(response.getCallIdHeader().getCallId());
-                        redisCatchStorage.updateSendRTPSever(sendRtpItem);
+                        sendRtpInfo.setFromTag(response.getFromTag());
+                        sendRtpInfo.setToTag(response.getToTag());
+                        sendRtpInfo.setCallId(response.getCallIdHeader().getCallId());
+                        sendRtpServerService.update(sendRtpInfo);
 
-                        SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), sendRtpItem.getChannelId(), "talk",
-                                sendRtpItem.getStream(), sendRtpItem.getSsrc(), sendRtpItem.getMediaServerId(),
+                        SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), sendRtpInfo.getChannelId(), "talk",
+                                sendRtpInfo.getStream(), sendRtpInfo.getSsrc(), sendRtpInfo.getMediaServerId(),
                                 response, InviteSessionType.TALK);
 
                         sessionManager.put(ssrcTransaction);
@@ -456,21 +450,21 @@ public class PlayServiceImpl implements IPlayService {
 
             }, (event) -> {
                 dynamicTask.stop(timeOutTaskKey);
-                mediaServerService.closeRTPServer(mediaServerItem, sendRtpItem.getStream());
+                mediaServerService.closeRTPServer(mediaServerItem, sendRtpInfo.getStream());
                 // 释放ssrc
-                mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
-                sessionManager.removeByStream(sendRtpItem.getStream());
+                mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrc());
+                sessionManager.removeByStream(sendRtpInfo.getStream());
                 errorEvent.response(event);
             });
         } catch (InvalidArgumentException | SipException | ParseException e) {
 
             log.error("[命令发送失败] 对讲消息: {}", e.getMessage());
             dynamicTask.stop(timeOutTaskKey);
-            mediaServerService.closeRTPServer(mediaServerItem, sendRtpItem.getStream());
+            mediaServerService.closeRTPServer(mediaServerItem, sendRtpInfo.getStream());
             // 释放ssrc
-            mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
+            mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrc());
 
-            sessionManager.removeByStream(sendRtpItem.getStream());
+            sessionManager.removeByStream(sendRtpInfo.getStream());
             SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult();
             eventResult.type = SipSubscribe.EventResultType.cmdSendFailEvent;
             eventResult.statusCode = -1;
@@ -1138,14 +1132,14 @@ public class PlayServiceImpl implements IPlayService {
     @Override
     public void zlmServerOffline(String mediaServerId) {
         // 处理正在向上推流的上级平台
-        List<SendRtpInfo> sendRtpItems = redisCatchStorage.querySendRTPServer(null);
-        if (sendRtpItems.size() > 0) {
-            for (SendRtpInfo sendRtpItem : sendRtpItems) {
-                if (sendRtpItem.getMediaServerId().equals(mediaServerId)) {
-                    Platform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
-                    CommonGBChannel channel = channelService.getOne(sendRtpItem.getChannelId());
+        List<SendRtpInfo> sendRtpInfos = sendRtpServerService.queryAll();
+        if (!sendRtpInfos.isEmpty()) {
+            for (SendRtpInfo sendRtpInfo : sendRtpInfos) {
+                if (sendRtpInfo.getMediaServerId().equals(mediaServerId) && sendRtpInfo.isSendToPlatform()) {
+                    Platform platform = platformService.queryPlatformByServerGBId(sendRtpInfo.getTargetId());
+                    CommonGBChannel channel = channelService.getOne(sendRtpInfo.getChannelId());
                     try {
-                        sipCommanderFroPlatform.streamByeCmd(platform, sendRtpItem, channel);
+                        sipCommanderFroPlatform.streamByeCmd(platform, sendRtpInfo, channel);
                     } catch (SipException | InvalidArgumentException | ParseException e) {
                         log.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
                     }
@@ -1216,10 +1210,10 @@ public class PlayServiceImpl implements IPlayService {
         }
         // 查询通道使用状态
         if (audioBroadcastManager.exit(deviceChannel.getId())) {
-            SendRtpInfo sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channel.getDeviceId(), null, null);
-            if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) {
+            SendRtpInfo sendRtpInfo = sendRtpServerService.queryByChannelId(channel.getId(), device.getDeviceId());
+            if (sendRtpInfo != null && sendRtpInfo.isOnlyAudio()) {
                 // 查询流是否存在,不存在则认为是异常状态
-                Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
+                Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, sendRtpInfo.getApp(), sendRtpInfo.getStream());
                 if (streamReady) {
                     log.warn("语音广播已经开启: {}", channel.getDeviceId());
                     event.call("语音广播已经开启");
@@ -1256,11 +1250,11 @@ public class PlayServiceImpl implements IPlayService {
     @Override
     public boolean audioBroadcastInUse(Device device, DeviceChannel channel) {
         if (audioBroadcastManager.exit(channel.getId())) {
-            SendRtpInfo sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channel.getDeviceId(), null, null);
-            if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) {
+            SendRtpInfo sendRtpInfo = sendRtpServerService.queryByChannelId(channel.getId(), device.getDeviceId());
+            if (sendRtpInfo != null && sendRtpInfo.isOnlyAudio()) {
                 // 查询流是否存在,不存在则认为是异常状态
-                MediaServer mediaServerServiceOne = mediaServerService.getOne(sendRtpItem.getMediaServerId());
-                Boolean streamReady = mediaServerService.isStreamReady(mediaServerServiceOne, sendRtpItem.getApp(), sendRtpItem.getStream());
+                MediaServer mediaServerServiceOne = mediaServerService.getOne(sendRtpInfo.getMediaServerId());
+                Boolean streamReady = mediaServerService.isStreamReady(mediaServerServiceOne, sendRtpInfo.getApp(), sendRtpInfo.getStream());
                 if (streamReady) {
                     log.warn("语音广播通道使用中: {}", channel.getDeviceId());
                     return true;
@@ -1285,11 +1279,11 @@ public class PlayServiceImpl implements IPlayService {
                 if (audioBroadcastCatch == null) {
                     continue;
                 }
-                SendRtpInfo sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channel.getDeviceId(), null, null);
-                if (sendRtpItem != null) {
-                    redisCatchStorage.deleteSendRTPServer(device.getDeviceId(), channel.getDeviceId(), null, null);
-                    MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
-                    mediaServerService.stopSendRtp(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream(), null);
+                SendRtpInfo sendRtpInfo = sendRtpServerService.queryByChannelId(channel.getId(), device.getDeviceId());
+                if (sendRtpInfo != null) {
+                    sendRtpServerService.delete(sendRtpInfo);
+                    MediaServer mediaServer = mediaServerService.getOne(sendRtpInfo.getMediaServerId());
+                    mediaServerService.stopSendRtp(mediaServer, sendRtpInfo.getApp(), sendRtpInfo.getStream(), null);
                     try {
                         cmder.streamByeCmdForDeviceInvite(device, channel.getDeviceId(), audioBroadcastCatch.getSipTransactionInfo(), null);
                     } catch (InvalidArgumentException | ParseException | SipException |
@@ -1305,54 +1299,6 @@ public class PlayServiceImpl implements IPlayService {
 
     @Override
     public void zlmServerOnline(String mediaServerId) {
-        // TODO 查找之前的点播,流如果不存在则给下级发送bye
-//        MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
-//        zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{
-//            Integer code = mediaList.getInteger("code");
-//            if (code == 0) {
-//                JSONArray data = mediaList.getJSONArray("data");
-//                if (data == null || data.size() == 0) {
-//                    zlmServerOffline(mediaServerId);
-//                }else {
-//                    Map<String, JSONObject> mediaListMap = new HashMap<>();
-//                    for (int i = 0; i < data.size(); i++) {
-//                        JSONObject json = data.getJSONObject(i);
-//                        String app = json.getString("app");
-//                        if ("rtp".equals(app)) {
-//                            String stream = json.getString("stream");
-//                            if (mediaListMap.get(stream) != null) {
-//                                continue;
-//                            }
-//                            mediaListMap.put(stream, json);
-//                            // 处理正在观看的国标设备
-//                            List<SsrcTransaction> ssrcTransactions = streamSession.getSsrcTransactionForAll(null, null, null, stream);
-//                            if (ssrcTransactions.size() > 0) {
-//                                for (SsrcTransaction ssrcTransaction : ssrcTransactions) {
-//                                    if(ssrcTransaction.getMediaServerId().equals(mediaServerId)) {
-//                                        cmder.streamByeCmd(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(),
-//                                                ssrcTransaction.getStream(), null);
-//                                    }
-//                                }
-//                            }
-//                        }
-//                    }
-//                    if (mediaListMap.size() > 0 ) {
-//                        // 处理正在向上推流的上级平台
-//                        List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServer(null);
-//                        if (sendRtpItems.size() > 0) {
-//                            for (SendRtpItem sendRtpItem : sendRtpItems) {
-//                                if (sendRtpItem.getMediaServerId().equals(mediaServerId)) {
-//                                    if (mediaListMap.get(sendRtpItem.getStreamId()) == null) {
-//                                        ParentPlatform platform = storager.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
-//                                        sipCommanderFroPlatform.streamByeCmd(platform, sendRtpItem.getCallId());
-//                                    }
-//                                }
-//                            }
-//                        }
-//                    }
-//                }
-//            }
-//        }));
     }
 
     @Override
@@ -1414,36 +1360,36 @@ public class PlayServiceImpl implements IPlayService {
     }
 
     @Override
-    public void startPushStream(SendRtpInfo sendRtpItem, DeviceChannel channel, SIPResponse sipResponse, Platform platform, CallIdHeader callIdHeader) {
+    public void startPushStream(SendRtpInfo sendRtpInfo, DeviceChannel channel, SIPResponse sipResponse, Platform platform, CallIdHeader callIdHeader) {
         // 开始发流
-        MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
+        MediaServer mediaInfo = mediaServerService.getOne(sendRtpInfo.getMediaServerId());
 
         if (mediaInfo != null) {
             try {
-                if (sendRtpItem.isTcpActive()) {
-                    mediaServerService.startSendRtpPassive(mediaInfo, sendRtpItem, null);
+                if (sendRtpInfo.isTcpActive()) {
+                    mediaServerService.startSendRtpPassive(mediaInfo, sendRtpInfo, null);
                 } else {
-                    mediaServerService.startSendRtp(mediaInfo, sendRtpItem);
+                    mediaServerService.startSendRtp(mediaInfo, sendRtpInfo);
                 }
-                redisCatchStorage.sendPlatformStartPlayMsg(sendRtpItem, channel, platform);
+                redisCatchStorage.sendPlatformStartPlayMsg(sendRtpInfo, channel, platform);
             }catch (ControllerException e) {
                 log.error("RTP推流失败: {}", e.getMessage());
-                startSendRtpStreamFailHand(sendRtpItem, platform, callIdHeader);
+                startSendRtpStreamFailHand(sendRtpInfo, platform, callIdHeader);
                 return;
             }
 
-            log.info("RTP推流成功[ {}/{} ],{}, ", sendRtpItem.getApp(), sendRtpItem.getStream(),
-                    sendRtpItem.isTcpActive()?"被动发流": sendRtpItem.getIp() + ":" + sendRtpItem.getPort());
+            log.info("RTP推流成功[ {}/{} ],{}, ", sendRtpInfo.getApp(), sendRtpInfo.getStream(),
+                    sendRtpInfo.isTcpActive()?"被动发流": sendRtpInfo.getIp() + ":" + sendRtpInfo.getPort());
 
         }
     }
 
     @Override
-    public void startSendRtpStreamFailHand(SendRtpInfo sendRtpItem, Platform platform, CallIdHeader callIdHeader) {
-        if (sendRtpItem.isOnlyAudio()) {
-            Device device = deviceService.getDeviceByDeviceId(sendRtpItem.getDeviceId());
-            DeviceChannel deviceChannel = deviceChannelService.getOneById(sendRtpItem.getChannelId());
-            AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getChannelId());
+    public void startSendRtpStreamFailHand(SendRtpInfo sendRtpInfo, Platform platform, CallIdHeader callIdHeader) {
+        if (sendRtpInfo.isOnlyAudio()) {
+            Device device = deviceService.getDeviceByDeviceId(sendRtpInfo.getTargetId());
+            DeviceChannel deviceChannel = deviceChannelService.getOneById(sendRtpInfo.getChannelId());
+            AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpInfo.getChannelId());
             if (audioBroadcastCatch != null) {
                 try {
                     cmder.streamByeCmd(device, deviceChannel.getDeviceId(), audioBroadcastCatch.getSipTransactionInfo(), null);
@@ -1455,9 +1401,9 @@ public class PlayServiceImpl implements IPlayService {
         } else {
             if (platform != null) {
                 // 向上级平台
-                CommonGBChannel channel = channelService.getOne(sendRtpItem.getChannelId());
+                CommonGBChannel channel = channelService.getOne(sendRtpInfo.getChannelId());
                 try {
-                    commanderForPlatform.streamByeCmd(platform, sendRtpItem, channel);
+                    commanderForPlatform.streamByeCmd(platform, sendRtpInfo, channel);
                 } catch (SipException | InvalidArgumentException | ParseException e) {
                     log.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
                 }
@@ -1475,11 +1421,11 @@ public class PlayServiceImpl implements IPlayService {
         log.info("[语音对讲] device: {}, channel: {}", device.getDeviceId(), channel.getDeviceId());
         // 查询通道使用状态
         if (audioBroadcastManager.exit(channel.getId())) {
-            SendRtpInfo sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channel.getDeviceId(), null, null);
-            if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) {
+            SendRtpInfo sendRtpInfo = sendRtpServerService.queryByChannelId(channel.getId(), device.getDeviceId());
+            if (sendRtpInfo != null && sendRtpInfo.isOnlyAudio()) {
                 // 查询流是否存在,不存在则认为是异常状态
-                MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
-                Boolean streamReady = mediaServerService.isStreamReady(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream());
+                MediaServer mediaServer = mediaServerService.getOne(sendRtpInfo.getMediaServerId());
+                Boolean streamReady = mediaServerService.isStreamReady(mediaServer, sendRtpInfo.getApp(), sendRtpInfo.getStream());
                 if (streamReady) {
                     log.warn("[语音对讲] 正在语音广播,无法开启语音通话: {}", channel.getDeviceId());
                     event.call("正在语音广播");
@@ -1490,10 +1436,10 @@ public class PlayServiceImpl implements IPlayService {
             }
         }
 
-        SendRtpInfo sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channel.getDeviceId(), stream, null);
-        if (sendRtpItem != null) {
-            MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
-            Boolean streamReady = mediaServerService.isStreamReady(mediaServer, "rtp", sendRtpItem.getReceiveStream());
+        SendRtpInfo sendRtpInfo = sendRtpServerService.queryByChannelId(channel.getId(), device.getDeviceId());
+        if (sendRtpInfo != null) {
+            MediaServer mediaServer = mediaServerService.getOne(sendRtpInfo.getMediaServerId());
+            Boolean streamReady = mediaServerService.isStreamReady(mediaServer, "rtp", sendRtpInfo.getReceiveStream());
             if (streamReady) {
                 log.warn("[语音对讲] 进行中: {}", channel.getDeviceId());
                 event.call("语音对讲进行中");
@@ -1526,13 +1472,13 @@ public class PlayServiceImpl implements IPlayService {
     @Override
     public void stopTalk(Device device, DeviceChannel channel, Boolean streamIsReady) {
         log.info("[语音对讲] 停止, {}/{}", device.getDeviceId(), channel.getDeviceId());
-        SendRtpInfo sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channel.getDeviceId(), null, null);
-        if (sendRtpItem == null) {
+        SendRtpInfo sendRtpInfo = sendRtpServerService.queryByChannelId(channel.getId(), device.getDeviceId());
+        if (sendRtpInfo == null) {
             log.info("[语音对讲] 停止失败, 未找到发送信息,可能已经停止");
             return;
         }
         // 停止向设备推流
-        String mediaServerId = sendRtpItem.getMediaServerId();
+        String mediaServerId = sendRtpInfo.getMediaServerId();
         if (mediaServerId == null) {
             return;
         }
@@ -1540,20 +1486,20 @@ public class PlayServiceImpl implements IPlayService {
         MediaServer mediaServer = mediaServerService.getOne(mediaServerId);
 
         if (streamIsReady == null || streamIsReady) {
-            mediaServerService.stopSendRtp(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
+            mediaServerService.stopSendRtp(mediaServer, sendRtpInfo.getApp(), sendRtpInfo.getStream(), sendRtpInfo.getSsrc());
         }
 
-        ssrcFactory.releaseSsrc(mediaServerId, sendRtpItem.getSsrc());
+        ssrcFactory.releaseSsrc(mediaServerId, sendRtpInfo.getSsrc());
 
-        SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(sendRtpItem.getStream());
+        SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(sendRtpInfo.getStream());
         if (ssrcTransaction != null) {
             try {
-                cmder.streamByeCmd(device, channel.getDeviceId(), sendRtpItem.getStream(), null);
+                cmder.streamByeCmd(device, channel.getDeviceId(), sendRtpInfo.getStream(), null);
             } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException  e) {
                 log.info("[语音对讲] 停止消息发送失败,可能已经停止");
             }
         }
-        redisCatchStorage.deleteSendRTPServer(device.getDeviceId(), channel.getDeviceId(),null, null);
+        sendRtpServerService.deleteByChannel(channel.getId(), device.getDeviceId());
     }
 
     @Override

+ 3 - 4
src/main/java/com/genersoft/iot/vmp/gb28181/task/SipRunner.java

@@ -4,15 +4,14 @@ import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
 import com.genersoft.iot.vmp.gb28181.bean.Device;
 import com.genersoft.iot.vmp.gb28181.bean.Platform;
 import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
+import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
 import com.genersoft.iot.vmp.gb28181.service.IGbChannelService;
+import com.genersoft.iot.vmp.gb28181.service.IPlatformService;
 import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
 import com.genersoft.iot.vmp.media.bean.MediaServer;
 import com.genersoft.iot.vmp.media.service.IMediaServerService;
-import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
-import com.genersoft.iot.vmp.gb28181.service.IPlatformService;
 import com.genersoft.iot.vmp.service.ISendRtpServerService;
-import com.genersoft.iot.vmp.service.impl.SendRtpServerServiceImpl;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -109,7 +108,7 @@ public class SipRunner implements CommandLineRunner {
                     ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
                     boolean stopResult = mediaServerService.initStopSendRtp(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
                     if (stopResult) {
-                        Platform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
+                        Platform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getTargetId());
 
                         if (platform != null) {
                             try {

+ 6 - 7
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java

@@ -10,7 +10,6 @@ import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
 import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
 import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
-import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
 import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
 import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
 import com.genersoft.iot.vmp.media.bean.MediaInfo;
@@ -118,14 +117,14 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
 		// 收流端发送的停止
 		if (sendRtpItem != null){
 			CommonGBChannel channel = channelService.getOne(sendRtpItem.getChannelId());
-			log.info("[收到bye] 来自{},停止通道:{}, 类型: {}, callId: {}", sendRtpItem.getPlatformId(), channel.getGbDeviceId(), sendRtpItem.getPlayType(), callIdHeader.getCallId());
+			log.info("[收到bye] 来自{},停止通道:{}, 类型: {}, callId: {}", sendRtpItem.getTargetId(), channel.getGbDeviceId(), sendRtpItem.getPlayType(), callIdHeader.getCallId());
 
 			String streamId = sendRtpItem.getStream();
 			log.info("[收到bye] 停止推流:{}, 媒体节点: {}", streamId, sendRtpItem.getMediaServerId());
 
 			if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
 				// 不是本平台的就发送redis消息让其他wvp停止发流
-				Platform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
+				Platform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getTargetId());
 				if (platform != null) {
 					redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, platform, channel);
 					if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) {
@@ -140,7 +139,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
 						}
 					}
 				}else {
-					log.info("[上级平台停止观看] 未找到平台{}的信息,发送redis消息失败", sendRtpItem.getPlatformId());
+					log.info("[上级平台停止观看] 未找到平台{}的信息,发送redis消息失败", sendRtpItem.getTargetId());
 				}
 			}else {
 				MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
@@ -155,7 +154,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
 				AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getChannelId());
 				if (audioBroadcastCatch != null && audioBroadcastCatch.getSipTransactionInfo().getCallId().equals(callIdHeader.getCallId())) {
 					// 来自上级平台的停止对讲
-					log.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+					log.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getTargetId(), sendRtpItem.getChannelId());
 					audioBroadcastManager.del(sendRtpItem.getChannelId());
 				}
 
@@ -164,7 +163,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
 				if (mediaInfo.getReaderCount() <= 0) {
 					log.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId);
 					if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) {
-						Device device = deviceService.getDeviceByDeviceId(sendRtpItem.getDeviceId());
+						Device device = deviceService.getDeviceByDeviceId(sendRtpItem.getTargetId());
 						if (device == null) {
 							log.info("[收到bye] {} 通知设备停止推流时未找到设备信息", streamId);
 							return;
@@ -175,7 +174,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
 							return;
 						}
 						try {
-							log.info("[停止点播] {}/{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+							log.info("[停止点播] {}/{}", sendRtpItem.getTargetId(), sendRtpItem.getChannelId());
 							cmder.streamByeCmd(device, deviceChannel.getDeviceId(), streamId, null);
 						} catch (InvalidArgumentException | ParseException | SipException |
 								 SsrcTransactionNotFoundException e) {

+ 0 - 2
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java

@@ -25,7 +25,6 @@ import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
 import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
-import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyService;
 import gov.nist.javax.sdp.TimeDescriptionImpl;
 import gov.nist.javax.sdp.fields.TimeField;
 import gov.nist.javax.sdp.fields.URIField;
@@ -893,7 +892,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
 
                 sendRtpItem.setPlayType(InviteStreamType.BROADCAST);
                 sendRtpItem.setCallId(callIdHeader.getCallId());
-                sendRtpItem.setPlatformId(requesterId);
                 sendRtpItem.setStatus(1);
                 sendRtpItem.setApp(broadcastCatch.getApp());
                 sendRtpItem.setStream(broadcastCatch.getStream());

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java

@@ -151,7 +151,7 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp
                                 broadcastCatch.setMediaServerItem(hookData.getMediaServer());
                                 audioBroadcastManager.update(broadcastCatch);
                                 // 推流到设备
-                                SendRtpInfo sendRtpItem = sendRtpServerService.queryByStream(null, targetId, hookData.getStream(), null);
+                                SendRtpInfo sendRtpItem = sendRtpServerService.queryByStream(hookData.getStream(), targetId);
                                 if (sendRtpItem == null) {
                                     log.warn("[国标级联] 语音喊话 异常,未找到发流信息, channelId: {}, stream: {}", targetId, hookData.getStream());
                                     log.info("[国标级联] 语音喊话 重新开始,channelId: {}, stream: {}", targetId, hookData.getStream());

+ 3 - 3
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java

@@ -114,11 +114,11 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i
                 Hook hook = Hook.getInstance(HookType.on_media_arrival, "rtp", ssrcTransaction.getStream(), ssrcTransaction.getMediaServerId());
                 subscribe.removeSubscribe(hook);
                 // 如果级联播放,需要给上级发送此通知 TODO 多个上级同时观看一个下级 可能存在停错的问题,需要将点播CallId进行上下级绑定
-                SendRtpInfo sendRtpItem =  sendRtpServerService.queryByChannelId(ssrcTransaction.getChannelId());
+                SendRtpInfo sendRtpItem =  sendRtpServerService.queryByChannelId(ssrcTransaction.getChannelId(), ssrcTransaction.getPlatformId());
                 if (sendRtpItem != null) {
-                    Platform parentPlatform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
+                    Platform parentPlatform = platformService.queryPlatformByServerGBId(sendRtpItem.getTargetId());
                     if (parentPlatform == null) {
-                        log.warn("[级联消息发送]:发送MediaStatus发现上级平台{}不存在", sendRtpItem.getPlatformId());
+                        log.warn("[级联消息发送]:发送MediaStatus发现上级平台{}不存在", sendRtpItem.getTargetId());
                         return;
                     }
                     try {

+ 16 - 8
src/main/java/com/genersoft/iot/vmp/service/ISendRtpServerService.java

@@ -7,29 +7,37 @@ import java.util.List;
 
 public interface ISendRtpServerService {
 
-    SendRtpInfo createSendRtpInfo(MediaServer mediaServer, String ip, int port, String ssrc, String requesterId,
-                                  String deviceId, Integer channelId, boolean isTcp, boolean rtcp);
+    SendRtpInfo createSendRtpInfo(MediaServer mediaServer, String ip, Integer port, String ssrc, String requesterId,
+                                  String deviceId, Integer channelId, Boolean isTcp, Boolean rtcp);
 
-    SendRtpInfo createSendRtpInfo(MediaServer mediaServer, String ip, int port, String ssrc, String platformId,
-                                  String app, String stream, Integer channelId, boolean tcp, boolean rtcp);
+    SendRtpInfo createSendRtpInfo(MediaServer mediaServer, String ip, Integer port, String ssrc, String platformId,
+                                  String app, String stream, Integer channelId, Boolean tcp, Boolean rtcp);
 
     void update(SendRtpInfo sendRtpItem);
 
-    SendRtpInfo queryByChannelId(Integer channelId);
+    SendRtpInfo queryByChannelId(Integer channelId, String targetId);
 
     SendRtpInfo queryByCallId(String callId);
 
-    SendRtpInfo queryByStream(String stream);
+    List<SendRtpInfo> queryByStream(String stream);
+
+    SendRtpInfo queryByStream(String stream, String targetId);
 
     void delete(SendRtpInfo sendRtpInfo);
 
     void deleteByCallId(String callId);
 
-    void deleteByStream(String Stream);
+    void deleteByStream(String Stream, String targetId);
 
-    void deleteByChannel(Integer channelId);
+    void deleteByChannel(Integer channelId, String targetId);
 
     List<SendRtpInfo> queryAll();
 
     boolean isChannelSendingRTP(Integer channelId);
+
+    List<SendRtpInfo> queryForPlatform(String platformId);
+
+    List<SendRtpInfo> queryByChannelId(int id);
+
+    void deleteByStream(String stream);
 }

+ 10 - 7
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java

@@ -15,6 +15,7 @@ import com.genersoft.iot.vmp.media.bean.MediaServer;
 import com.genersoft.iot.vmp.media.bean.ResultForOnPublish;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
 import com.genersoft.iot.vmp.service.IMediaService;
+import com.genersoft.iot.vmp.service.ISendRtpServerService;
 import com.genersoft.iot.vmp.service.IUserService;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
@@ -78,6 +79,9 @@ public class MediaServiceImpl implements IMediaService {
     @Autowired
     private ISIPCommander commander;
 
+    @Autowired
+    private ISendRtpServerService sendRtpServerService;
+
     @Override
     public boolean authenticatePlay(String app, String stream, String callId) {
         if (app == null || stream == null) {
@@ -234,11 +238,11 @@ public class MediaServiceImpl implements IMediaService {
                     return false;
                 }
                 // 收到无人观看说明流也没有在往上级推送
-                if (redisCatchStorage.isChannelSendingRTP(deviceChannel.getDeviceId())) {
-                    List<SendRtpInfo> sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId(deviceChannel.getDeviceId());
+                if (sendRtpServerService.isChannelSendingRTP(deviceChannel.getId())) {
+                    List<SendRtpInfo> sendRtpItems = sendRtpServerService.queryByChannelId(deviceChannel.getId());
                     if (!sendRtpItems.isEmpty()) {
                         for (SendRtpInfo sendRtpItem : sendRtpItems) {
-                            Platform parentPlatform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
+                            Platform parentPlatform = platformService.queryPlatformByServerGBId(sendRtpItem.getTargetId());
                             CommonGBChannel channel = channelService.getOne(sendRtpItem.getChannelId());
                             if (channel == null) {
                                 continue;
@@ -248,8 +252,7 @@ public class MediaServiceImpl implements IMediaService {
                             } catch (SipException | InvalidArgumentException | ParseException e) {
                                 log.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
                             }
-                            redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), channel.getGbDeviceId(),
-                                    sendRtpItem.getCallId(), sendRtpItem.getStream());
+                            sendRtpServerService.delete(sendRtpItem);
                             if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
                                 redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, parentPlatform, channel);
                                 redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, parentPlatform, channel);
@@ -280,8 +283,8 @@ public class MediaServiceImpl implements IMediaService {
                 deviceChannelService.stopPlay(inviteInfo.getChannelId());
                 return result;
             }
-            SendRtpInfo sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, stream, null);
-            if (sendRtpItem != null && "talk".equals(sendRtpItem.getApp())) {
+            List<SendRtpInfo> sendRtpItemList = sendRtpServerService.queryByStream(stream);
+            if (!sendRtpItemList.isEmpty()) {
                 return false;
             }
         } else if ("talk".equals(app) || "broadcast".equals(app)) {

+ 65 - 17
src/main/java/com/genersoft/iot/vmp/service/impl/SendRtpServerServiceImpl.java

@@ -4,6 +4,7 @@ import com.genersoft.iot.vmp.common.VideoManagerConstants;
 import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.gb28181.bean.PlayException;
 import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
+import com.genersoft.iot.vmp.gb28181.conf.StackLoggerImpl;
 import com.genersoft.iot.vmp.media.bean.MediaServer;
 import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
 import com.genersoft.iot.vmp.service.ISendRtpServerService;
@@ -29,10 +30,12 @@ public class SendRtpServerServiceImpl implements ISendRtpServerService {
 
     @Autowired
     private RedisTemplate<Object, Object> redisTemplate;
+    @Autowired
+    private StackLoggerImpl stackLoggerImpl;
 
     @Override
-    public SendRtpInfo createSendRtpInfo(MediaServer mediaServer, String ip, int port, String ssrc, String requesterId,
-                                         String deviceId, Integer channelId, boolean isTcp, boolean rtcp) {
+    public SendRtpInfo createSendRtpInfo(MediaServer mediaServer, String ip, Integer port, String ssrc, String requesterId,
+                                         String deviceId, Integer channelId, Boolean isTcp, Boolean rtcp) {
         int localPort = sendRtpPortManager.getNextPort(mediaServer);
         if (localPort == 0) {
             return null;
@@ -42,8 +45,8 @@ public class SendRtpServerServiceImpl implements ISendRtpServerService {
     }
 
     @Override
-    public SendRtpInfo createSendRtpInfo(MediaServer mediaServer, String ip, int port, String ssrc, String platformId,
-                                         String app, String stream, Integer channelId, boolean tcp, boolean rtcp){
+    public SendRtpInfo createSendRtpInfo(MediaServer mediaServer, String ip, Integer port, String ssrc, String platformId,
+                                         String app, String stream, Integer channelId, Boolean tcp, Boolean rtcp){
 
         int localPort = sendRtpPortManager.getNextPort(mediaServer);
         if (localPort <= 0) {
@@ -56,13 +59,13 @@ public class SendRtpServerServiceImpl implements ISendRtpServerService {
     @Override
     public void update(SendRtpInfo sendRtpItem) {
         redisTemplate.opsForValue().set(VideoManagerConstants.SEND_RTP_INFO_CALLID + sendRtpItem.getCallId(), sendRtpItem);
-        redisTemplate.opsForValue().set(VideoManagerConstants.SEND_RTP_INFO_STREAM + sendRtpItem.getStream(), sendRtpItem);
-        redisTemplate.opsForValue().set(VideoManagerConstants.SEND_RTP_INFO_CHANNEL + sendRtpItem.getChannelId(), sendRtpItem);
+        redisTemplate.opsForValue().set(VideoManagerConstants.SEND_RTP_INFO_STREAM + sendRtpItem.getStream() + ":" + sendRtpItem.getTargetId(), sendRtpItem);
+        redisTemplate.opsForValue().set(VideoManagerConstants.SEND_RTP_INFO_CHANNEL + sendRtpItem.getChannelId() + ":" + sendRtpItem.getTargetId(), sendRtpItem);
     }
 
     @Override
-    public SendRtpInfo queryByChannelId(Integer channelId) {
-        String key = VideoManagerConstants.SEND_RTP_INFO_CHANNEL + channelId;
+    public SendRtpInfo queryByChannelId(Integer channelId, String targetId) {
+        String key = VideoManagerConstants.SEND_RTP_INFO_CHANNEL + channelId + ":" + targetId;
         return JsonUtil.redisJsonToObject(redisTemplate, key, SendRtpInfo.class);
     }
 
@@ -74,10 +77,24 @@ public class SendRtpServerServiceImpl implements ISendRtpServerService {
 
     @Override
     public SendRtpInfo queryByStream(String stream, String targetId) {
-        String key = VideoManagerConstants.SEND_RTP_INFO_STREAM + stream;
+        String key = VideoManagerConstants.SEND_RTP_INFO_STREAM + stream + ":" + targetId;
         return JsonUtil.redisJsonToObject(redisTemplate, key, SendRtpInfo.class);
     }
 
+    @Override
+    public List<SendRtpInfo> queryByStream(String stream) {
+        String key = VideoManagerConstants.SEND_RTP_INFO_STREAM + stream + ":*";
+        List<Object> queryResult = RedisUtil.scan(redisTemplate, key);
+        List<SendRtpInfo> result= new ArrayList<>();
+
+        for (Object o : queryResult) {
+            String keyItem = (String) o;
+            result.add((SendRtpInfo) redisTemplate.opsForValue().get(keyItem));
+        }
+
+        return result;
+    }
+
     /**
      * 删除RTP推送信息缓存
      */
@@ -87,8 +104,8 @@ public class SendRtpServerServiceImpl implements ISendRtpServerService {
             return;
         }
         redisTemplate.delete(VideoManagerConstants.SEND_RTP_INFO_CALLID + sendRtpInfo.getCallId());
-        redisTemplate.delete(VideoManagerConstants.SEND_RTP_INFO_STREAM + sendRtpInfo.getStream());
-        redisTemplate.delete(VideoManagerConstants.SEND_RTP_INFO_CHANNEL + sendRtpInfo.getChannelId());
+        redisTemplate.delete(VideoManagerConstants.SEND_RTP_INFO_STREAM + sendRtpInfo.getStream() + ":" + sendRtpInfo.getTargetId());
+        redisTemplate.delete(VideoManagerConstants.SEND_RTP_INFO_CHANNEL + sendRtpInfo.getChannelId() + ":" + sendRtpInfo.getTargetId());
     }
     @Override
     public void deleteByCallId(String callId) {
@@ -99,22 +116,45 @@ public class SendRtpServerServiceImpl implements ISendRtpServerService {
         delete(sendRtpInfo);
     }
     @Override
-    public void deleteByStream(String Stream) {
-        SendRtpInfo sendRtpInfo = queryByStream(Stream);
+    public void deleteByStream(String stream, String targetId) {
+        SendRtpInfo sendRtpInfo = queryByStream(stream, targetId);
         if (sendRtpInfo == null) {
             return;
         }
         delete(sendRtpInfo);
     }
+
     @Override
-    public void deleteByChannel(Integer channelId) {
-        SendRtpInfo sendRtpInfo = queryByChannelId(channelId);
+    public void deleteByStream(String stream) {
+        List<SendRtpInfo> sendRtpInfos = queryByStream(stream);
+        for (SendRtpInfo sendRtpInfo : sendRtpInfos) {
+            delete(sendRtpInfo);
+        }
+    }
+
+    @Override
+    public void deleteByChannel(Integer channelId, String targetId) {
+        SendRtpInfo sendRtpInfo = queryByChannelId(channelId, targetId);
         if (sendRtpInfo == null) {
             return;
         }
         delete(sendRtpInfo);
     }
 
+    @Override
+    public List<SendRtpInfo> queryByChannelId(int channelId) {
+        String key = VideoManagerConstants.SEND_RTP_INFO_CHANNEL + channelId + ":*";
+        List<Object> queryResult = RedisUtil.scan(redisTemplate, key);
+        List<SendRtpInfo> result= new ArrayList<>();
+
+        for (Object o : queryResult) {
+            String keyItem = (String) o;
+            result.add((SendRtpInfo) redisTemplate.opsForValue().get(keyItem));
+        }
+
+        return result;
+    }
+
     @Override
     public List<SendRtpInfo> queryAll() {
         String key = VideoManagerConstants.SEND_RTP_INFO_CALLID
@@ -135,8 +175,16 @@ public class SendRtpServerServiceImpl implements ISendRtpServerService {
      */
     @Override
     public boolean isChannelSendingRTP(Integer channelId) {
-        SendRtpInfo sendRtpInfo = queryByChannelId(channelId);
-        return sendRtpInfo != null;
+        List<SendRtpInfo> sendRtpInfoList = queryByChannelId(channelId);
+        return !sendRtpInfoList.isEmpty();
     }
 
+    @Override
+    public List<SendRtpInfo> queryForPlatform(String platformId) {
+        List<SendRtpInfo> sendRtpInfos = queryAll();
+        if (!sendRtpInfos.isEmpty()) {
+            sendRtpInfos.removeIf(sendRtpInfo -> !sendRtpInfo.isSendToPlatform() || !sendRtpInfo.getTargetId().equals(platformId));
+        }
+        return sendRtpInfos;
+    }
 }

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java

@@ -13,7 +13,7 @@ public interface IRedisRpcService {
 
     WVPResult stopSendRtp(Integer sendRtpChannelId);
 
-    long waitePushStreamOnline(SendRtpInfo sendRtpItem, CommonCallback<String> callback);
+    long waitePushStreamOnline(SendRtpInfo sendRtpItem, CommonCallback<Integer> callback);
 
     void stopWaitePushStreamOnline(SendRtpInfo sendRtpItem);
 

+ 13 - 9
src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java

@@ -13,6 +13,7 @@ import com.genersoft.iot.vmp.media.event.hook.Hook;
 import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
 import com.genersoft.iot.vmp.media.event.hook.HookType;
 import com.genersoft.iot.vmp.media.service.IMediaServerService;
+import com.genersoft.iot.vmp.service.ISendRtpServerService;
 import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
 import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
 import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
@@ -25,6 +26,7 @@ import org.springframework.stereotype.Service;
 @Service
 public class RedisRpcServiceImpl implements IRedisRpcService {
 
+
     @Autowired
     private RedisRpcConfig redisRpcConfig;
 
@@ -40,10 +42,12 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
     @Autowired
     private RedisTemplate<Object, Object> redisTemplate;
 
-
     @Autowired
     private IMediaServerService mediaServerService;
 
+    @Autowired
+    private ISendRtpServerService sendRtpServerService;
+
     private RedisRpcRequest buildRequest(String uri, Object param) {
         RedisRpcRequest request = new RedisRpcRequest();
         request.setFromId(userSetting.getServerId());
@@ -53,7 +57,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
     }
 
     @Override
-    public SendRtpInfo getSendRtpItem(String sendRtpItemKey) {
+    public SendRtpInfo getSendRtpItem(Integer sendRtpItemKey) {
         RedisRpcRequest request = buildRequest("getSendRtpItem", sendRtpItemKey);
         RedisRpcResponse response = redisRpcConfig.request(request, 10);
         if (response.getBody() == null) {
@@ -63,7 +67,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
     }
 
     @Override
-    public WVPResult startSendRtp(String sendRtpItemKey, SendRtpInfo sendRtpItem) {
+    public WVPResult startSendRtp(Integer sendRtpItemKey, SendRtpInfo sendRtpItem) {
         log.info("[请求其他WVP] 开始推流,wvp:{}, {}/{}", sendRtpItem.getServerId(), sendRtpItem.getApp(), sendRtpItem.getStream());
         RedisRpcRequest request = buildRequest("startSendRtp", sendRtpItemKey);
         request.setToId(sendRtpItem.getServerId());
@@ -72,7 +76,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
     }
 
     @Override
-    public WVPResult stopSendRtp(String sendRtpItemKey) {
+    public WVPResult stopSendRtp(Integer sendRtpItemKey) {
         SendRtpInfo sendRtpItem = (SendRtpInfo)redisTemplate.opsForValue().get(sendRtpItemKey);
         if (sendRtpItem == null) {
             log.info("[请求其他WVP] 停止推流, 未找到redis中的发流信息, key:{}", sendRtpItemKey);
@@ -86,7 +90,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
     }
 
     @Override
-    public long waitePushStreamOnline(SendRtpInfo sendRtpItem, CommonCallback<String> callback) {
+    public long waitePushStreamOnline(SendRtpInfo sendRtpItem, CommonCallback<Integer> callback) {
         log.info("[请求所有WVP监听流上线] {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
         // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者
         Hook hook = Hook.getInstance(HookType.on_media_arrival, sendRtpItem.getApp(), sendRtpItem.getStream(), null);
@@ -103,9 +107,9 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
             sendRtpItem.setMediaServerId(hookData.getMediaServer().getId());
             sendRtpItem.setLocalIp(hookData.getMediaServer().getSdpIp());
             sendRtpItem.setServerId(userSetting.getServerId());
-            redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem);
+            sendRtpServerService.update(sendRtpItem);
             if (callback != null) {
-                callback.run(sendRtpItem.getRedisKey());
+                callback.run(sendRtpItem.getChannelId());
             }
             hookSubscribe.removeSubscribe(hook);
             redisRpcConfig.removeCallback(request.getSn());
@@ -119,7 +123,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
             log.info("[请求所有WVP监听流上线] 流上线 {}/{}->{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.toString());
 
             if (callback != null) {
-                callback.run(response.getBody().toString());
+                callback.run(Integer.parseInt(response.getBody().toString()));
             }
             hookSubscribe.removeSubscribe(hook);
         });
@@ -137,7 +141,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
     }
 
     @Override
-    public void rtpSendStopped(String sendRtpItemKey) {
+    public void rtpSendStopped(Integer sendRtpItemKey) {
         SendRtpInfo sendRtpItem = (SendRtpInfo)redisTemplate.opsForValue().get(sendRtpItemKey);
         if (sendRtpItem == null) {
             log.info("[停止WVP监听流上线] 未找到redis中的发流信息, key:{}", sendRtpItemKey);

+ 3 - 3
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java

@@ -511,7 +511,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
 
         MessageForPushChannel msg = MessageForPushChannel.getInstance(0,
                 sendRtpItem.getApp(), sendRtpItem.getStream(), channel.getGbDeviceId(),
-                sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
+                sendRtpItem.getTargetId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
         msg.setPlatFormIndex(platform.getId());
 
         String key = VideoManagerConstants.VM_MSG_STREAM_STOP_PLAY_NOTIFY;
@@ -562,14 +562,14 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
     @Override
     public void sendStartSendRtp(SendRtpInfo sendRtpItem) {
         String key = VideoManagerConstants.START_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream();
-        log.info("[redis发送通知] 通知其他WVP推流 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getPlatformId());
+        log.info("[redis发送通知] 通知其他WVP推流 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getTargetId());
         redisTemplate.convertAndSend(key, JSON.toJSON(sendRtpItem));
     }
 
     @Override
     public void sendPushStreamOnline(SendRtpInfo sendRtpItem) {
         String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED;
-        log.info("[redis发送通知] 流上线 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getPlatformId());
+        log.info("[redis发送通知] 流上线 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getTargetId());
         redisTemplate.convertAndSend(key, JSON.toJSON(sendRtpItem));
     }
 }

+ 3 - 7
src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java

@@ -16,6 +16,7 @@ import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOnlineEvent;
 import com.genersoft.iot.vmp.media.service.IMediaServerService;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
 import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType;
+import com.genersoft.iot.vmp.service.ISendRtpServerService;
 import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.streamPush.bean.StreamPush;
@@ -29,10 +30,8 @@ import com.github.pagehelper.PageInfo;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.event.EventListener;
-import org.springframework.jdbc.datasource.DataSourceTransactionManager;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
-import org.springframework.transaction.TransactionDefinition;
 import org.springframework.transaction.annotation.Transactional;
 import org.springframework.util.Assert;
 import org.springframework.util.ObjectUtils;
@@ -57,10 +56,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
     private IMediaServerService mediaServerService;
 
     @Autowired
-    DataSourceTransactionManager dataSourceTransactionManager;
-
-    @Autowired
-    TransactionDefinition transactionDefinition;
+    private ISendRtpServerService sendRtpServerService;
 
     @Autowired
     private IGbChannelService gbChannelService;
@@ -290,7 +286,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
         if (userSetting.isUsePushingAsStatus()) {
             gbChannelService.offline(streamPush.buildCommonGBChannel());
         }
-        redisCatchStorage.deleteSendRTPServer(null, streamPush.getGbDeviceId(), null, streamPush.getStream());
+        sendRtpServerService.deleteByStream(streamPush.getStream());
         mediaServerService.stopSendRtp(mediaServer, streamPush.getApp(), streamPush.getStream(), null);
         streamPush.setUpdateTime(DateUtil.getNow());
         streamPushMapper.update(streamPush);