Quellcode durchsuchen

Merge branch 'develop-add-api-key' of https://github.com/ancienter/wvp-GB28181-pro into develop-add-api-key

leesam vor 1 Jahr
Ursprung
Commit
b6d70c6807
23 geänderte Dateien mit 228 neuen und 165 gelöschten Zeilen
  1. 2 8
      README.md
  2. 11 0
      src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java
  3. 29 0
      src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
  4. 1 1
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
  5. 27 13
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
  6. 36 22
      src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
  7. 1 0
      src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java
  8. 1 0
      src/main/java/com/genersoft/iot/vmp/service/IPlayService.java
  9. 5 0
      src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java
  10. 23 5
      src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
  11. 0 4
      src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
  12. 0 23
      src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java
  13. 47 51
      src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java
  14. 5 0
      src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
  15. 3 2
      src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java
  16. 13 0
      src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
  17. 0 5
      src/main/java/com/genersoft/iot/vmp/vmanager/cloudRecord/CloudRecordController.java
  18. 5 26
      src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java
  19. 9 3
      web_src/src/components/dialog/platformEdit.vue
  20. 1 0
      数据库/2.7.0/初始化-mysql-2.7.0.sql
  21. 1 0
      数据库/2.7.0/初始化-postgresql-kingbase-2.7.0.sql
  22. 4 1
      数据库/2.7.0/更新-mysql-2.7.0.sql
  23. 4 1
      数据库/2.7.0/更新-postgresql-kingbase-2.7.0.sql

+ 2 - 8
README.md

