浏览代码

优化代码调用

648540858 1 年之前
父节点
当前提交
6c0087db88
共有 19 个文件被更改,包括 205 次插入224 次删除
  1. 19 0
      src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
  2. 0 2
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
  3. 2 6
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
  4. 5 24
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
  5. 9 23
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
  6. 16 38
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
  7. 8 11
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java
  8. 6 1
      src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java
  9. 55 2
      src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java
  10. 4 16
      src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java
  11. 12 0
      src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java
  12. 0 4
      src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java
  13. 18 0
      src/main/java/com/genersoft/iot/vmp/service/bean/RequestPushStreamMsg.java
  14. 9 16
      src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
  15. 3 8
      src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
  16. 2 6
      src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
  17. 17 39
      src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
  18. 2 13
      src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java
  19. 18 15
      src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java

+ 19 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java

@@ -1,5 +1,7 @@
 package com.genersoft.iot.vmp.gb28181.bean;
 
+import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
+
 public class SendRtpItem {
 
     /**
@@ -122,6 +124,23 @@ public class SendRtpItem {
      */
     private String receiveStream;
 
+    public static SendRtpItem getInstance(RequestPushStreamMsg requestPushStreamMsg) {
+        SendRtpItem sendRtpItem = new SendRtpItem();
+        sendRtpItem.setMediaServerId(requestPushStreamMsg.getMediaServerId());
+        sendRtpItem.setApp(requestPushStreamMsg.getApp());
+        sendRtpItem.setStream(requestPushStreamMsg.getStream());
+        sendRtpItem.setIp(requestPushStreamMsg.getIp());
+        sendRtpItem.setPort(requestPushStreamMsg.getPort());
+        sendRtpItem.setSsrc(requestPushStreamMsg.getSsrc());
+        sendRtpItem.setTcp(requestPushStreamMsg.isTcp());
+        sendRtpItem.setLocalPort(requestPushStreamMsg.getSrcPort());
+        sendRtpItem.setPt(requestPushStreamMsg.getPt());
+        sendRtpItem.setUsePs(requestPushStreamMsg.isPs());
+        sendRtpItem.setOnlyAudio(requestPushStreamMsg.isOnlyAudio());
+        return sendRtpItem;
+        
+    }
+
     public String getIp() {
         return ip;
     }

+ 0 - 2
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java

@@ -75,8 +75,6 @@ public class SIPCommander implements ISIPCommander {
     @Autowired
     private IMediaServerService mediaServerService;
 
-    @Autowired
-    private ZLMServerFactory zlmServerFactory;
 
 
     /**

+ 2 - 6
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java

@@ -13,11 +13,10 @@ import com.genersoft.iot.vmp.gb28181.transmit.SIPSender;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderPlarformProvider;
 import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
+import com.genersoft.iot.vmp.media.bean.MediaServer;
 import com.genersoft.iot.vmp.media.event.hook.Hook;
 import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
 import com.genersoft.iot.vmp.media.event.hook.HookType;
-import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
-import com.genersoft.iot.vmp.media.bean.MediaServer;
 import com.genersoft.iot.vmp.media.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
@@ -65,9 +64,6 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
     @Autowired
     private SipSubscribe sipSubscribe;
 
-    @Autowired
-    private ZLMServerFactory zlmServerFactory;
-
     @Autowired
     private SipLayer sipLayer;
 
@@ -846,7 +842,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
         MediaServer mediaServerItem = mediaServerService.getOne(mediaServerId);
         if (mediaServerItem != null) {
             mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
-            zlmServerFactory.closeRtpServer(mediaServerItem, sendRtpItem.getStream());
+            mediaServerService.closeRTPServer(mediaServerItem, sendRtpItem.getStream());
         }
         SIPRequest byeRequest = headerProviderPlatformProvider.createByeRequest(platform, sendRtpItem);
         if (byeRequest == null) {

+ 5 - 24
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java

@@ -1,22 +1,17 @@
 package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
 
-import com.alibaba.fastjson2.JSONObject;
 import com.genersoft.iot.vmp.conf.DynamicTask;
 import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.conf.exception.ControllerException;
-import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
-import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch;
 import com.genersoft.iot.vmp.gb28181.bean.Device;
 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
 import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
 import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
 import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
 import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
-import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
-import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
 import com.genersoft.iot.vmp.media.bean.MediaServer;
-import com.genersoft.iot.vmp.service.IDeviceService;
 import com.genersoft.iot.vmp.media.service.IMediaServerService;
+import com.genersoft.iot.vmp.service.IDeviceService;
 import com.genersoft.iot.vmp.service.IPlayService;
 import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
 import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
@@ -28,17 +23,12 @@ import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import javax.sip.InvalidArgumentException;
 import javax.sip.RequestEvent;
-import javax.sip.SipException;
 import javax.sip.address.SipURI;
 import javax.sip.header.CallIdHeader;
 import javax.sip.header.FromHeader;
 import javax.sip.header.HeaderAddress;
 import javax.sip.header.ToHeader;
-import java.text.ParseException;
-import java.util.HashMap;
-import java.util.Map;
 
 /**
  * SIP命令类型: ACK请求
@@ -71,12 +61,6 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
 	@Autowired
 	private IDeviceService deviceService;
 
-	@Autowired
-	private ZLMServerFactory zlmServerFactory;
-
-	@Autowired
-	private HookSubscribe hookSubscribe;
-
 	@Autowired
 	private IMediaServerService mediaServerService;
 
@@ -122,11 +106,8 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
 
 		if (parentPlatform != null) {
 			if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) {
-				RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
-						sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(),
-						sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(),
-						sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio());
-				redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> {
+				RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(sendRtpItem);
+				redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, () -> {
 					playService.startSendRtpStreamFailHand(sendRtpItem, parentPlatform, callIdHeader);
 				});
 			} else {
@@ -134,7 +115,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
 					if (sendRtpItem.isTcpActive()) {
 						mediaServerService.startSendRtpPassive(mediaInfo, parentPlatform, sendRtpItem, null);
 					} else {
-						mediaServerService.startSendRtpStream(mediaInfo, parentPlatform, sendRtpItem);
+						mediaServerService.startSendRtp(mediaInfo, parentPlatform, sendRtpItem);
 					}
 				}catch (ControllerException e) {
 					logger.error("RTP推流失败: {}", e.getMessage());
@@ -159,7 +140,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
 				if (sendRtpItem.isTcpActive()) {
 					mediaServerService.startSendRtpPassive(mediaInfo, null, sendRtpItem, null);
 				} else {
-					mediaServerService.startSendRtpStream(mediaInfo, null, sendRtpItem);
+					mediaServerService.startSendRtp(mediaInfo, null, sendRtpItem);
 				}
 			}catch (ControllerException e) {
 				logger.error("RTP推流失败: {}", e.getMessage());

+ 9 - 23
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java

@@ -6,16 +6,15 @@ import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
 import com.genersoft.iot.vmp.gb28181.bean.*;
 import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
-import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
 import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
 import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
 import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
-import com.genersoft.iot.vmp.media.service.IMediaServerService;
-import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
+import com.genersoft.iot.vmp.media.bean.MediaInfo;
 import com.genersoft.iot.vmp.media.bean.MediaServer;
+import com.genersoft.iot.vmp.media.service.IMediaServerService;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
 import com.genersoft.iot.vmp.service.*;
 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
@@ -36,8 +35,6 @@ import javax.sip.SipException;
 import javax.sip.header.CallIdHeader;
 import javax.sip.message.Response;
 import java.text.ParseException;
-import java.util.HashMap;
-import java.util.Map;
 
 /**
  * SIP命令类型: BYE请求
@@ -75,12 +72,6 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
 	@Autowired
 	private IVideoManagerStorage storager;
 
-	@Autowired
-	private ZLMServerFactory zlmServerFactory;
-
-	@Autowired
-	private SSRCFactory ssrcFactory;
-
 	@Autowired
 	private IMediaServerService mediaServerService;
 
@@ -110,7 +101,6 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
 
 	/**
 	 * 处理BYE请求
-	 * @param evt
 	 */
 	@Override
 	public void process(RequestEvent evt) {
@@ -128,11 +118,6 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
 			logger.info("[收到bye] 来自{},停止通道:{}, 类型: {}, callId: {}", sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getPlayType(), callIdHeader.getCallId());
 
 			String streamId = sendRtpItem.getStream();
-			Map<String, Object> param = new HashMap<>();
-			param.put("vhost","__defaultVhost__");
-			param.put("app",sendRtpItem.getApp());
-			param.put("stream",streamId);
-			param.put("ssrc",sendRtpItem.getSsrc());
 			logger.info("[收到bye] 停止推流:{}, 媒体节点: {}", streamId, sendRtpItem.getMediaServerId());
 
 			if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
@@ -149,7 +134,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
 					MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
 					redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),
 							callIdHeader.getCallId(), null);
-					zlmServerFactory.stopSendRtpStream(mediaInfo, param);
+					mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
 					if (userSetting.getUseCustomSsrcForParentInvite()) {
 						mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc());
 					}
