Преглед изворни кода

优化端口预占用,防止占用无法释放

648540858 пре 2 година
родитељ
комит
a77628e875

+ 46 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/bean/Gb28181Sdp.java

@@ -0,0 +1,46 @@
+package com.genersoft.iot.vmp.gb28181.bean;
+
+import javax.sdp.SessionDescription;
+
+/**
+ * 28181 的SDP解析器
+ */
+public class Gb28181Sdp  {
+    private SessionDescription baseSdb;
+    private String ssrc;
+
+    private String mediaDescription;
+
+    public static Gb28181Sdp getInstance(SessionDescription baseSdb, String ssrc, String mediaDescription) {
+        Gb28181Sdp gb28181Sdp = new Gb28181Sdp();
+        gb28181Sdp.setBaseSdb(baseSdb);
+        gb28181Sdp.setSsrc(ssrc);
+        gb28181Sdp.setMediaDescription(mediaDescription);
+        return gb28181Sdp;
+    }
+
+
+    public SessionDescription getBaseSdb() {
+        return baseSdb;
+    }
+
+    public void setBaseSdb(SessionDescription baseSdb) {
+        this.baseSdb = baseSdb;
+    }
+
+    public String getSsrc() {
+        return ssrc;
+    }
+
+    public void setSsrc(String ssrc) {
+        this.ssrc = ssrc;
+    }
+
+    public String getMediaDescription() {
+        return mediaDescription;
+    }
+
+    public void setMediaDescription(String mediaDescription) {
+        this.mediaDescription = mediaDescription;
+    }
+}

+ 45 - 31
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java

@@ -241,21 +241,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                 // 解析sdp消息, 使用jainsip 自带的sdp解析方式
                 String contentString = new String(request.getRawContent());
 
-                // jainSip不支持y=字段, 移除以解析。
-                int ssrcIndex = contentString.indexOf("y=");
-                // 检查是否有y字段
-                String ssrcDefault = "0000000000";
-                String ssrc;
-                SessionDescription sdp;
-                if (ssrcIndex >= 0) {
-                    //ssrc规定长度为10个字节,不取余下长度以避免后续还有“f=”字段
-                    ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
-                    String substring = contentString.substring(0, contentString.indexOf("y="));
-                    sdp = SdpFactory.getInstance().createSessionDescription(substring);
-                } else {
-                    ssrc = ssrcDefault;
-                    sdp = SdpFactory.getInstance().createSessionDescription(contentString);
-                }
+                Gb28181Sdp gb28181Sdp = SipUtils.parseSDP(contentString);
+                SessionDescription sdp = gb28181Sdp.getBaseSdb();
                 String sessionName = sdp.getSessionName().getValue();
 
                 Long startTime = null;
@@ -317,7 +304,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                 String username = sdp.getOrigin().getUsername();
                 String addressStr = sdp.getConnection().getAddress();
 
-                logger.info("[上级点播]用户:{}, 通道:{}, 地址:{}:{}, ssrc:{}", username, channelId, addressStr, port, ssrc);
+
                 Device device = null;
                 // 通过 channel 和 gbStream 是否为null 值判断来源是直播流合适国标
                 if (channel != null) {
@@ -341,8 +328,30 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                         }
                         return;
                     }
+
+                    String ssrc;
+                    if (gb28181Sdp.getSsrc() == null) {
+                        // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式
+                        ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
+                        logger.warn("[上级Invite] {} 平台:{}, 通道:{}, 缺少 ssrc,补充为: {}", sessionName, username, channelId, ssrc);
+                    }else {
+                        ssrc = gb28181Sdp.getSsrc();
+                    }
+                    String streamTypeStr = null;
+                    if (mediaTransmissionTCP) {
+                        if (tcpActive) {
+                            streamTypeStr = "TCP-ACTIVE";
+                        }else {
+                            streamTypeStr = "TCP-PASSIVE";
+                        }
+                    }else {
+                        streamTypeStr = "UDP";
+                    }
+                    logger.info("[上级Invite] {}, 平台:{}, 通道:{}, 收流地址:{}:{},收流方式:{}, ssrc:{}", sessionName, username, channelId, addressStr, port, streamTypeStr, ssrc);
                     SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