@@ -136,12 +136,6 @@ https://gitee.com/pan648540858/wvp-GB28181-pro.git
 [ydpd](https://github.com/ydpd) [szy833](https://github.com/szy833) [ydwxb](https://github.com/ydwxb) [Albertzhu666](https://github.com/Albertzhu666)
 [mk1990](https://github.com/mk1990) [SaltFish001](https://github.com/SaltFish001)
 
+同时感谢JetBrains对开源项目的支持,本项目使用IntelliJ IDEA开发与调试:
 
-ffmpeg -re -i 123.mp3 -acodec pcm_alaw -ar 8000 -ac 1  -f rtsp rtsp://192.168.1.3:30554/broadcast/34020000001320000101_34020000001310000001
-
-ffmpeg -re -i 123.mp3 -acodec pcm_alaw -ar 8000 -ac 1  -f rtsp rtsp://192.168.1.3:30554/talk/34020000001320000011_34020000001370000001
-
-
-
-ffmpeg -re -i 123.mp3 -acodec pcm_alaw -ar 8000 -ac 1  -f rtsp rtsp://192.168.1.3:30554/talk/34020000001320000101_34020000001310000001
-
+![JetBrains](https://resources.jetbrains.com/storage/products/company/brand/logos/IntelliJ_IDEA_icon.svg?_ga=2.143694769.529214288.1712023294-439039083.1711422571&_gl=1*102dv9n*_ga*NDM5MDM5MDgzLjE3MTE0MjI1NzE.*_ga_9J976DJZ68*MTcxMjEyNjg4NC45LjEuMTcxMjEyNzc2My4zMy4wLjA.)

+ 11 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java

@@ -189,6 +189,9 @@ public class ParentPlatform {
     @Schema(description = "是否作为消息通道")
     private boolean autoPushChannel;
 
+    @Schema(description = "点播回复200OK使用次IP")
+    private String sendStreamIp;
+
     public Integer getId() {
         return id;
     }
@@ -436,4 +439,12 @@ public class ParentPlatform {
     public void setAutoPushChannel(boolean autoPushChannel) {
         this.autoPushChannel = autoPushChannel;
     }
+
+    public String getSendStreamIp() {
+        return sendStreamIp;
+    }
+
+    public void setSendStreamIp(String sendStreamIp) {
+        this.sendStreamIp = sendStreamIp;
+    }
 }

+ 29 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java

@@ -305,4 +305,33 @@ public class SendRtpItem {
     public void setReceiveStream(String receiveStream) {
         this.receiveStream = receiveStream;
     }
+
+    @Override
+    public String toString() {
+        return "SendRtpItem{" +
+                "ip='" + ip + '\'' +
+                ", port=" + port +
+                ", ssrc='" + ssrc + '\'' +
+                ", platformId='" + platformId + '\'' +
+                ", deviceId='" + deviceId + '\'' +
+                ", app='" + app + '\'' +
+                ", channelId='" + channelId + '\'' +
+                ", status=" + status +
+                ", stream='" + stream + '\'' +
+                ", tcp=" + tcp +
+                ", tcpActive=" + tcpActive +
+                ", localPort=" + localPort +
+                ", mediaServerId='" + mediaServerId + '\'' +
+                ", serverId='" + serverId + '\'' +
+                ", CallId='" + CallId + '\'' +
+                ", fromTag='" + fromTag + '\'' +
+                ", toTag='" + toTag + '\'' +
+                ", pt=" + pt +
+                ", usePs=" + usePs +
+                ", onlyAudio=" + onlyAudio +
+                ", rtcp=" + rtcp +
+                ", playType=" + playType +
+                ", receiveStream='" + receiveStream + '\'' +
+                '}';
+    }
 }

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java

@@ -116,7 +116,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
 
 		if (parentPlatform != null) {
 			Map<String, Object> param = getSendRtpParam(sendRtpItem);
-			if (mediaInfo == null) {
+			if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) {
 				RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
 						sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(),
 						sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(),

+ 27 - 13
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java

@@ -38,6 +38,7 @@ import gov.nist.javax.sdp.fields.TimeField;
 import gov.nist.javax.sdp.fields.URIField;
 import gov.nist.javax.sip.message.SIPRequest;
 import gov.nist.javax.sip.message.SIPResponse;
+import org.apache.commons.lang3.ObjectUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -404,12 +405,15 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                         //     * 2 推流中
                         sendRtpItem.setStatus(1);
                         redisCatchStorage.updateSendRTPSever(sendRtpItem);
-
+                        String sdpIp = mediaServerItemInUSe.getSdpIp();
+                        if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) {
+                            sdpIp = platform.getSendStreamIp();
+                        }
                         StringBuffer content = new StringBuffer(200);
                         content.append("v=0\r\n");
-                        content.append("o=" + channelId + " 0 0 IN IP4 " + mediaServerItemInUSe.getSdpIp() + "\r\n");
+                        content.append("o=" + channelId + " 0 0 IN IP4 " + sdpIp + "\r\n");
                         content.append("s=" + sessionName + "\r\n");
-                        content.append("c=IN IP4 " + mediaServerItemInUSe.getSdpIp() + "\r\n");
+                        content.append("c=IN IP4 " + sdpIp + "\r\n");
                         if ("Playback".equalsIgnoreCase(sessionName)) {
                             content.append("t=" + finalStartTime + " " + finalStopTime + "\r\n");
                         } else {
@@ -575,14 +579,20 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                     }
 
                     if ("push".equals(gbStream.getStreamType())) {
-                        if (streamPushItem != null && streamPushItem.isPushIng()) {
-                            // 推流状态
-                            pushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
-                                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
-                        } else {
-                            // 未推流 拉起
-                            notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
-                                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+                        if (streamPushItem != null) {
+                            // 从redis查询是否正在接收这个推流
+                            OnStreamChangedHookParam pushListItem = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream());
+                            if (pushListItem != null) {
+                                StreamPushItem transform = streamPushService.transform(pushListItem);
+                                transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId()));
+                                // 推流状态
+                                pushStream(evt, request, gbStream, transform, platform, callIdHeader, mediaServerItem, port, tcpActive,
+                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+                            }else {
+                                // 未推流 拉起
+                                notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
+                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+                            }
                         }
                     } else if ("proxy".equals(gbStream.getStreamType())) {
                         if (null != proxyByAppAndStream) {
@@ -901,11 +911,15 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
 
     public SIPResponse sendStreamAck(MediaServerItem mediaServerItem, SIPRequest request, SendRtpItem sendRtpItem, ParentPlatform platform, RequestEvent evt) {
 
+        String sdpIp = mediaServerItem.getSdpIp();
+        if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) {
+            sdpIp = platform.getSendStreamIp();
+        }
         StringBuffer content = new StringBuffer(200);
         content.append("v=0\r\n");
-        content.append("o=" + sendRtpItem.getChannelId() + " 0 0 IN IP4 " + mediaServerItem.getSdpIp() + "\r\n");
+        content.append("o=" + sendRtpItem.getChannelId() + " 0 0 IN IP4 " + sdpIp + "\r\n");
         content.append("s=Play\r\n");
-        content.append("c=IN IP4 " + mediaServerItem.getSdpIp() + "\r\n");
+        content.append("c=IN IP4 " + sdpIp + "\r\n");
         content.append("t=0 0\r\n");
         // 非严格模式端口不统一, 增加兼容性,修改为一个不为0的端口
         int localPort = sendRtpItem.getLocalPort();

+ 36 - 22
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java

@@ -509,32 +509,46 @@ public class ZLMHttpHookListener {
                     List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream());
                     if (!sendRtpItems.isEmpty()) {
                         for (SendRtpItem sendRtpItem : sendRtpItems) {
-                            if (sendRtpItem != null && sendRtpItem.getApp().equals(param.getApp())) {
-                                String platformId = sendRtpItem.getPlatformId();
-                                ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
-                                Device device = deviceService.getDevice(platformId);
-
-                                try {
-                                    if (platform != null) {
-                                        commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
-                                        redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(),
-                                                sendRtpItem.getCallId(), sendRtpItem.getStream());
-                                    } else {
-                                        cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId());
-                                        if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST)
-                                                || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) {
-                                            AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
-                                            if (audioBroadcastCatch != null) {
-                                                // 来自上级平台的停止对讲
-                                                logger.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
-                                                audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+                            if (sendRtpItem == null) {
+                                continue;
+                            }
+
+                            if (sendRtpItem.getApp().equals(param.getApp())) {
+                                logger.info(sendRtpItem.toString());
+                                if (userSetting.getServerId().equals(sendRtpItem.getServerId())) {
+                                    MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
+                                            sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
+                                            sendRtpItem.getPlatformId(), null, userSetting.getServerId(), param.getMediaServerId());
+                                    // 通知其他wvp停止发流
+                                    redisCatchStorage.sendPushStreamClose(messageForPushChannel);
+                                }else {
+                                    String platformId = sendRtpItem.getPlatformId();
+                                    ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
+                                    Device device = deviceService.getDevice(platformId);
+
+                                    try {
+                                        if (platform != null) {
+                                            commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
+                                            redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(),
+                                                    sendRtpItem.getCallId(), sendRtpItem.getStream());
+                                        } else {
+                                            cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId());
+                                            if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST)
+                                                    || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) {
+                                                AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+                                                if (audioBroadcastCatch != null) {
+                                                    // 来自上级平台的停止对讲
+                                                    logger.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+                                                    audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+                                                }
                                             }
                                         }
+                                    } catch (SipException | InvalidArgumentException | ParseException |
+                                             SsrcTransactionNotFoundException e) {
+                                        logger.error("[命令发送失败] 发送BYE: {}", e.getMessage());
                                     }
-                                } catch (SipException | InvalidArgumentException | ParseException |
-                                         SsrcTransactionNotFoundException e) {
-                                    logger.error("[命令发送失败] 发送BYE: {}", e.getMessage());
                                 }
+
                             }
                         }
                     }

+ 1 - 0
src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java

@@ -98,4 +98,5 @@ public interface IDeviceChannelService {
 
     void updateChannelGPS(Device device, DeviceChannel deviceChannel, MobilePosition mobilePosition);
 
+    void stopPlay(String deviceId, String channelId);
 }

+ 1 - 0
src/main/java/com/genersoft/iot/vmp/service/IPlayService.java

@@ -68,4 +68,5 @@ public interface IPlayService {
 
     void getSnap(String deviceId, String channelId, String fileName, ErrorCallback errorCallback);
 
+    void stopPlay(Device device, String channelId);
 }

+ 5 - 0
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java

@@ -353,4 +353,9 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
             redisCatchStorage.sendMobilePositionMsg(jsonObject);
         }
     }