@@ -169,13 +154,13 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
 				MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
 				redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),
 						callIdHeader.getCallId(), null);
-				zlmServerFactory.stopSendRtpStream(mediaInfo, param);
+				mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
 				if (userSetting.getUseCustomSsrcForParentInvite()) {
 					mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc());
 				}
 			}
-			MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
-			if (mediaInfo != null) {
+			MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
+			if (mediaServer != null) {
 				AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
 				if (audioBroadcastCatch != null && audioBroadcastCatch.getSipTransactionInfo().getCallId().equals(callIdHeader.getCallId())) {
 					// 来自上级平台的停止对讲
@@ -183,8 +168,9 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
 					audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
 				}
 
-				int totalReaderCount = zlmServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId);
-				if (totalReaderCount <= 0) {
+				MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, sendRtpItem.getApp(), streamId);
+
+				if (mediaInfo.getReaderCount() <= 0) {
 					logger.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId);
 					if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) {
 						Device device = deviceService.getDevice(sendRtpItem.getDeviceId());

+ 16 - 38
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java

@@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.common.VideoManagerConstants;
 import com.genersoft.iot.vmp.conf.DynamicTask;
 import com.genersoft.iot.vmp.conf.SipConfig;
 import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.conf.exception.ControllerException;
 import com.genersoft.iot.vmp.gb28181.bean.*;
 import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
 import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
@@ -24,7 +25,6 @@ import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
 import com.genersoft.iot.vmp.media.event.hook.HookType;
 import com.genersoft.iot.vmp.media.service.IMediaServerService;
 import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
-import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
 import com.genersoft.iot.vmp.service.IInviteStreamService;
@@ -61,7 +61,6 @@ import javax.sip.header.CallIdHeader;
 import javax.sip.message.Response;
 import java.text.ParseException;
 import java.time.Instant;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
 import java.util.Vector;
@@ -113,9 +112,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
     @Autowired
     private AudioBroadcastManager audioBroadcastManager;
 
-    @Autowired
-    private ZLMServerFactory zlmServerFactory;
-
     @Autowired
     private IMediaServerService mediaServerService;
 
@@ -382,8 +378,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                     } else {
                         streamTypeStr = "UDP";
                     }
