648540858 2 лет назад
Родитель
Сommit
6b03568c5d

+ 13 - 2
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java

@@ -1,7 +1,5 @@
 package com.genersoft.iot.vmp.gb28181.bean;
 
-import gov.nist.javax.sip.message.SIPRequest;
-
 public class SendRtpItem {
 
     /**
@@ -108,6 +106,11 @@ public class SendRtpItem {
      */
     private boolean onlyAudio = false;
 
+    /**
+     * 是否开启rtcp保活
+     */
+    private boolean rtcp = false;
+
 
     /**
      * 播放类型
@@ -281,4 +284,12 @@ public class SendRtpItem {
     public void setToTag(String toTag) {
         this.toTag = toTag;
     }
+
+    public boolean isRtcp() {
+        return rtcp;
+    }
+
+    public void setRtcp(boolean rtcp) {
+        this.rtcp = rtcp;
+    }
 }

+ 8 - 7
src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java

@@ -1,8 +1,6 @@
 package com.genersoft.iot.vmp.gb28181.session;
 
-import java.util.ArrayList;
-import java.util.List;
-
+import com.alibaba.fastjson2.JSON;
 import com.genersoft.iot.vmp.common.VideoManagerConstants;
 import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
@@ -13,6 +11,9 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.util.ObjectUtils;
 
+import java.util.ArrayList;
+import java.util.List;
+
 /**    
  * @description:视频流session管理器,管理视频预览、预览回放的通信句柄 
  * @author: swwheihei
@@ -27,7 +28,8 @@ public class VideoStreamSessionManager {
 	public enum SessionType {
 		play,
 		playback,
-		download
+		download,
+		broadcast
 	}
 
 	/**
@@ -50,9 +52,8 @@ public class VideoStreamSessionManager {
 		ssrcTransaction.setSsrc(ssrc);
 		ssrcTransaction.setMediaServerId(mediaServerId);
 		ssrcTransaction.setType(type);
-
-		RedisUtil.set(VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetting.getServerId()
-				+ "_" +  deviceId + "_" + channelId + "_" + callId + "_" + stream, ssrcTransaction);
+		System.out.println(22222);
+		System.out.println(JSON.toJSONString(ssrcTransaction));
 		RedisUtil.set(VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetting.getServerId()
 				+ "_" +  deviceId + "_" + channelId + "_" + callId + "_" + stream, ssrcTransaction);
 	}

+ 3 - 3
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java

@@ -646,7 +646,7 @@ public class SIPCommander implements ISIPCommander {
      * 视频流停止, 不使用回调
      */
     @Override
-    public synchronized void streamByeCmd(Device device, String channelId, String stream, String callId) throws InvalidArgumentException, ParseException, SipException, SsrcTransactionNotFoundException {
+    public void streamByeCmd(Device device, String channelId, String stream, String callId) throws InvalidArgumentException, ParseException, SipException, SsrcTransactionNotFoundException {
         streamByeCmd(device, channelId, stream, callId, null);
     }
 
@@ -654,7 +654,7 @@ public class SIPCommander implements ISIPCommander {
      * 视频流停止
      */
     @Override
-    public synchronized void streamByeCmd(Device device, String channelId, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException {
+    public void streamByeCmd(Device device, String channelId, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException {
         SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, callId, stream);
         if (ssrcTransaction == null) {
             throw new SsrcTransactionNotFoundException(device.getDeviceId(), channelId, callId, stream);
@@ -669,7 +669,7 @@ public class SIPCommander implements ISIPCommander {
     }
 
     @Override
-    public synchronized void streamByeCmd(Device device, String channelId, SipTransactionInfo sipTransactionInfo, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException {
+    public void streamByeCmd(Device device, String channelId, SipTransactionInfo sipTransactionInfo, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException {
         Request byteRequest = headerProvider.createByteRequest(device, channelId, sipTransactionInfo);
         sipSender.transmitRequest(device.getTransport(), byteRequest, null, okEvent);
     }

+ 9 - 9
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java

@@ -4,10 +4,10 @@ import com.alibaba.fastjson2.JSON;
 import com.alibaba.fastjson2.JSONObject;
 import com.genersoft.iot.vmp.conf.DynamicTask;
 import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
-import com.genersoft.iot.vmp.gb28181.bean.*;
-import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
+import com.genersoft.iot.vmp.gb28181.bean.Device;
 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
 import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
 import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
@@ -22,15 +22,12 @@ import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
 import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
-import gov.nist.javax.sip.message.SIPRequest;
-import gov.nist.javax.sip.stack.SIPDialog;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import javax.sip.*;
 import javax.sip.InvalidArgumentException;
 import javax.sip.RequestEvent;
 import javax.sip.SipException;
@@ -38,7 +35,6 @@ import javax.sip.address.SipURI;
 import javax.sip.header.CallIdHeader;
 import javax.sip.header.FromHeader;
 import javax.sip.header.HeaderAddress;
-import java.text.ParseException;
 import javax.sip.header.ToHeader;
 import java.text.ParseException;
 import java.util.HashMap;
@@ -122,7 +118,8 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
 		}
 		String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
 		MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
-		logger.info("收到ACK,rtp/{}开始向上级推流, 目标={}:{},SSRC={}", sendRtpItem.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc());
+		logger.info("收到ACK,rtp/{}开始向上级推流, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStreamId(),
+				sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp());
 		Map<String, Object> param = new HashMap<>(12);
 		param.put("vhost","__defaultVhost__");
 		param.put("app",sendRtpItem.getApp());
@@ -132,9 +129,9 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
 		param.put("pt", sendRtpItem.getPt());
 		param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
 		param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
-		if (!sendRtpItem.isTcp() && parentPlatform != null && parentPlatform.isRtcp()) {
+		if (!sendRtpItem.isTcp()) {
 			// 开启rtcp保活
-			param.put("udp_rtcp_timeout", "1");
+			param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0");
 		}
 
 		JSONObject jsonObject;
@@ -145,6 +142,9 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
 			param.put("dst_url", sendRtpItem.getIp());
 			param.put("dst_port", sendRtpItem.getPort());
 			jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
+			System.out.println(JSON.toJSONString(param));
+			System.out.println();
+			System.out.println(jsonObject);
 		}
 
 			if (jsonObject == null) {

+ 21 - 9
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java

@@ -126,6 +126,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
 	@Autowired
 	private SipConfig config;
 
+    @Autowired
+    private VideoStreamSessionManager streamSession;
+
 
 
     @Autowired
@@ -383,7 +386,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                     }
                     SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                             device.getDeviceId(), channelId,
-                            mediaTransmissionTCP);
+                            mediaTransmissionTCP, platform.isRtcp());
 
                     if (tcpActive != null) {
                         sendRtpItem.setTcpActive(tcpActive);
@@ -579,7 +582,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                 // 自平台内容
                 SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                         gbStream.getApp(), gbStream.getStream(), channelId,
-                        mediaTransmissionTCP);
+                        mediaTransmissionTCP, platform.isRtcp());
 
                 if (sendRtpItem == null) {
                     logger.warn("服务器端口资源不足");
@@ -619,7 +622,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                 // 自平台内容
                 SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                         gbStream.getApp(), gbStream.getStream(), channelId,
-                        mediaTransmissionTCP);
+                        mediaTransmissionTCP, platform.isRtcp());
 
                 if (sendRtpItem == null) {
                     logger.warn("服务器端口资源不足");
@@ -736,7 +739,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                 dynamicTask.stop(callIdHeader.getCallId());
                 if (serverId.equals(userSetting.getServerId())) {
                     SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId,
-                            app, stream, channelId, mediaTransmissionTCP);
+                            app, stream, channelId, mediaTransmissionTCP, platform.isRtcp());
 
                     if (sendRtpItem == null) {
                         logger.warn("上级点时创建sendRTPItem失败,可能是服务器端口资源不足");
@@ -798,7 +801,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
         // 发送redis消息
         redisGbPlayMsgListener.sendMsg(streamPushItem.getServerId(), streamPushItem.getMediaServerId(),
                 streamPushItem.getApp(), streamPushItem.getStream(), addressStr, port, ssrc, requesterId,
-                channelId, mediaTransmissionTCP, null, responseSendItemMsg -> {
+                channelId, mediaTransmissionTCP, platform.isRtcp(), null, responseSendItemMsg -> {
                     SendRtpItem sendRtpItem = responseSendItemMsg.getSendRtpItem();
                     if (sendRtpItem == null || responseSendItemMsg.getMediaServerItem() == null) {
                         logger.warn("服务器端口资源不足");
@@ -904,6 +907,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
         }
         if (device != null) {
             logger.info("收到设备" + requesterId + "的语音广播Invite请求");
+
             try {
                 responseAck(request, Response.TRYING);
             } catch (SipException | InvalidArgumentException | ParseException e) {
@@ -980,7 +984,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                 }
                 SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                         device.getDeviceId(), audioBroadcastCatch.getChannelId(),
-                        mediaTransmissionTCP);
+                        mediaTransmissionTCP, false);
+
                 if (sendRtpItem == null) {
                     logger.warn("服务器端口资源不足");
                     try {
@@ -1006,12 +1011,16 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                 sendRtpItem.setStreamId(stream);
                 sendRtpItem.setPt(8);
                 sendRtpItem.setUsePs(false);
+                sendRtpItem.setRtcp(false);
                 sendRtpItem.setOnlyAudio(true);
                 redisCatchStorage.updateSendRTPSever(sendRtpItem);
 
                 Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, app, stream);
                 if (streamReady) {
-                    sendOk(device, sendRtpItem, sdp, request, mediaServerItem, mediaTransmissionTCP, ssrc);
+                    SIPResponse sipResponse = sendOk(device, sendRtpItem, sdp, request, mediaServerItem, mediaTransmissionTCP, ssrc);
+                    // 添加事务信息
+                    streamSession.put(device.getDeviceId(), audioBroadcastCatch.getChannelId(), request.getCallIdHeader().getCallId()
+                            , stream,  sendRtpItem.getSsrc(), mediaServerItem.getId(), sipResponse, VideoStreamSessionManager.SessionType.broadcast );
                 }else {
                     logger.warn("[语音通话], 未发现待推送的流,app={},stream={}", app, stream);
                     playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId());
@@ -1029,7 +1038,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
         }
     }
 
-    void sendOk(Device device, SendRtpItem sendRtpItem, SessionDescription sdp, SIPRequest request,  MediaServerItem mediaServerItem, boolean mediaTransmissionTCP, String ssrc){
+    SIPResponse sendOk(Device device, SendRtpItem sendRtpItem, SessionDescription sdp, SIPRequest request,  MediaServerItem mediaServerItem, boolean mediaTransmissionTCP, String ssrc){
+        SIPResponse sipResponse = null;
         try {
             sendRtpItem.setStatus(2);
             redisCatchStorage.updateSendRTPSever(sendRtpItem);
@@ -1065,15 +1075,17 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
             parentPlatform.setServerPort(device.getPort());
             parentPlatform.setServerGBId(device.getDeviceId());
 
-            SIPResponse sipResponse = responseSdpAck(request, content.toString(), parentPlatform);
+            sipResponse = responseSdpAck(request, content.toString(), parentPlatform);
 
             AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(device.getDeviceId(), sendRtpItem.getChannelId());
 
             audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.Ok);
             audioBroadcastCatch.setSipTransactionInfoByRequset(sipResponse);
             audioBroadcastManager.update(audioBroadcastCatch);
+
         } catch (SipException | InvalidArgumentException | ParseException | SdpParseException e) {
             logger.error("[命令发送失败] 语音对讲 回复200OK(SDP): {}", e.getMessage());
         }
+        return sipResponse;
     }
 }

+ 2 - 2
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java

@@ -531,7 +531,7 @@ public class ZLMHttpHookListener {
 //											cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null);
 										}
 									}else {
-										cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId());
+										cmder.streamByeCmd(device, null, null, sendRtpItem.getCallId());
 									}
 
 								}
@@ -771,7 +771,7 @@ public class ZLMHttpHookListener {
 	@ResponseBody
 	@PostMapping(value = "/on_rtp_server_timeout", produces = "application/json;charset=UTF-8")
 	public JSONObject onRtpServerTimeout(HttpServletRequest request, @RequestBody OnRtpServerTimeoutHookParam param){
-		logger.info("[ZLM HOOK] rtpServer收流超时:{}->{}({})", param.getMediaServerId(), param.getStream_id(), param.getSsrc());
+		logger.info("[ZLM HOOK] rtpServer rtp超时:{}->{}({})", param.getMediaServerId(), param.getStream_id(), param.getSsrc());
 
 		JSONObject ret = new JSONObject();
 		ret.put("code", 0);

+ 4 - 2
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java

@@ -177,7 +177,7 @@ public class ZLMRTPServerFactory {
      * @param tcp 是否为tcp
      * @return SendRtpItem
      */
-    public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, String deviceId, String channelId, boolean tcp){
+    public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, String deviceId, String channelId, boolean tcp, boolean rtcp){
 
         // 默认为随机端口
         int localPort = 0;
@@ -197,6 +197,7 @@ public class ZLMRTPServerFactory {
         sendRtpItem.setDeviceId(deviceId);
         sendRtpItem.setChannelId(channelId);
         sendRtpItem.setTcp(tcp);
+        sendRtpItem.setRtcp(rtcp);
         sendRtpItem.setApp("rtp");
         sendRtpItem.setLocalPort(localPort);
         sendRtpItem.setServerId(userSetting.getServerId());
@@ -214,7 +215,7 @@ public class ZLMRTPServerFactory {
      * @param tcp 是否为tcp
      * @return SendRtpItem
      */
-    public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, String app, String stream, String channelId, boolean tcp){
+    public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, String app, String stream, String channelId, boolean tcp, boolean rtcp){
         // 默认为随机端口
         int localPort = 0;
         if (userSetting.getGbSendStreamStrict()) {
@@ -235,6 +236,7 @@ public class ZLMRTPServerFactory {
         sendRtpItem.setLocalPort(localPort);
         sendRtpItem.setServerId(userSetting.getServerId());
         sendRtpItem.setMediaServerId(serverItem.getId());
+        sendRtpItem.setRtcp(rtcp);
         return sendRtpItem;
     }
 

+ 16 - 1
src/main/java/com/genersoft/iot/vmp/service/bean/RequestSendItemMsg.java

@@ -63,10 +63,16 @@ public class RequestSendItemMsg {
     private Boolean isTcp;
 
 
+    /**
+     * 是否使用TCP
+     */
+    private Boolean rtcp;
+
+
 
 
     public static RequestSendItemMsg getInstance(String serverId, String mediaServerId, String app, String stream, String ip, int port,
-                                                          String ssrc, String platformId, String channelId, Boolean isTcp, String platformName) {
+                                                          String ssrc, String platformId, String channelId, Boolean isTcp, Boolean rtcp, String platformName) {
         RequestSendItemMsg requestSendItemMsg = new RequestSendItemMsg();
         requestSendItemMsg.setServerId(serverId);
         requestSendItemMsg.setMediaServerId(mediaServerId);
@@ -79,6 +85,7 @@ public class RequestSendItemMsg {
         requestSendItemMsg.setPlatformName(platformName);
         requestSendItemMsg.setChannelId(channelId);
         requestSendItemMsg.setTcp(isTcp);
+        requestSendItemMsg.setRtcp(rtcp);
 
         return  requestSendItemMsg;
     }
@@ -170,4 +177,12 @@ public class RequestSendItemMsg {
     public void setTcp(Boolean tcp) {
         isTcp = tcp;
     }
+
+    public Boolean getRtcp() {
+        return rtcp;
+    }
+
+    public void setRtcp(Boolean rtcp) {
+        this.rtcp = rtcp;
+    }
 }

+ 2 - 1
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java

@@ -347,7 +347,7 @@ public class PlayServiceImpl implements IPlayService {
 //                        }
                         SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, ip, port, ssrcInfo.getSsrc(), device.getDeviceId(),
                                 device.getDeviceId(), channelId,
-                                false);
+                                false, false);
 
 
 //                        if (sendRtpItem.getLocalPort() == 0) {
@@ -375,6 +375,7 @@ public class PlayServiceImpl implements IPlayService {
                         sendRtpItem.setStreamId("1000");
                         sendRtpItem.setSsrc(ssrc);
                         sendRtpItem.setOnlyAudio(true);
+                        sendRtpItem.setRtcp(false);
                         redisCatchStorage.updateSendRTPSever(sendRtpItem);
 
                         Map<String, Object> param = new HashMap<>(12);

+ 3 - 3
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java

@@ -318,7 +318,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
         SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, content.getIp(),
                 content.getPort(), content.getSsrc(), content.getPlatformId(),
                 content.getApp(), content.getStream(), content.getChannelId(),
-                content.getTcp());
+                content.getTcp(), content.getRtcp());
 
         WVPResult<ResponseSendItemMsg> result = new WVPResult<>();
         result.setCode(0);
@@ -348,9 +348,9 @@ public class RedisGbPlayMsgListener implements MessageListener {
      * @param callback 得到信息的回调
      */
     public void sendMsg(String serverId, String mediaServerId, String app, String stream, String ip, int port, String ssrc,
-                        String platformId, String channelId, boolean isTcp, String platformName, PlayMsgCallback callback, PlayMsgErrorCallback errorCallback) {
+                        String platformId, String channelId, boolean isTcp, boolean rtcp, String platformName, PlayMsgCallback callback, PlayMsgErrorCallback errorCallback) {
         RequestSendItemMsg requestSendItemMsg = RequestSendItemMsg.getInstance(
-                serverId, mediaServerId, app, stream, ip, port, ssrc, platformId, channelId, isTcp, platformName);
+                serverId, mediaServerId, app, stream, ip, port, ssrc, platformId, channelId, isTcp, rtcp, platformName);
         requestSendItemMsg.setServerId(serverId);
         String key = UUID.randomUUID().toString();
         WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, WvpRedisMsgCmd.GET_SEND_ITEM,