+
+    @Override
+    public void stopPlay(String deviceId, String channelId) {
+        channelMapper.stopPlay(deviceId, channelId);
+    }
 }

+ 23 - 5
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java

@@ -34,7 +34,6 @@ import com.genersoft.iot.vmp.service.bean.*;
 import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
-import com.genersoft.iot.vmp.storager.dao.CloudRecordServiceMapper;
 import com.genersoft.iot.vmp.utils.CloudRecordUtils;
 import com.genersoft.iot.vmp.utils.DateUtil;
 import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult;
@@ -123,9 +122,6 @@ public class PlayServiceImpl implements IPlayService {
     @Autowired
     private DynamicTask dynamicTask;
 
-    @Autowired
-    private CloudRecordServiceMapper cloudRecordServiceMapper;
-
     @Autowired
     private ISIPCommanderForPlatform commanderForPlatform;
 
@@ -1170,7 +1166,7 @@ public class PlayServiceImpl implements IPlayService {
             dynamicTask.startDelay(key, ()->{
                 logger.info("[语音广播]等待invite消息超时:{}/{}", device.getDeviceId(), channelId);
                 stopAudioBroadcast(device.getDeviceId(), channelId);
-            }, 2000);
+            }, 10*1000);
         }, eventResultForError -> {
             // 发送失败
             logger.error("语音广播发送失败: {}:{}", channelId, eventResultForError.msg);
@@ -1592,4 +1588,26 @@ public class PlayServiceImpl implements IPlayService {
         });
     }
 