-                    logger.info("[上级Invite] {}, 平台:{}, 通道:{}, 收流地址:{}:{},收流方式:{}, ssrc:{}", sessionName, username, channelId, addressStr, port, streamTypeStr, ssrc);
-                    SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
+                    logger.info("[上级Invite] {}, 平台:{}, 通道:{}, 收流地址:{}:{},收流方式:{}, ssrc:{}",
+                            sessionName, username, channelId, addressStr, port, streamTypeStr, ssrc);
+                    SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                             device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp());
 
                     if (tcpActive != null) {
@@ -462,30 +459,10 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                             responseSdpAck(request, content.toString(), platform);
                             // tcp主动模式,回复sdp后开启监听
                             if (sendRtpItem.isTcpActive()) {
-                                MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
-                                Map<String, Object> param = new HashMap<>(12);
-                                param.put("vhost","__defaultVhost__");
-                                param.put("app",sendRtpItem.getApp());
-                                param.put("stream",sendRtpItem.getStream());
-                                param.put("ssrc", sendRtpItem.getSsrc());
-                                if (!sendRtpItem.isTcpActive()) {
-                                    param.put("dst_url",sendRtpItem.getIp());
-                                    param.put("dst_port", sendRtpItem.getPort());
-                                }
-                                String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
-                                param.put("is_udp", is_Udp);
-                                param.put("src_port", localPort);
-                                param.put("pt", sendRtpItem.getPt());
-                                param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
-                                param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
-                                if (!sendRtpItem.isTcp()) {
-                                    // 开启rtcp保活
-                                    param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0");
-                                }
-                                JSONObject startSendRtpStreamResult = zlmServerFactory.startSendRtpPassive(mediaInfo, param);
-                                if (startSendRtpStreamResult != null) {
-                                    startSendRtpStreamHand(evt, sendRtpItem, null, startSendRtpStreamResult, param, callIdHeader);
-                                }
+                                MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
+                                try {
+                                    mediaServerService.startSendRtpPassive(mediaServer, platform, sendRtpItem, 5);
+                                }catch (ControllerException e) {}
                             }
                         } catch (SipException | InvalidArgumentException | ParseException e) {
                             logger.error("[命令发送失败] 国标级联 回复SdpAck", e);
@@ -638,13 +615,14 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
      * 安排推流
      */
     private void pushProxyStream(RequestEvent evt, SIPRequest request, GbStream gbStream, ParentPlatform platform,
-                            CallIdHeader callIdHeader, MediaServer mediaServerItem,
+                            CallIdHeader callIdHeader, MediaServer mediaServer,
                             int port, Boolean tcpActive, boolean mediaTransmissionTCP,
                             String channelId, String addressStr, String ssrc, String requesterId) {
-            Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
+            Boolean streamReady = mediaServerService.isStreamReady(mediaServer, gbStream.getApp(), gbStream.getStream());
             if (streamReady != null && streamReady) {
+
                 // 自平台内容
-                SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
+                SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServer, addressStr, port, ssrc, requesterId,
                         gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
 
             if (sendRtpItem == null) {
@@ -665,7 +643,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
             sendRtpItem.setCallId(callIdHeader.getCallId());
             sendRtpItem.setFromTag(request.getFromTag());
 
-            SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform, evt);
+            SIPResponse response = sendStreamAck(mediaServer, request, sendRtpItem, platform, evt);
             if (response != null) {
                 sendRtpItem.setToTag(response.getToTag());
             }
@@ -684,7 +662,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
             Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
             if (streamReady != null && streamReady) {
                 // 自平台内容
-                SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
+                SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                         gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
 
                 if (sendRtpItem == null) {
@@ -794,7 +772,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                 dynamicTask.stop(callIdHeader.getCallId());
                 redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream());
                 if (serverId.equals(userSetting.getServerId())) {
-                    SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId,
+                    SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId,
                             app, stream, channelId, mediaTransmissionTCP, platform.isRtcp());
 
                     if (sendRtpItem == null) {
@@ -1074,7 +1052,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                         mediaTransmissionTCP ? (tcpActive ? "TCP主动" : "TCP被动") : "UDP", sdp.getSessionName().getValue());
                 CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME);
 
-                SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, gb28181Sdp.getSsrc(), requesterId,
+                SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, port, gb28181Sdp.getSsrc(), requesterId,
                         device.getDeviceId(), broadcastCatch.getChannelId(),
                         mediaTransmissionTCP, false);
 

+ 8 - 11
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java

@@ -1,16 +1,15 @@
 package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.cmd;
 
-import com.alibaba.fastjson2.JSONObject;
+import com.genersoft.iot.vmp.conf.exception.ControllerException;
 import com.genersoft.iot.vmp.gb28181.bean.*;
 import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
 import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
 import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
 import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler;
-import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
 import com.genersoft.iot.vmp.media.bean.MediaServer;
-import com.genersoft.iot.vmp.service.IDeviceService;
 import com.genersoft.iot.vmp.media.service.IMediaServerService;
+import com.genersoft.iot.vmp.service.IDeviceService;
 import com.genersoft.iot.vmp.service.IPlatformService;
 import com.genersoft.iot.vmp.service.IPlayService;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -62,9 +61,6 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp
     @Autowired
     private AudioBroadcastManager audioBroadcastManager;
 
-    @Autowired
-    private ZLMServerFactory zlmServerFactory;
-
     @Autowired
     private IRedisCatchStorage redisCatchStorage;
 
@@ -155,12 +151,13 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp
                                     }
                                 }else {
                                     // 发流
-                                    JSONObject jsonObject = zlmServerFactory.startSendRtp(hookData.getMediaServer(), sendRtpItem);
-                                    if (jsonObject != null && jsonObject.getInteger("code") == 0 ) {
-                                        logger.info("[语音喊话] 自动推流成功, device: {}, channel: {}", device.getDeviceId(), targetId);
-                                    }else {
-                                        logger.info("[语音喊话] 推流失败, 结果: {}", jsonObject);
+                                    try {
+                                        mediaServerService.startSendRtp(hookData.getMediaServer(),null, sendRtpItem);
+                                    }catch (ControllerException e) {
+                                        logger.info("[语音喊话] 推流失败, 结果: {}", e.getMessage());
+                                        return;
                                     }
+                                    logger.info("[语音喊话] 自动推流成功, device: {}, channel: {}", device.getDeviceId(), targetId);
                                 }
                             }
                         }else {

+ 6 - 1
src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java

@@ -141,5 +141,10 @@ public interface IMediaServerService {
 
     void startSendRtpPassive(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem, Integer timeout);
 
-    void startSendRtpStream(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem);
+    void startSendRtp(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem);
+
+    SendRtpItem createSendRtpItem(MediaServer mediaServerItem, String addressStr, int port, String ssrc, String requesterId, String deviceId, String channelId, boolean mediaTransmissionTCP, boolean rtcp);
+
+    SendRtpItem createSendRtpItem(MediaServer serverItem, String ip, int port, String ssrc, String platformId,
+                                  String app, String stream, String channelId, boolean tcp, boolean rtcp);
 }

+ 55 - 2
src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java

@@ -19,6 +19,7 @@ import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerDeleteEvent;
 import com.genersoft.iot.vmp.media.service.IMediaNodeServerService;
 import com.genersoft.iot.vmp.media.service.IMediaServerService;
 import com.genersoft.iot.vmp.media.bean.MediaServer;
+import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
 import com.genersoft.iot.vmp.service.IInviteStreamService;
 import com.genersoft.iot.vmp.service.bean.MediaServerLoad;
@@ -83,6 +84,9 @@ public class MediaServerServiceImpl implements IMediaServerService {
     @Autowired
     private MediaConfig mediaConfig;
 
+    @Autowired
+    private SendRtpPortManager sendRtpPortManager;
+
 
 
     /**
@@ -812,7 +816,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
     }
 
     @Override
-    public void startSendRtpStream(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem) {
+    public void startSendRtp(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem) {
         IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
         if (mediaNodeServerService == null) {
             logger.info("[startSendRtpStream] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType());
@@ -821,7 +825,10 @@ public class MediaServerServiceImpl implements IMediaServerService {
         logger.info("[开始推流] rtp/{}, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStream(),
                 sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp());
         mediaNodeServerService.startSendRtpStream(mediaServer, sendRtpItem);
-        sendPlatformStartPlayMsg(platform, sendRtpItem);
+        if (platform != null) {
+            sendPlatformStartPlayMsg(platform, sendRtpItem);
+        }
+
 
     }
 
@@ -834,4 +841,50 @@ public class MediaServerServiceImpl implements IMediaServerService {
             redisCatchStorage.sendPlatformStartPlayMsg(messageForPushChannel);
         }
     }
+
+    @Override
+    public SendRtpItem createSendRtpItem(MediaServer mediaServer, String ip, int port, String ssrc, String requesterId, String deviceId, String channelId, boolean isTcp, boolean rtcp) {
+        int localPort = sendRtpPortManager.getNextPort(mediaServer);
+        if (localPort == 0) {
+            return null;
+        }
+        SendRtpItem sendRtpItem = new SendRtpItem();
+        sendRtpItem.setIp(ip);
+        sendRtpItem.setPort(port);
+        sendRtpItem.setSsrc(ssrc);
+        sendRtpItem.setPlatformId(deviceId);
+        sendRtpItem.setDeviceId(deviceId);
+        sendRtpItem.setChannelId(channelId);
+        sendRtpItem.setTcp(isTcp);
+        sendRtpItem.setRtcp(rtcp);
+        sendRtpItem.setApp("rtp");
+        sendRtpItem.setLocalPort(localPort);
+        sendRtpItem.setServerId(userSetting.getServerId());
+        sendRtpItem.setMediaServerId(mediaServer.getId());
+        return sendRtpItem;
+    }
+
+    @Override
+    public SendRtpItem createSendRtpItem(MediaServer serverItem, String ip, int port, String ssrc, String platformId,
+                                         String app, String stream, String channelId, boolean tcp, boolean rtcp){
+
+        int localPort = sendRtpPortManager.getNextPort(serverItem);
+        if (localPort == 0) {
+            return null;
+        }
+        SendRtpItem sendRtpItem = new SendRtpItem();
+        sendRtpItem.setIp(ip);
+        sendRtpItem.setPort(port);
+        sendRtpItem.setSsrc(ssrc);
+        sendRtpItem.setApp(app);
+        sendRtpItem.setStream(stream);
+        sendRtpItem.setPlatformId(platformId);
+        sendRtpItem.setChannelId(channelId);
+        sendRtpItem.setTcp(tcp);
+        sendRtpItem.setLocalPort(localPort);
+        sendRtpItem.setServerId(userSetting.getServerId());
+        sendRtpItem.setMediaServerId(serverItem.getId());
+        sendRtpItem.setRtcp(rtcp);
+        return sendRtpItem;
+    }
 }

+ 4 - 16
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java

@@ -3,15 +3,13 @@ package com.genersoft.iot.vmp.media.zlm;
 import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.gb28181.bean.GbStream;
 import com.genersoft.iot.vmp.media.bean.MediaServer;
-import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
-import com.genersoft.iot.vmp.media.zlm.dto.*;
-import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
 import com.genersoft.iot.vmp.media.service.IMediaServerService;
-import com.genersoft.iot.vmp.service.IStreamProxyService;
+import com.genersoft.iot.vmp.media.zlm.dto.ChannelOnlineEvent;
+import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
+import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
 import com.genersoft.iot.vmp.service.IStreamPushService;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
-import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
 import com.genersoft.iot.vmp.storager.dao.StreamPushMapper;
 import com.genersoft.iot.vmp.utils.DateUtil;
 import org.slf4j.Logger;
@@ -20,7 +18,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import java.text.ParseException;
-import java.util.*;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -37,26 +35,16 @@ public class ZLMMediaListManager {
     @Autowired
     private GbStreamMapper gbStreamMapper;
 
-    @Autowired
-    private PlatformGbStreamMapper platformGbStreamMapper;
-
     @Autowired
     private IStreamPushService streamPushService;
 
-    @Autowired
-    private IStreamProxyService streamProxyService;
 
     @Autowired
     private StreamPushMapper streamPushMapper;
 
-    @Autowired
-    private HookSubscribe subscribe;
-
     @Autowired
     private UserSetting userSetting;
 
-    @Autowired
-    private ZLMServerFactory zlmServerFactory;
 
     @Autowired
     private IMediaServerService mediaServerService;

+ 12 - 0
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java

@@ -316,11 +316,23 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
         if (timeout  != null) {
             param.put("close_delay_ms", timeout);
         }
+        if (!sendRtpItem.isTcp()) {
+            // 开启rtcp保活
+            param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0");
+        }
+        if (!sendRtpItem.isTcpActive()) {
+            param.put("dst_url",sendRtpItem.getIp());
+            param.put("dst_port", sendRtpItem.getPort());
+        }
 
         JSONObject jsonObject = zlmServerFactory.startSendRtpPassive(mediaServer, param, null);
         if (jsonObject == null || jsonObject.getInteger("code") != 0 ) {
+            logger.error("启动监听TCP被动推流失败: {}, 参数:{}", jsonObject.getString("msg"), JSON.toJSONString(param));
             throw new ControllerException(jsonObject.getInteger("code"), jsonObject.getString("msg"));
         }
+        logger.info("调用ZLM-TCP被动推流接口, 结果: {}",  jsonObject);
+        logger.info("启动监听TCP被动推流成功[ {}/{} ],{}->{}:{}, " , sendRtpItem.getApp(), sendRtpItem.getStream(),
+                jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port"));
     }
 
     @Override

+ 0 - 4
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java

@@ -5,7 +5,6 @@ import com.alibaba.fastjson2.JSONObject;
 import com.genersoft.iot.vmp.common.CommonCallback;
 import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
-import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
 import com.genersoft.iot.vmp.media.bean.MediaServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -26,9 +25,6 @@ public class ZLMServerFactory {
     @Autowired
     private UserSetting userSetting;
 
-    @Autowired
-    private HookSubscribe hookSubscribe;
-
     @Autowired
     private SendRtpPortManager sendRtpPortManager;
 

+ 18 - 0
src/main/java/com/genersoft/iot/vmp/service/bean/RequestPushStreamMsg.java

@@ -1,5 +1,7 @@
 package com.genersoft.iot.vmp.service.bean;
 
+import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+
 /**
  * redis消息:请求下级推送流信息
  * @author lin
@@ -80,6 +82,22 @@ public class RequestPushStreamMsg {
         return requestPushStreamMsg;
     }
 
+    public static RequestPushStreamMsg getInstance(SendRtpItem sendRtpItem) {
+        RequestPushStreamMsg requestPushStreamMsg = new RequestPushStreamMsg();
+        requestPushStreamMsg.setMediaServerId(sendRtpItem.getMediaServerId());
+        requestPushStreamMsg.setApp(sendRtpItem.getApp());
+        requestPushStreamMsg.setStream(sendRtpItem.getStream());
+        requestPushStreamMsg.setIp(sendRtpItem.getIp());
+        requestPushStreamMsg.setPort(sendRtpItem.getPort());
+        requestPushStreamMsg.setSsrc(sendRtpItem.getSsrc());
+        requestPushStreamMsg.setTcp(sendRtpItem.isTcp());
+        requestPushStreamMsg.setSrcPort(sendRtpItem.getLocalPort());
+        requestPushStreamMsg.setPt(sendRtpItem.getPt());
+        requestPushStreamMsg.setPs(sendRtpItem.isUsePs());
+        requestPushStreamMsg.setOnlyAudio(sendRtpItem.isOnlyAudio());
+        return requestPushStreamMsg;
+    }
+
     public String getMediaServerId() {
         return mediaServerId;
     }

+ 9 - 16
src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java

@@ -1,7 +1,9 @@
 package com.genersoft.iot.vmp.service.impl;
 
 import com.baomidou.dynamic.datasource.annotation.DS;
-import com.genersoft.iot.vmp.common.*;
+import com.genersoft.iot.vmp.common.InviteInfo;
+import com.genersoft.iot.vmp.common.InviteSessionStatus;
+import com.genersoft.iot.vmp.common.InviteSessionType;
 import com.genersoft.iot.vmp.conf.DynamicTask;
 import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
@@ -11,13 +13,12 @@ import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
 import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
+import com.genersoft.iot.vmp.media.bean.MediaServer;
 import com.genersoft.iot.vmp.media.event.hook.HookData;
+import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
 import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
 import com.genersoft.iot.vmp.media.event.mediaServer.MediaSendRtpStoppedEvent;
-import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
 import com.genersoft.iot.vmp.media.service.IMediaServerService;
-import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
-import com.genersoft.iot.vmp.media.bean.MediaServer;
 import com.genersoft.iot.vmp.service.IInviteStreamService;
 import com.genersoft.iot.vmp.service.IPlatformService;
 import com.genersoft.iot.vmp.service.IPlayService;
@@ -41,7 +42,9 @@ import javax.sip.InvalidArgumentException;
 import javax.sip.ResponseEvent;
 import javax.sip.SipException;
 import java.text.ParseException;
-import java.util.*;
+import java.util.List;
+import java.util.UUID;
+import java.util.Vector;
 
 /**
  * @author lin
@@ -75,9 +78,6 @@ public class PlatformServiceImpl implements IPlatformService {
     @Autowired
     private DynamicTask dynamicTask;
 
-    @Autowired
-    private ZLMServerFactory zlmServerFactory;
-
     @Autowired
     private SubscribeHolder subscribeHolder;
 
@@ -87,9 +87,6 @@ public class PlatformServiceImpl implements IPlatformService {
     @Autowired
     private UserSetting userSetting;
 
-    @Autowired
-    private HookSubscribe subscribe;
-
     @Autowired
     private VideoStreamSessionManager streamSession;
 
@@ -437,11 +434,7 @@ public class PlatformServiceImpl implements IPlatformService {
                 ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
                 redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), null, null);
                 MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
-                Map<String, Object> param = new HashMap<>(3);
-                param.put("vhost", "__defaultVhost__");
-                param.put("app", sendRtpItem.getApp());
-                param.put("stream", sendRtpItem.getStream());
-                zlmServerFactory.stopSendRtpStream(mediaInfo, param);
+                mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), null);
             }
         }
     }

+ 3 - 8
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java

@@ -1,6 +1,5 @@
 package com.genersoft.iot.vmp.service.impl;
 
-import com.alibaba.fastjson2.JSONObject;
 import com.baomidou.dynamic.datasource.annotation.DS;
 import com.genersoft.iot.vmp.common.*;
 import com.genersoft.iot.vmp.conf.DynamicTask;
@@ -25,7 +24,6 @@ import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
 import com.genersoft.iot.vmp.media.event.media.MediaNotFoundEvent;
 import com.genersoft.iot.vmp.media.service.IMediaServerService;
 import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
-import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
 import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
 import com.genersoft.iot.vmp.media.bean.MediaServer;
 import com.genersoft.iot.vmp.service.*;
@@ -1421,11 +1419,8 @@ public class PlayServiceImpl implements IPlayService {
         MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
 
         if (mediaInfo == null) {
-            RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
-                    sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(),
-                    sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(),
-                    sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio());
-            redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> {
+            RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(sendRtpItem);
+            redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, () -> {
                 startSendRtpStreamFailHand(sendRtpItem, platform, callIdHeader);
             });
         } else {
@@ -1433,7 +1428,7 @@ public class PlayServiceImpl implements IPlayService {
                 if (sendRtpItem.isTcpActive()) {
                     mediaServerService.startSendRtpPassive(mediaInfo, platform, sendRtpItem, null);
                 } else {
-                    mediaServerService.startSendRtpStream(mediaInfo, platform, sendRtpItem);
+                    mediaServerService.startSendRtp(mediaInfo, platform, sendRtpItem);
                 }
             }catch (ControllerException e) {
                 logger.error("RTP推流失败: {}", e.getMessage());

+ 2 - 6
src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java

@@ -9,15 +9,14 @@ import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.conf.exception.ControllerException;
 import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
 import com.genersoft.iot.vmp.media.bean.MediaInfo;
+import com.genersoft.iot.vmp.media.bean.MediaServer;
 import com.genersoft.iot.vmp.media.event.hook.Hook;
+import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
 import com.genersoft.iot.vmp.media.event.hook.HookType;
 import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
 import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
 import com.genersoft.iot.vmp.media.event.media.MediaNotFoundEvent;
 import com.genersoft.iot.vmp.media.service.IMediaServerService;
-import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
-import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
-import com.genersoft.iot.vmp.media.bean.MediaServer;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
 import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
 import com.genersoft.iot.vmp.service.IGbStreamService;
@@ -64,9 +63,6 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
     @Autowired
     private IVideoManagerStorage videoManagerStorager;
 
-    @Autowired
-    private ZLMServerFactory zlmServerFactory;
-
     @Autowired
     private StreamProxyMapper streamProxyMapper;
 

+ 17 - 39
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java

@@ -5,12 +5,12 @@ import com.alibaba.fastjson2.JSONObject;
 import com.genersoft.iot.vmp.common.VideoManagerConstants;
 import com.genersoft.iot.vmp.conf.DynamicTask;
 import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.conf.exception.ControllerException;
 import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+import com.genersoft.iot.vmp.media.bean.MediaServer;
 import com.genersoft.iot.vmp.media.event.hook.Hook;
-import com.genersoft.iot.vmp.media.event.hook.HookType;
-import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
 import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
-import com.genersoft.iot.vmp.media.bean.MediaServer;
+import com.genersoft.iot.vmp.media.event.hook.HookType;
 import com.genersoft.iot.vmp.media.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.bean.*;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -27,7 +27,6 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
 
 import java.text.ParseException;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -72,9 +71,6 @@ public class RedisGbPlayMsgListener implements MessageListener {
     @Autowired
     private RedisTemplate<Object, Object> redisTemplate;
 
-    @Autowired
-    private ZLMServerFactory zlmServerFactory;
-
     @Autowired
     private IMediaServerService mediaServerService;
 
@@ -101,7 +97,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
     }
 
     public interface PlayMsgCallbackForStartSendRtpStream{
-        void handler(JSONObject jsonObject);
+        void handler();
     }
 
     public interface PlayMsgErrorCallback{
@@ -181,11 +177,10 @@ public class RedisGbPlayMsgListener implements MessageListener {
                                     String serial = wvpRedisMsg.getSerial();
                                     switch (wvpResult.getCode()) {
                                         case 0:
-                                            JSONObject jsonObject = (JSONObject)wvpResult.getData();
                                             PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial);
                                             if (playMsgCallback != null) {
                                                 callbacksForError.remove(serial);
-                                                playMsgCallback.handler(jsonObject);
+                                                playMsgCallback.handler();
                                             }
                                             break;
                                         case ERROR_CODE_MEDIA_SERVER_NOT_FOUND:
@@ -219,36 +214,24 @@ public class RedisGbPlayMsgListener implements MessageListener {
      * 处理收到的请求推流的请求
      */
     private void requestPushStreamMsgHand(RequestPushStreamMsg requestPushStreamMsg, String fromId, String serial) {
-        MediaServer mediaInfo = mediaServerService.getOne(requestPushStreamMsg.getMediaServerId());
-        if (mediaInfo == null) {
+        MediaServer mediaServer = mediaServerService.getOne(requestPushStreamMsg.getMediaServerId());
+        if (mediaServer == null) {
             // TODO 回复错误
             return;
         }
-        String is_Udp = requestPushStreamMsg.isTcp() ? "0" : "1";
-        Map<String, Object> param = new HashMap<>();
-        param.put("vhost","__defaultVhost__");
-        param.put("app",requestPushStreamMsg.getApp());
-        param.put("stream",requestPushStreamMsg.getStream());
-        param.put("ssrc", requestPushStreamMsg.getSsrc());
-        param.put("dst_url",requestPushStreamMsg.getIp());
-        param.put("dst_port", requestPushStreamMsg.getPort());
-        param.put("is_udp", is_Udp);
-        param.put("src_port", requestPushStreamMsg.getSrcPort());
-        param.put("pt", requestPushStreamMsg.getPt());
-        param.put("use_ps", requestPushStreamMsg.isPs() ? "1" : "0");
-        param.put("only_audio", requestPushStreamMsg.isOnlyAudio() ? "1" : "0");
-        JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaInfo, param);
-        // 回复消息
-        responsePushStream(jsonObject, fromId, serial);
-    }
+        SendRtpItem sendRtpItem = SendRtpItem.getInstance(requestPushStreamMsg);
 
-    private void responsePushStream(JSONObject content, String toId, String serial) {
+        try {
+            mediaServerService.startSendRtp(mediaServer, null, sendRtpItem);
+        }catch (ControllerException e) {
+            return;
+        }
 
+        // 回复消息
         WVPResult<JSONObject> result = new WVPResult<>();
         result.setCode(0);
-        result.setData(content);
 
-        WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId,
+        WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), fromId,
                 WvpRedisMsgCmd.REQUEST_PUSH_STREAM, serial, JSON.toJSONString(result));
         JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
         redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
@@ -317,7 +300,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
      * 将获取到的sendItem发送出去
      */
     private void responseSendItem(MediaServer mediaServerItem, RequestSendItemMsg content, String toId, String serial) {
-        SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, content.getIp(),
+        SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, content.getIp(),
                 content.getPort(), content.getSsrc(), content.getPlatformId(),
                 content.getApp(), content.getStream(), content.getChannelId(),
                 content.getTcp(), content.getRtcp());
@@ -453,13 +436,8 @@ public class RedisGbPlayMsgListener implements MessageListener {
             // TODO 回复错误
             return;
         }
-        Map<String, Object> param = new HashMap<>();
-        param.put("vhost","__defaultVhost__");
-        param.put("app",sendRtpItem.getApp());
-        param.put("stream",sendRtpItem.getStream());
-        param.put("ssrc", sendRtpItem.getSsrc());
 
-        if (zlmServerFactory.stopSendRtpStream(mediaInfo, param)) {
+        if (mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc())) {
             logger.info("[REDIS 执行其他平台的请求停止推流] 成功: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
             // 发送redis消息
             MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,

+ 2 - 13
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java

@@ -8,7 +8,6 @@ import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
 import com.genersoft.iot.vmp.media.bean.MediaServer;
 import com.genersoft.iot.vmp.media.service.IMediaServerService;
-import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
 import com.genersoft.iot.vmp.service.IStreamPushService;
 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
@@ -25,7 +24,6 @@ import org.springframework.stereotype.Component;
 import javax.sip.InvalidArgumentException;
 import javax.sip.SipException;
 import java.text.ParseException;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -57,9 +55,6 @@ public class RedisPushStreamCloseResponseListener implements MessageListener {
     @Autowired
     private IMediaServerService mediaServerService;
 
-    @Autowired
-    private ZLMServerFactory zlmServerFactory;
-
 
     private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>();
 
@@ -88,16 +83,10 @@ public class RedisPushStreamCloseResponseListener implements MessageListener {
                     }
                     if (push.isSelf()) {
                         // 停止向上级推流
-                        String streamId = sendRtpItem.getStream();
-                        Map<String, Object> param = new HashMap<>();
-                        param.put("vhost","__defaultVhost__");
-                        param.put("app",sendRtpItem.getApp());
-                        param.put("stream",streamId);
-                        param.put("ssrc",sendRtpItem.getSsrc());
-                        logger.info("[REDIS消息-推流结束] 停止向上级推流:{}", streamId);
+                        logger.info("[REDIS消息-推流结束] 停止向上级推流:{}", sendRtpItem.getStream());
                         MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                         redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStream());
-                        zlmServerFactory.stopSendRtpStream(mediaInfo, param);
+                        mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
                         if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
                             MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
                                     sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),

+ 18 - 15
src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java

@@ -13,6 +13,7 @@ import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
 import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
 import com.genersoft.iot.vmp.media.bean.MediaServer;
 import com.genersoft.iot.vmp.media.service.IMediaServerService;
+import com.genersoft.iot.vmp.service.bean.SSRCInfo;
 import com.genersoft.iot.vmp.utils.redis.RedisUtil;
 import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
 import com.genersoft.iot.vmp.vmanager.bean.OtherPsSendInfo;
@@ -81,8 +82,8 @@ public class PsController {
         logger.info("[第三方PS服务对接->开启收流和获取发流信息] isSend->{}, ssrc->{}, callId->{}, stream->{}, tcpMode->{}, callBack->{}",
                 isSend, ssrc, callId, stream, tcpMode==0?"UDP":"TCP被动", callBack);
 
-        MediaServer mediaServerItem = mediaServerService.getDefaultMediaServer();
-        if (mediaServerItem == null) {
+        MediaServer mediaServer = mediaServerService.getDefaultMediaServer();
+        if (mediaServer == null) {
             throw new ControllerException(ErrorCode.ERROR100.getCode(),"没有可用的MediaServer");
         }
         if (stream == null) {
@@ -100,13 +101,14 @@ public class PsController {
             }
         }
         String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_" + callId + "_"  + stream;
-        int localPort = zlmServerFactory.createRTPServer(mediaServerItem, stream, ssrcInt, null, false, false, tcpMode);
-        if (localPort == 0) {
+        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServer, stream, ssrcInt + "", false, false, null, false, false, false, tcpMode);
+
+        if (ssrcInfo.getPort() == 0) {
             throw new ControllerException(ErrorCode.ERROR100.getCode(), "获取端口失败");
         }
         // 注册回调如果rtp收流超时则通过回调发送通知
         if (callBack != null) {
-            Hook hook = Hook.getInstance(HookType.on_rtp_server_timeout, "rtp", stream, mediaServerItem.getId());
+            Hook hook = Hook.getInstance(HookType.on_rtp_server_timeout, "rtp", stream, mediaServer.getId());
             // 订阅 zlm启动事件, 新的zlm也会从这里进入系统
             hookSubscribe.addSubscribe(hook,
                     (hookData)->{
@@ -128,8 +130,8 @@ public class PsController {
                     });
         }
         OtherPsSendInfo otherPsSendInfo = new OtherPsSendInfo();
-        otherPsSendInfo.setReceiveIp(mediaServerItem.getSdpIp());
-        otherPsSendInfo.setReceivePort(localPort);
+        otherPsSendInfo.setReceiveIp(mediaServer.getSdpIp());
+        otherPsSendInfo.setReceivePort(ssrcInfo.getPort());
         otherPsSendInfo.setCallId(callId);
         otherPsSendInfo.setStream(stream);
 
@@ -138,9 +140,9 @@ public class PsController {
         if (isSend != null && isSend) {
             String key = VideoManagerConstants.WVP_OTHER_SEND_PS_INFO + userSetting.getServerId() + "_"  + callId;
             // 预创建发流信息
-            int port = sendRtpPortManager.getNextPort(mediaServerItem);
+            int port = sendRtpPortManager.getNextPort(mediaServer);
 
-            otherPsSendInfo.setSendLocalIp(mediaServerItem.getSdpIp());
+            otherPsSendInfo.setSendLocalIp(mediaServer.getSdpIp());
             otherPsSendInfo.setSendLocalPort(port);
             // 将信息写入redis中,以备后用
             redisTemplate.opsForValue().set(key, otherPsSendInfo, 300, TimeUnit.SECONDS);
@@ -156,7 +158,7 @@ public class PsController {
     public void closeRtpServer(String stream) {
         logger.info("[第三方PS服务对接->关闭收流] stream->{}", stream);
         MediaServer mediaServerItem = mediaServerService.getDefaultMediaServer();
-        zlmServerFactory.closeRtpServer(mediaServerItem,stream);
+        mediaServerService.closeRTPServer(mediaServerItem, stream);
         String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_*_"  + stream;
         List<Object> scan = RedisUtil.scan(redisTemplate, receiveKey);
         if (!scan.isEmpty()) {
@@ -198,7 +200,7 @@ public class PsController {
                         app,
                         stream,
                         callId);
-        MediaServer mediaServerItem = mediaServerService.getDefaultMediaServer();
+        MediaServer mediaServer = mediaServerService.getDefaultMediaServer();
         String key = VideoManagerConstants.WVP_OTHER_SEND_PS_INFO + userSetting.getServerId() + "_"  + callId;
         OtherPsSendInfo sendInfo = (OtherPsSendInfo)redisTemplate.opsForValue().get(key);
         if (sendInfo == null) {
@@ -224,9 +226,10 @@ public class PsController {
         param.put("src_port", sendInfo.getSendLocalPort());
 
 
-        Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, app, stream);
+        Boolean streamReady = mediaServerService.isStreamReady(mediaServer, app, stream);
         if (streamReady) {
-            JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param);
+            JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServer, param);
+//            mediaServerService.startSendRtp(mediaServer, );
             if (jsonObject.getInteger("code") == 0) {
                 logger.info("[第三方PS服务对接->发送流] 视频流发流成功,callId->{},param->{}", callId, param);
                 redisTemplate.opsForValue().set(key, sendInfo);
@@ -238,7 +241,7 @@ public class PsController {
         }else {
             logger.info("[第三方PS服务对接->发送流] 流不存在,等待流上线,callId->{}", callId);
             String uuid = UUID.randomUUID().toString();
-            Hook hook = Hook.getInstance(HookType.on_media_arrival, app, stream, mediaServerItem.getId());
+            Hook hook = Hook.getInstance(HookType.on_media_arrival, app, stream, mediaServer.getId());
             dynamicTask.startDelay(uuid, ()->{
                 logger.info("[第三方PS服务对接->发送流] 等待流上线超时 callId->{}", callId);
                 redisTemplate.delete(key);
@@ -257,7 +260,7 @@ public class PsController {
                         } catch (InterruptedException e) {
                             throw new RuntimeException(e);
                         }
-                        JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param);
+                        JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServer, param);
                         if (jsonObject.getInteger("code") == 0) {
                             logger.info("[第三方PS服务对接->发送流] 视频流发流成功,callId->{},param->{}", callId, param);
                             redisTemplate.opsForValue().set(key, finalSendInfo);