-                            device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp());
+                            device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback -> {
+                                return redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null;
+                            });
 
                     if (tcpActive != null) {
                         sendRtpItem.setTcpActive(tcpActive);
@@ -469,7 +478,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                             SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, device.isSsrcCheck(), false, 0, false, device.getStreamModeForParam());
                             logger.info(JSONObject.toJSONString(ssrcInfo));
                             sendRtpItem.setStreamId(ssrcInfo.getStream());
-                            sendRtpItem.setSsrc(ssrc.equals(ssrcDefault) ? ssrcInfo.getSsrc() : ssrc);
+                            sendRtpItem.setSsrc(ssrc);
 
                             // 写入redis, 超时时回复
                             redisCatchStorage.updateSendRTPSever(sendRtpItem);
@@ -480,12 +489,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                             });
                         } else {
                             // 当前系统作为下级平台使用,当上级平台点播时不携带ssrc时,并且设备在当前系统中已经点播了。这个时候需要重新给生成一个ssrc,不使用默认的"0000000000"。
-                            if (ssrc.equals(ssrcDefault)) {
-                                ssrc = ssrcFactory.getPlaySsrc(mediaServerItem.getId());
-                                ssrcFactory.releaseSsrc(mediaServerItem.getId(), ssrc);
-                                sendRtpItem.setSsrc(ssrc);
-                            }
-
+                            sendRtpItem.setSsrc(ssrc);
                             sendRtpItem.setStreamId(playTransaction.getStream());
                             // 写入redis, 超时时回复
                             redisCatchStorage.updateSendRTPSever(sendRtpItem);
@@ -496,11 +500,15 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                         }
                     }
                 } else if (gbStream != null) {
-                    if(ssrc.equals(ssrcDefault))
-                    {
-                        ssrc = ssrcFactory.getPlaySsrc(mediaServerItem.getId());
-                        ssrcFactory.releaseSsrc(mediaServerItem.getId(), ssrc);
+
+                    String ssrc;
+                    if (gb28181Sdp.getSsrc() == null) {
+                        // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式
+                        ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
+                    }else {
+                        ssrc = gb28181Sdp.getSsrc();
                     }
+
                     if("push".equals(gbStream.getStreamType())) {
                         if (streamPushItem != null && streamPushItem.isPushIng()) {
                             // 推流状态
@@ -545,7 +553,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
             if (streamReady) {
                 // 自平台内容
                 SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
-                        gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
+                        gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback ->{
+                            return  redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null;
+                        });
 
                 if (sendRtpItem == null) {
                     logger.warn("服务器端口资源不足");
@@ -584,7 +594,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
             if (streamReady) {
                 // 自平台内容
                 SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
-                        gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
+                        gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback ->{
+                            return  redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null;
+                        });
 
                 if (sendRtpItem == null) {
                     logger.warn("服务器端口资源不足");
@@ -701,7 +713,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                 dynamicTask.stop(callIdHeader.getCallId());
                 if (serverId.equals(userSetting.getServerId())) {
                     SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId,
-                            app, stream, channelId, mediaTransmissionTCP, platform.isRtcp());
+                            app, stream, channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback -> {
+                                return redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null;
+                            });
 
                     if (sendRtpItem == null) {
                         logger.warn("上级点时创建sendRTPItem失败,可能是服务器端口资源不足");

+ 82 - 2
src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java

@@ -1,14 +1,22 @@
 package com.genersoft.iot.vmp.gb28181.utils;
 
 import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
+import com.genersoft.iot.vmp.gb28181.bean.Gb28181Sdp;
 import com.genersoft.iot.vmp.gb28181.bean.RemoteAddressInfo;
+import com.genersoft.iot.vmp.utils.DateUtil;
 import com.genersoft.iot.vmp.utils.GitUtil;
 import gov.nist.javax.sip.address.AddressImpl;
 import gov.nist.javax.sip.address.SipUri;
 import gov.nist.javax.sip.header.Subject;
 import gov.nist.javax.sip.message.SIPRequest;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.util.ObjectUtils;
 
+import javax.sdp.SdpFactory;
+import javax.sdp.SdpParseException;
+import javax.sdp.SessionDescription;
 import javax.sip.PeerUnavailableException;
 import javax.sip.SipFactory;
 import javax.sip.header.FromHeader;
@@ -16,6 +24,8 @@ import javax.sip.header.Header;
 import javax.sip.header.UserAgentHeader;
 import javax.sip.message.Request;
 import java.text.ParseException;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeParseException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
@@ -28,6 +38,8 @@ import java.util.UUID;
  */
 public class SipUtils {
 
+    private final static Logger logger = LoggerFactory.getLogger(SipUtils.class);
+
     public static String getUserIdFromFromHeader(Request request) {
         FromHeader fromHeader = (FromHeader)request.getHeader(FromHeader.NAME);
         return getUserIdFromFromHeader(fromHeader);
@@ -51,7 +63,7 @@ public class SipUtils {
     }
 
     public static  String getNewViaTag() {
-        return "z9hG4bK" + System.currentTimeMillis();
+        return "z9hG4bK" + RandomStringUtils.randomNumeric(10);
     }
 
     public static UserAgentHeader createUserAgentHeader(GitUtil gitUtil) throws PeerUnavailableException, ParseException {
@@ -113,6 +125,12 @@ public class SipUtils {
         strTmp = String.format("%02X", moveSpeed);
         builder.append(strTmp, 0, 2);
         builder.append(strTmp, 0, 2);
+
+        //优化zoom低倍速下的变倍速率
+        if ((zoomSpeed > 0) && (zoomSpeed <16))
+        {
+            zoomSpeed = 16;
+        }
         strTmp = String.format("%X", zoomSpeed);
         builder.append(strTmp, 0, 1).append("0");
         //计算校验码
@@ -183,4 +201,66 @@ public class SipUtils {
         }
         return deviceChannel;
     }
-}
+
+    public static Gb28181Sdp parseSDP(String sdpStr) throws SdpParseException {
+
+        // jainSip不支持y= f=字段, 移除以解析。
+        int ssrcIndex = sdpStr.indexOf("y=");
+        int mediaDescriptionIndex = sdpStr.indexOf("f=");
+        // 检查是否有y字段
+        SessionDescription sdp;
+        String ssrc = null;
+        String mediaDescription = null;
+        if (mediaDescriptionIndex == 0 && ssrcIndex == 0) {
+            sdp = SdpFactory.getInstance().createSessionDescription(sdpStr);
+        }else {
+            String lines[] = sdpStr.split("\\r?\\n");
+            StringBuilder sdpBuffer = new StringBuilder();
+            for (String line : lines) {
+                if (line.trim().startsWith("y=")) {
+                    ssrc = line.substring(2);
+                }else if (line.trim().startsWith("f=")) {
+                    mediaDescription = line.substring(2);
+                }else {
+                    sdpBuffer.append(line.trim()).append("\r\n");
+                }
+            }
+            sdp = SdpFactory.getInstance().createSessionDescription(sdpBuffer.toString());
+        }
+        return Gb28181Sdp.getInstance(sdp, ssrc, mediaDescription);
+    }
+
+    public static String getSsrcFromSdp(String sdpStr) {
+
+        // jainSip不支持y= f=字段, 移除以解析。
+        int ssrcIndex = sdpStr.indexOf("y=");
+        if (ssrcIndex == 0) {
+            return null;
+        }
+        String lines[] = sdpStr.split("\\r?\\n");
+        for (String line : lines) {
+            if (line.trim().startsWith("y=")) {
+                return line.substring(2);
+            }
+        }
+        return null;
+    }
+
+    public static String parseTime(String timeStr) {
+        if (ObjectUtils.isEmpty(timeStr)){
+            return null;
+        }
+        LocalDateTime localDateTime;
+        try {
+            localDateTime = LocalDateTime.parse(timeStr);
+        }catch (DateTimeParseException e) {
+            try {
+                localDateTime = LocalDateTime.parse(timeStr, DateUtil.formatterISO8601);
+            }catch (DateTimeParseException e2) {
+                logger.error("[格式化时间] 无法格式化时间: {}", timeStr);
+                return null;
+            }
+        }
+        return localDateTime.format(DateUtil.formatterISO8601);
+    }
+}

+ 25 - 9
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java

@@ -219,13 +219,14 @@ public class ZLMRTPServerFactory {
      * @param tcp 是否为tcp
      * @return SendRtpItem
      */
-    public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, String deviceId, String channelId, boolean tcp, boolean rtcp){
+    public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId,
+                                         String deviceId, String channelId, boolean tcp, boolean rtcp, KeepPortCallback callback){
 
         // 默认为随机端口
         int localPort = 0;
         if (userSetting.getGbSendStreamStrict()) {
             if (userSetting.getGbSendStreamStrict()) {
-                localPort = keepPort(serverItem, ssrc);
+                localPort = keepPort(serverItem, ssrc, localPort, callback);
                 if (localPort == 0) {
                     return null;
                 }
@@ -257,11 +258,12 @@ public class ZLMRTPServerFactory {
      * @param tcp 是否为tcp
      * @return SendRtpItem
      */
-    public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, String app, String stream, String channelId, boolean tcp, boolean rtcp){
+    public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId,
+                                         String app, String stream, String channelId, boolean tcp, boolean rtcp, KeepPortCallback callback){
         // 默认为随机端口
         int localPort = 0;
         if (userSetting.getGbSendStreamStrict()) {
-            localPort = keepPort(serverItem, ssrc);
+            localPort = keepPort(serverItem, ssrc, localPort, callback);
             if (localPort == 0) {
                 return null;
             }
@@ -282,13 +284,16 @@ public class ZLMRTPServerFactory {
         return sendRtpItem;
     }
 
+    public interface KeepPortCallback{
+        Boolean keep(String ssrc);
+    }
+
     /**
      * 保持端口,直到需要需要发流时再释放
      */
-    public int keepPort(MediaServerItem serverItem, String ssrc) {
-        int localPort = 0;
+    public int keepPort(MediaServerItem serverItem, String ssrc, int localPort, KeepPortCallback keepPortCallback) {
         Map<String, Object> param = new HashMap<>(3);
-        param.put("port", 0);
+        param.put("port", localPort);
         param.put("enable_tcp", 1);
         param.put("stream_id", ssrc);
         JSONObject jsonObject = zlmresTfulUtils.openRtpServer(serverItem, param);
@@ -296,10 +301,21 @@ public class ZLMRTPServerFactory {
             localPort = jsonObject.getInteger("port");
             HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, serverItem.getId());
             // 订阅 zlm启动事件, 新的zlm也会从这里进入系统
+            Integer finalLocalPort = localPort;
             hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout,
                     (MediaServerItem mediaServerItem, JSONObject response)->{
-                        logger.info("[上级点播] {}->监听端口到期继续保持监听", ssrc);
-                        keepPort(serverItem, ssrc);
+                        System.out.println("监听端口到期继续保持监听");
+                        System.out.println(response);
+                        if (ssrc.equals(response.getString("stream_id"))) {
+                            if (keepPortCallback.keep(ssrc)) {
+                                logger.info("[上级点播] {}->监听端口到期继续保持监听", ssrc);
+                                keepPort(serverItem, ssrc, finalLocalPort, keepPortCallback);
+                            }else {
+                                logger.info("[上级点播] {}->发送取消,无需继续监听", ssrc);
+                                releasePort(serverItem, ssrc);
+                            }
+                        }
+
                     });
             logger.info("[上级点播] {}->监听端口: {}", ssrc, localPort);
         }else {

+ 32 - 1
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java

@@ -13,6 +13,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.bean.*;
+import com.genersoft.iot.vmp.utils.redis.RedisUtil;
 import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -26,6 +27,7 @@ import org.springframework.stereotype.Component;
 
 import java.text.ParseException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -314,7 +316,9 @@ public class RedisGbPlayMsgListener implements MessageListener {
         SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, content.getIp(),
                 content.getPort(), content.getSsrc(), content.getPlatformId(),
                 content.getApp(), content.getStream(), content.getChannelId(),
-                content.getTcp(), content.getRtcp());
+                content.getTcp(), content.getRtcp(), ssrcFromCallback -> {
+                    return querySendRTPServer(content.getPlatformId(), content.getChannelId(), content.getStream(), null) != null;
+                });
 
         WVPResult<ResponseSendItemMsg> result = new WVPResult<>();
         result.setCode(0);
@@ -391,4 +395,31 @@ public class RedisGbPlayMsgListener implements MessageListener {
         });
         redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
     }
+
+    private SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId) {
+        if (platformGbId == null) {
+            platformGbId = "*";
+        }
+        if (channelId == null) {
+            channelId = "*";
+        }
+        if (streamId == null) {
+            streamId = "*";
+        }
+        if (callId == null) {
+            callId = "*";
+        }
+        String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX
+                + userSetting.getServerId() + "_*_"
+                + platformGbId + "_"
+                + channelId + "_"
+                + streamId + "_"
+                + callId;
+        List<Object> scan = RedisUtil.scan(redisTemplate, key);
+        if (scan.size() > 0) {
+            return (SendRtpItem)redisTemplate.opsForValue().get(scan.get(0));
+        }else {
+            return null;
+        }
+    }
 }