+    @Override
+    public void stopPlay(Device device, String channelId) {
+        InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
+        if (inviteInfo == null) {
+            throw new ControllerException(ErrorCode.ERROR100.getCode(), "点播未找到");
+        }
+        if (InviteSessionStatus.ok == inviteInfo.getStatus()) {
+            try {
+                logger.info("[停止点播] {}/{}", device.getDeviceId(), channelId);
+                cmder.streamByeCmd(device, channelId, inviteInfo.getStream(), null, null);
+            } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
+                logger.error("[命令发送失败] 停止点播, 发送BYE: {}", e.getMessage());
+                throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage());
+            }
+        }
+        inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
+        storager.stopPlay(device.getDeviceId(), channelId);
+        channelService.stopPlay(device.getDeviceId(), channelId);
+        if (inviteInfo.getStreamInfo() != null) {
+            mediaServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServerId(), inviteInfo.getStream());
+        }
+    }
 }

+ 0 - 4
src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java

@@ -8,7 +8,6 @@ import com.genersoft.iot.vmp.common.StreamInfo;
 import com.genersoft.iot.vmp.conf.DynamicTask;
 import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.conf.exception.ControllerException;
-import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
 import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
 import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
 import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
@@ -25,7 +24,6 @@ import com.genersoft.iot.vmp.service.IStreamProxyService;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
-import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper;
 import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
 import com.genersoft.iot.vmp.storager.dao.StreamProxyMapper;
 import com.genersoft.iot.vmp.utils.DateUtil;
@@ -333,8 +331,6 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
             result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl().trim(),
                     param.isEnableAudio(), param.isEnableMp4(), param.getRtpType());
         }
-        System.out.println("addStreamProxyToZlm====");
-        System.out.println(result);
         if (result != null && result.getInteger("code") == 0) {
             JSONObject data = result.getJSONObject("data");
             if (data == null) {

+ 0 - 23
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java

@@ -2,12 +2,10 @@ package com.genersoft.iot.vmp.service.redisMsg;
 
 import com.alibaba.fastjson2.JSON;
 import com.genersoft.iot.vmp.conf.UserSetting;
-import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType;
 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
 import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
 import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
-import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
 import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.IStreamPushService;
@@ -25,7 +23,6 @@ import org.springframework.stereotype.Component;
 import javax.sip.InvalidArgumentException;
 import javax.sip.SipException;
 import java.text.ParseException;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -86,26 +83,6 @@ public class RedisPushStreamCloseResponseListener implements MessageListener {
                             logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
                         }
                     }
-                    if (push.isSelf()) {
-                        // 停止向上级推流
-                        String streamId = sendRtpItem.getStream();
-                        Map<String, Object> param = new HashMap<>();
-                        param.put("vhost","__defaultVhost__");
-                        param.put("app",sendRtpItem.getApp());
-                        param.put("stream",streamId);
-                        param.put("ssrc",sendRtpItem.getSsrc());
-                        logger.info("[REDIS消息-推流结束] 停止向上级推流:{}", streamId);
-                        MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
-                        redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStream());
-                        zlmServerFactory.stopSendRtpStream(mediaInfo, param);
-                        if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
-                            MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
-                                    sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
-                                    sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
-                            messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
-                            redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
-                        }
-                    }
                 }
             }
         }

+ 47 - 51
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java

@@ -1,11 +1,7 @@
 package com.genersoft.iot.vmp.service.redisMsg;
 
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONObject;
 import com.genersoft.iot.vmp.conf.UserSetting;
-
 import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
-import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -41,52 +37,52 @@ public class RedisStreamMsgListener implements MessageListener {
 
     @Override
     public void onMessage(Message message, byte[] bytes) {
-        boolean isEmpty = taskQueue.isEmpty();
-        taskQueue.offer(message);
-        if (isEmpty) {
-            taskExecutor.execute(() -> {
-                while (!taskQueue.isEmpty()) {
-                    Message msg = taskQueue.poll();
-                    try {
-                        JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class);
-                        if (steamMsgJson == null) {
-                            logger.warn("[收到redis 流变化]消息解析失败");
-                            continue;
-                        }
-                        String serverId = steamMsgJson.getString("serverId");
-
-                        if (userSetting.getServerId().equals(serverId)) {
-                            // 自己发送的消息忽略即可
-                            continue;
-                        }
-                        logger.info("[收到redis 流变化]: {}", new String(message.getBody()));
-                        String app = steamMsgJson.getString("app");
-                        String stream = steamMsgJson.getString("stream");
-                        boolean register = steamMsgJson.getBoolean("register");
-                        String mediaServerId = steamMsgJson.getString("mediaServerId");
-                        OnStreamChangedHookParam onStreamChangedHookParam = new OnStreamChangedHookParam();
-                        onStreamChangedHookParam.setSeverId(serverId);
-                        onStreamChangedHookParam.setApp(app);
-                        onStreamChangedHookParam.setStream(stream);
-                        onStreamChangedHookParam.setRegist(register);
-                        onStreamChangedHookParam.setMediaServerId(mediaServerId);
-                        onStreamChangedHookParam.setCreateStamp(System.currentTimeMillis()/1000);
-                        onStreamChangedHookParam.setAliveSecond(0L);
-                        onStreamChangedHookParam.setTotalReaderCount("0");
-                        onStreamChangedHookParam.setOriginType(0);
-                        onStreamChangedHookParam.setOriginTypeStr("0");
-                        onStreamChangedHookParam.setOriginTypeStr("unknown");
-                        if (register) {
-                            zlmMediaListManager.addPush(onStreamChangedHookParam);
-                        }else {
-                            zlmMediaListManager.removeMedia(app, stream);
-                        }
-                    }catch (Exception e) {
-                        logger.warn("[REDIS消息-流变化] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
-                        logger.error("[REDIS消息-流变化] 异常内容: ", e);
-                    }
-                }
-            });
-        }
+//        boolean isEmpty = taskQueue.isEmpty();
+//        taskQueue.offer(message);
+//        if (isEmpty) {
+//            taskExecutor.execute(() -> {
+//                while (!taskQueue.isEmpty()) {
+//                    Message msg = taskQueue.poll();
+//                    try {
+//                        JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class);
+//                        if (steamMsgJson == null) {
+//                            logger.warn("[收到redis 流变化]消息解析失败");
+//                            continue;
+//                        }
+//                        String serverId = steamMsgJson.getString("serverId");
+//
+//                        if (userSetting.getServerId().equals(serverId)) {
+//                            // 自己发送的消息忽略即可
+//                            continue;
+//                        }
+//                        logger.info("[收到redis 流变化]: {}", new String(message.getBody()));
+//                        String app = steamMsgJson.getString("app");
+//                        String stream = steamMsgJson.getString("stream");
+//                        boolean register = steamMsgJson.getBoolean("register");
+//                        String mediaServerId = steamMsgJson.getString("mediaServerId");
+//                        OnStreamChangedHookParam onStreamChangedHookParam = new OnStreamChangedHookParam();
+//                        onStreamChangedHookParam.setSeverId(serverId);
+//                        onStreamChangedHookParam.setApp(app);
+//                        onStreamChangedHookParam.setStream(stream);
+//                        onStreamChangedHookParam.setRegist(register);
+//                        onStreamChangedHookParam.setMediaServerId(mediaServerId);
+//                        onStreamChangedHookParam.setCreateStamp(System.currentTimeMillis()/1000);
+//                        onStreamChangedHookParam.setAliveSecond(0L);
+//                        onStreamChangedHookParam.setTotalReaderCount("0");
+//                        onStreamChangedHookParam.setOriginType(0);
+//                        onStreamChangedHookParam.setOriginTypeStr("0");
+//                        onStreamChangedHookParam.setOriginTypeStr("unknown");
+//                        if (register) {
+//                            zlmMediaListManager.addPush(onStreamChangedHookParam);
+//                        }else {
+//                            zlmMediaListManager.removeMedia(app, stream);
+//                        }
+//                    }catch (Exception e) {
+//                        logger.warn("[REDIS消息-流变化] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
+//                        logger.error("[REDIS消息-流变化] 异常内容: ", e);
+//                    }
+//                }
+//            });
+//        }
     }
 }

+ 5 - 0
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java

@@ -211,5 +211,10 @@ public interface IRedisCatchStorage {
 
     void addPushListItem(String app, String stream, OnStreamChangedHookParam param);
 
+    OnStreamChangedHookParam getPushListItem(String app, String stream);
+
     void removePushListItem(String app, String stream, String mediaServerId);
+
+    void sendPushStreamClose(MessageForPushChannel messageForPushChannel);
+
 }

+ 3 - 2
src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java

@@ -17,10 +17,10 @@ public interface ParentPlatformMapper {
 
     @Insert("INSERT INTO wvp_platform (enable, name, server_gb_id, server_gb_domain, server_ip, server_port,device_gb_id,device_ip,"+
             "device_port,username,password,expires,keep_timeout,transport,character_set,ptz,rtcp,as_message_channel,auto_push_channel,"+
-            "status,start_offline_push,catalog_id,administrative_division,catalog_group,create_time,update_time) " +
+            "status,start_offline_push,catalog_id,administrative_division,catalog_group,create_time,update_time,send_stream_ip) " +
             "            VALUES (#{enable}, #{name}, #{serverGBId}, #{serverGBDomain}, #{serverIP}, #{serverPort}, #{deviceGBId}, #{deviceIp}, " +
             "            #{devicePort}, #{username}, #{password}, #{expires}, #{keepTimeout}, #{transport}, #{characterSet}, #{ptz}, #{rtcp}, #{asMessageChannel}, #{autoPushChannel}, " +
-            "            #{status},  #{startOfflinePush}, #{catalogId}, #{administrativeDivision}, #{catalogGroup}, #{createTime}, #{updateTime})")
+            "            #{status},  #{startOfflinePush}, #{catalogId}, #{administrativeDivision}, #{catalogGroup}, #{createTime}, #{updateTime}, #{sendStreamIp})")
     int addParentPlatform(ParentPlatform parentPlatform);
 
     @Update("UPDATE wvp_platform " +
@@ -49,6 +49,7 @@ public interface ParentPlatformMapper {
             "administrative_division=#{administrativeDivision}, " +
             "create_time=#{createTime}, " +
             "update_time=#{updateTime}, " +
+            "send_stream_ip=#{sendStreamIp}, " +
             "catalog_id=#{catalogId} " +
             "WHERE id=#{id}")
     int updateParentPlatform(ParentPlatform parentPlatform);

+ 13 - 0
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java

@@ -656,6 +656,12 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
         redisTemplate.opsForValue().set(key, param);
     }
 
+    @Override
+    public OnStreamChangedHookParam getPushListItem(String app, String stream) {
+        String key = VideoManagerConstants.PUSH_STREAM_LIST + app + "_" + stream;
+        return (OnStreamChangedHookParam)redisTemplate.opsForValue().get(key);
+    }
+
     @Override
     public void removePushListItem(String app, String stream, String mediaServerId) {
         String key = VideoManagerConstants.PUSH_STREAM_LIST + app + "_" + stream;
@@ -665,4 +671,11 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
         }
 
     }
+
+    @Override
+    public void sendPushStreamClose(MessageForPushChannel msg) {
+        String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED;
+        logger.info("[redis发送通知] 发送 停止向上级推流 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId());
+        redisTemplate.convertAndSend(key, JSON.toJSON(msg));
+    }
 }

+ 0 - 5
src/main/java/com/genersoft/iot/vmp/vmanager/cloudRecord/CloudRecordController.java

@@ -1,12 +1,8 @@
 package com.genersoft.iot.vmp.vmanager.cloudRecord;
 
 import com.alibaba.fastjson2.JSONArray;
-import com.genersoft.iot.vmp.conf.DynamicTask;
-import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.conf.exception.ControllerException;
 import com.genersoft.iot.vmp.conf.security.JwtUtils;
-import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
-import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.service.ICloudRecordService;
 import com.genersoft.iot.vmp.service.IMediaServerService;
@@ -22,7 +18,6 @@ import org.apache.commons.lang3.ObjectUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.web.bind.annotation.*;
 
 import javax.servlet.http.HttpServletRequest;

+ 5 - 26
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java

@@ -3,12 +3,10 @@ package com.genersoft.iot.vmp.vmanager.gb28181.play;
 import com.alibaba.fastjson2.JSONArray;
 import com.alibaba.fastjson2.JSONObject;
 import com.genersoft.iot.vmp.common.InviteInfo;
-import com.genersoft.iot.vmp.common.InviteSessionStatus;
 import com.genersoft.iot.vmp.common.InviteSessionType;
 import com.genersoft.iot.vmp.common.StreamInfo;
 import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.conf.exception.ControllerException;
-import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
 import com.genersoft.iot.vmp.conf.security.JwtUtils;
 import com.genersoft.iot.vmp.gb28181.bean.Device;
 import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
@@ -26,7 +24,7 @@ import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import com.genersoft.iot.vmp.utils.DateUtil;
-import com.genersoft.iot.vmp.vmanager.bean.*;
+import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult;
 import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
 import com.genersoft.iot.vmp.vmanager.bean.StreamContent;
 import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
@@ -41,11 +39,8 @@ import org.springframework.web.bind.annotation.*;
 import org.springframework.web.context.request.async.DeferredResult;
 
 import javax.servlet.http.HttpServletRequest;
-import javax.sip.InvalidArgumentException;
-import javax.sip.SipException;
 import java.net.MalformedURLException;
 import java.net.URL;
-import java.text.ParseException;
 import java.util.List;
 import java.util.UUID;
 
@@ -157,7 +152,8 @@ public class PlayController {
 				wvpResult.setMsg(msg);
 			}
 			requestMessage.setData(wvpResult);
-			resultHolder.invokeResult(requestMessage);
+			// 此处必须释放所有请求
+			resultHolder.invokeAllResult(requestMessage);
 		});
 		return result;
 	}
@@ -165,9 +161,8 @@ public class PlayController {
 	@Operation(summary = "停止点播", security = @SecurityRequirement(name = JwtUtils.HEADER))
 	@Parameter(name = "deviceId", description = "设备国标编号", required = true)
 	@Parameter(name = "channelId", description = "通道国标编号", required = true)
-	@Parameter(name = "isSubStream", description = "是否子码流(true-子码流,false-主码流),默认为false", required = true)
 	@GetMapping("/stop/{deviceId}/{channelId}")
-	public JSONObject playStop(@PathVariable String deviceId, @PathVariable String channelId,boolean isSubStream) {
+	public JSONObject playStop(@PathVariable String deviceId, @PathVariable String channelId) {
 
 		logger.debug(String.format("设备预览/回放停止API调用,streamId:%s_%s", deviceId, channelId ));
 
@@ -180,26 +175,10 @@ public class PlayController {
 			throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备[" + deviceId + "]不存在");
 		}
 
-		InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
-		if (inviteInfo == null) {
-			throw new ControllerException(ErrorCode.ERROR100.getCode(), "点播未找到");
-		}
-		if (InviteSessionStatus.ok == inviteInfo.getStatus()) {
-			try {
-				logger.info("[停止点播] {}/{}", device.getDeviceId(), channelId);
-				cmder.streamByeCmd(device, channelId, inviteInfo.getStream(), null, null);
-			} catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
-				logger.error("[命令发送失败] 停止点播, 发送BYE: {}", e.getMessage());
-				throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage());
-			}
-		}
-		inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
-		storager.stopPlay(deviceId, channelId);
-
+		playService.stopPlay(device, channelId);
 		JSONObject json = new JSONObject();
 		json.put("deviceId", deviceId);
 		json.put("channelId", channelId);
-		json.put("isSubStream", isSubStream);
 		return json;
 	}
 

+ 9 - 3
web_src/src/components/dialog/platformEdit.vue

@@ -37,8 +37,8 @@
               <el-form-item label="本地端口" prop="devicePort">
                 <el-input v-model="platform.devicePort" :disabled="true" type="number"></el-input>
               </el-form-item>
-              <el-form-item label="SIP认证用户名" prop="username">
-                <el-input v-model="platform.username"></el-input>
+              <el-form-item label="SDP发流IP" prop="sendStreamIp">
+                <el-input v-model="platform.sendStreamIp"></el-input>
               </el-form-item>
             </el-form>
           </el-col>
@@ -47,6 +47,9 @@
               <el-form-item label="行政区划" prop="administrativeDivision">
                 <el-input v-model="platform.administrativeDivision" clearable></el-input>
               </el-form-item>
+              <el-form-item label="SIP认证用户名" prop="username">
+                <el-input v-model="platform.username"></el-input>
+              </el-form-item>
               <el-form-item label="SIP认证密码" prop="password">
                 <el-input v-model="platform.password" ></el-input>
               </el-form-item>
@@ -159,7 +162,8 @@ export default {
         characterSet: "GB2312",
         startOfflinePush: false,
         catalogGroup: 1,
-        administrativeDivision: null,
+        administrativeDivision: "",
+        sendStreamIp: null,
       },
       rules: {
         name: [{ required: true, message: "请输入平台名称", trigger: "blur" }],
@@ -198,6 +202,7 @@ export default {
             that.platform.devicePort = res.data.data.devicePort;
             that.platform.username = res.data.data.username;
             that.platform.password = res.data.data.password;
+            that.platform.sendStreamIp = res.data.data.sendStreamIp;
             that.platform.administrativeDivision = res.data.data.username.substr(0, 6);
           }
 
@@ -228,6 +233,7 @@ export default {
         this.platform.catalogId = platform.catalogId;
         this.platform.startOfflinePush = platform.startOfflinePush;
         this.platform.catalogGroup = platform.catalogGroup;
+        this.platform.sendStreamIp = platform.sendStreamIp;
         this.platform.administrativeDivision = platform.administrativeDivision;
         this.onSubmit_text = "保存";
         this.saveUrl = "/api/platform/save";

+ 1 - 0
数据库/2.7.0/初始化-mysql-2.7.0.sql

@@ -198,6 +198,7 @@ create table wvp_platform (
                               update_time character varying(50),
                               as_message_channel bool default false,
                               auto_push_channel bool default false,
+                              send_stream_ip character varying(50),
                               constraint uk_platform_unique_server_gb_id unique (server_gb_id)
 );
 

+ 1 - 0
数据库/2.7.0/初始化-postgresql-kingbase-2.7.0.sql

@@ -198,6 +198,7 @@ create table wvp_platform (
                               update_time character varying(50),
                               as_message_channel bool default false,
                               auto_push_channel bool default false,
+                              send_stream_ip character varying(50),
                               constraint uk_platform_unique_server_gb_id unique (server_gb_id)
 );
 

+ 4 - 1
数据库/2.7.0/更新-mysql-2.7.0.sql

@@ -2,4 +2,7 @@ alter table wvp_device_channel
     add stream_identification character varying(50);
 
 alter table wvp_device
-    drop switch_primary_sub_stream;
+    drop switch_primary_sub_stream;
+
+alter table wvp_platform
+    add send_stream_ip character varying(50);

+ 4 - 1
数据库/2.7.0/更新-postgresql-kingbase-2.7.0.sql

@@ -2,4 +2,7 @@ alter table wvp_device_channel
     add stream_identification character varying(50);
 
 alter table wvp_device
-    drop switch_primary_sub_stream;
+    drop switch_primary_sub_stream;
+
+alter table wvp_platform
+    add send_stream_ip character varying(50);