648540858 3 лет назад
Родитель
Сommit
2466a24860

+ 1 - 1
pom.xml

@@ -11,7 +11,7 @@
 
 	<groupId>com.genersoft</groupId>
 	<artifactId>wvp-pro</artifactId>
-	<version>2.3.2</version>
+	<version>2.6.6</version>
 	<name>web video platform</name>
 	<description>国标28181视频平台</description>
 

+ 5 - 6
src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java

@@ -1,10 +1,7 @@
 package com.genersoft.iot.vmp.conf;
 
-import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.annotation.Bean;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
 import org.springframework.stereotype.Component;
@@ -101,12 +98,14 @@ public class DynamicTask {
         }
     }
 
-    public void stop(String key) {
-        if (futureMap.get(key) != null && !futureMap.get(key).isCancelled()) {
-            futureMap.get(key).cancel(false);
+    public boolean stop(String key) {
+        boolean result = false;
+        if (futureMap.get(key) != null && !futureMap.get(key).isCancelled() && !futureMap.get(key).isDone()) {
+            result = futureMap.get(key).cancel(false);
             futureMap.remove(key);
             runnableMap.remove(key);
         }
+        return result;
     }
 
     public boolean contains(String key) {

+ 1 - 12
src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java

@@ -2,13 +2,11 @@ package com.genersoft.iot.vmp.conf;
 
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.utils.DateUtil;
-import com.genersoft.iot.vmp.vmanager.gb28181.device.DeviceQuery;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.util.ObjectUtils;
-import org.springframework.util.StringUtils;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
@@ -75,10 +73,6 @@ public class MediaConfig{
     @Value("${media.rtp.port-range}")
     private String rtpPortRange;
 
-
-    @Value("${media.rtp.send-port-range}")
-    private String sendRtpPortRange;
-
     @Value("${media.record-assist-port:0}")
     private Integer recordAssistPort = 0;
 
@@ -191,10 +185,6 @@ public class MediaConfig{
         return sipDomain;
     }
 
-    public String getSendRtpPortRange() {
-        return sendRtpPortRange;
-    }
-
     public MediaServerItem getMediaSerItem(){
         MediaServerItem mediaServerItem = new MediaServerItem();
         mediaServerItem.setId(id);
@@ -214,9 +204,8 @@ public class MediaConfig{
         mediaServerItem.setSecret(secret);
         mediaServerItem.setRtpEnable(rtpEnable);
         mediaServerItem.setRtpPortRange(rtpPortRange);
-        mediaServerItem.setSendRtpPortRange(sendRtpPortRange);
         mediaServerItem.setRecordAssistPort(recordAssistPort);
-        mediaServerItem.setHookAliveInterval(120);
+        mediaServerItem.setHookAliveInterval(30.00f);
 
         mediaServerItem.setCreateTime(DateUtil.getNow());
         mediaServerItem.setUpdateTime(DateUtil.getNow());

+ 5 - 4
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java

@@ -8,14 +8,13 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
 import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
 import com.genersoft.iot.vmp.gb28181.transmit.SIPSender;
-import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
 import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
 import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
 import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
-import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
 import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
 import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
+import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
 import com.genersoft.iot.vmp.media.zlm.dto.*;
 import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.IPlayService;
@@ -39,9 +38,10 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import javax.sdp.*;
-import javax.sip.*;
+import javax.sip.InvalidArgumentException;
+import javax.sip.RequestEvent;
+import javax.sip.SipException;
 import javax.sip.header.CallIdHeader;
-import javax.sip.message.Request;
 import javax.sip.message.Response;
 import java.text.ParseException;
 import java.time.Instant;
@@ -656,6 +656,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
             if (!platform.isStartOfflinePush()) {
                 // 平台设置中关闭了拉起离线的推流则直接回复
                 try {
+                    logger.info("[上级点播] 失败,推流设备未推流,channel: {}, app: {}, stream: {}", gbStream.getGbId(), gbStream.getApp(), gbStream.getStream());
                     responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing");
                 } catch (SipException | InvalidArgumentException | ParseException e) {
                     logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage());

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java

@@ -85,7 +85,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
             String password = (device != null && !ObjectUtils.isEmpty(device.getPassword()))? device.getPassword() : sipConfig.getPassword();
             AuthorizationHeader authHead = (AuthorizationHeader) request.getHeader(AuthorizationHeader.NAME);
             if (authHead == null && !ObjectUtils.isEmpty(password)) {
-                logger.info("[注册请求] 未携带授权头 回复401: {}", requestAddress);
+                logger.info("[注册请求] 回复401: {}", requestAddress);
                 response = getMessageFactory().createResponse(Response.UNAUTHORIZED, request);
                 new DigestServerAuthenticationHelper().generateChallenge(getHeaderFactory(), response, sipConfig.getDomain());
                 sipSender.transmitRequest(response);

+ 13 - 14
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java

@@ -1,19 +1,19 @@
 package com.genersoft.iot.vmp.media.zlm;
 
-import java.text.ParseException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
 import com.genersoft.iot.vmp.common.StreamInfo;
 import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
 import com.genersoft.iot.vmp.gb28181.bean.*;
 import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
+import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
-import com.genersoft.iot.vmp.media.zlm.dto.*;
+import com.genersoft.iot.vmp.media.zlm.dto.HookType;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
+import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
+import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
 import com.genersoft.iot.vmp.media.zlm.dto.hook.*;
 import com.genersoft.iot.vmp.service.*;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -24,18 +24,15 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.util.ObjectUtils;
-import org.springframework.web.bind.annotation.PostMapping;
-import org.springframework.web.bind.annotation.RequestBody;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.ResponseBody;
-import org.springframework.web.bind.annotation.RestController;
-
-import com.alibaba.fastjson2.JSONObject;
-import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
+import org.springframework.web.bind.annotation.*;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.sip.InvalidArgumentException;
 import javax.sip.SipException;
+import java.text.ParseException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 /**    
  * @description:针对 ZLMediaServer的hook事件监听
@@ -571,6 +568,8 @@ public class ZLMHttpHookListener {
 	public JSONObject onServerStarted(HttpServletRequest request, @RequestBody JSONObject jsonObject){
 
 		jsonObject.put("ip", request.getRemoteAddr());
+		System.out.println(jsonObject.toJSONString()
+		);
 		ZLMServerConfig zlmServerConfig = JSON.to(ZLMServerConfig.class, jsonObject);
 		zlmServerConfig.setIp(request.getRemoteAddr());
 		logger.info("[ZLM HOOK] zlm 启动 " + zlmServerConfig.getGeneralMediaServerId());

+ 2 - 2
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java

@@ -10,7 +10,6 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.util.ObjectUtils;
-import org.springframework.util.StringUtils;
 
 import java.util.*;
 
@@ -294,7 +293,8 @@ public class ZLMRTPServerFactory {
      */
     public Boolean isStreamReady(MediaServerItem mediaServerItem, String app, String streamId) {
         JSONObject mediaInfo = zlmresTfulUtils.getMediaList(mediaServerItem, app, streamId);
-        return (mediaInfo.getInteger("code") == 0
+        return mediaInfo != null && (mediaInfo.getInteger("code") == 0
+
                 && mediaInfo.getJSONArray("data") != null
                 && mediaInfo.getJSONArray("data").size() > 0);
     }

+ 3 - 3
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerConfig.java

@@ -66,7 +66,7 @@ public class ZLMServerConfig {
     private String hookAdminParams;
 
     @JSONField(name = "hook.alive_interval")
-    private int hookAliveInterval;
+    private Float hookAliveInterval;
 
     @JSONField(name = "hook.enable")
     private String hookEnable;
@@ -798,11 +798,11 @@ public class ZLMServerConfig {
         this.shellPhell = shellPhell;
     }
 
-    public int getHookAliveInterval() {
+    public Float getHookAliveInterval() {
         return hookAliveInterval;
     }
 
-    public void setHookAliveInterval(int hookAliveInterval) {
+    public void setHookAliveInterval(Float hookAliveInterval) {
         this.hookAliveInterval = hookAliveInterval;
     }
 

+ 3 - 4
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaServerItem.java

@@ -5,7 +5,6 @@ import com.genersoft.iot.vmp.gb28181.session.SsrcConfig;
 import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig;
 import io.swagger.v3.oas.annotations.media.Schema;
 import org.springframework.util.ObjectUtils;
-import org.springframework.util.StringUtils;
 
 import java.util.HashMap;
 
@@ -55,7 +54,7 @@ public class MediaServerItem{
     private String secret;
 
     @Schema(description = "keepalive hook触发间隔,单位秒")
-    private int hookAliveInterval;
+    private Float hookAliveInterval;
 
     @Schema(description = "是否使用多端口模式")
     private boolean rtpEnable;
@@ -332,11 +331,11 @@ public class MediaServerItem{
         this.sendRtpPortRange = sendRtpPortRange;
     }
 
-    public int getHookAliveInterval() {
+    public Float getHookAliveInterval() {
         return hookAliveInterval;
     }
 
-    public void setHookAliveInterval(int hookAliveInterval) {
+    public void setHookAliveInterval(Float hookAliveInterval) {
         this.hookAliveInterval = hookAliveInterval;
     }
 }

+ 3 - 0
src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java

@@ -11,6 +11,9 @@ import com.github.pagehelper.PageInfo;
 import java.util.List;
 import java.util.Map;
 
+/**
+ * @author lin
+ */
 public interface IStreamPushService {
 
     List<StreamPushItem> handleJSON(String json, MediaServerItem mediaServerItem);

+ 22 - 28
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java

@@ -1,35 +1,13 @@
 package com.genersoft.iot.vmp.service.impl;
 
-import java.time.LocalDateTime;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import com.genersoft.iot.vmp.conf.DynamicTask;
-import com.genersoft.iot.vmp.conf.exception.ControllerException;
-import com.genersoft.iot.vmp.media.zlm.dto.ServerKeepaliveData;
-import com.genersoft.iot.vmp.service.bean.MediaServerLoad;
-import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
-import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.jdbc.datasource.DataSourceTransactionManager;
-import org.springframework.stereotype.Service;
-import org.springframework.transaction.TransactionDefinition;
-import org.springframework.transaction.TransactionStatus;
-import org.springframework.util.ObjectUtils;
-
 import com.alibaba.fastjson2.JSON;
 import com.alibaba.fastjson2.JSONArray;
 import com.alibaba.fastjson2.JSONObject;
 import com.genersoft.iot.vmp.common.VideoManagerConstants;
+import com.genersoft.iot.vmp.conf.DynamicTask;
 import com.genersoft.iot.vmp.conf.SipConfig;
 import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.conf.exception.ControllerException;
 import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
 import com.genersoft.iot.vmp.gb28181.session.SsrcConfig;
 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
@@ -37,15 +15,30 @@ import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
 import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
 import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
+import com.genersoft.iot.vmp.media.zlm.dto.ServerKeepaliveData;
 import com.genersoft.iot.vmp.service.IMediaServerService;
+import com.genersoft.iot.vmp.service.bean.MediaServerLoad;
 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
+import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.dao.MediaServerMapper;
 import com.genersoft.iot.vmp.utils.DateUtil;
 import com.genersoft.iot.vmp.utils.redis.RedisUtil;
-
+import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
 import okhttp3.OkHttpClient;
 import okhttp3.Request;
 import okhttp3.Response;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.jdbc.datasource.DataSourceTransactionManager;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.TransactionDefinition;
+import org.springframework.transaction.TransactionStatus;
+import org.springframework.util.ObjectUtils;
+
+import java.time.LocalDateTime;
+import java.util.*;
 
 /**
  * 媒体服务器节点管理
@@ -129,6 +122,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
     @Override
     public SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, String presetSsrc, boolean ssrcCheck, boolean isPlayback, Integer port) {
         if (mediaServerItem == null || mediaServerItem.getId() == null) {
+            logger.info("[openRTPServer] 失败, mediaServerItem == null || mediaServerItem.getId() == null");
             return null;
         }
         // 获取mediaServer可用的ssrc
@@ -306,7 +300,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
     public void add(MediaServerItem mediaServerItem) {
         mediaServerItem.setCreateTime(DateUtil.getNow());
         mediaServerItem.setUpdateTime(DateUtil.getNow());
-        mediaServerItem.setHookAliveInterval(120);
+        mediaServerItem.setHookAliveInterval(30f);
         JSONObject responseJSON = zlmresTfulUtils.getMediaServerConfig(mediaServerItem);
         if (responseJSON != null) {
             JSONArray data = responseJSON.getJSONArray("data");
@@ -413,7 +407,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
         }
         final String zlmKeepaliveKey = zlmKeepaliveKeyPrefix + serverItem.getId();
         dynamicTask.stop(zlmKeepaliveKey);
-        dynamicTask.startDelay(zlmKeepaliveKey, new KeepAliveTimeoutRunnable(serverItem), (serverItem.getHookAliveInterval() + 5) * 1000);
+        dynamicTask.startDelay(zlmKeepaliveKey, new KeepAliveTimeoutRunnable(serverItem), (Math.getExponent(serverItem.getHookAliveInterval()) + 5) * 1000);
         publisher.zlmOnlineEventPublish(serverItem.getId());
         logger.info("[ZLM] 连接成功 {} - {}:{} ",
                 zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(), zlmServerConfig.getHttpPort());
@@ -666,7 +660,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
         }
         final String zlmKeepaliveKey = zlmKeepaliveKeyPrefix + mediaServerItem.getId();
         dynamicTask.stop(zlmKeepaliveKey);
-        dynamicTask.startDelay(zlmKeepaliveKey, new KeepAliveTimeoutRunnable(mediaServerItem), (mediaServerItem.getHookAliveInterval() + 5) * 1000);
+        dynamicTask.startDelay(zlmKeepaliveKey, new KeepAliveTimeoutRunnable(mediaServerItem), (mediaServerItem.getHookAliveInterval().intValue() + 5) * 1000);
     }
 
     private MediaServerItem getOneFromDatabase(String mediaServerId) {

+ 68 - 71
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java

@@ -1,49 +1,28 @@
 package com.genersoft.iot.vmp.service.impl;
 
-import java.math.BigDecimal;
-import java.math.RoundingMode;
-import java.text.ParseException;
-import java.util.*;
-
-import javax.sip.InvalidArgumentException;
-import javax.sip.ResponseEvent;
-import javax.sip.SipException;
-
-import com.genersoft.iot.vmp.conf.exception.ControllerException;
-import com.genersoft.iot.vmp.conf.exception.ServiceException;
-import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
-import com.genersoft.iot.vmp.gb28181.bean.*;
-import com.genersoft.iot.vmp.service.IDeviceService;
-import com.genersoft.iot.vmp.utils.redis.RedisUtil;
-import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.stereotype.Service;
-import org.springframework.util.ObjectUtils;
-import org.springframework.web.context.request.async.DeferredResult;
-
 import com.alibaba.fastjson2.JSON;
 import com.alibaba.fastjson2.JSONArray;
 import com.alibaba.fastjson2.JSONObject;
 import com.genersoft.iot.vmp.common.StreamInfo;
 import com.genersoft.iot.vmp.conf.DynamicTask;
 import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.conf.exception.ControllerException;
+import com.genersoft.iot.vmp.conf.exception.ServiceException;
+import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
+import com.genersoft.iot.vmp.gb28181.bean.*;
 import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
 import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
 import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
-import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
-import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
-import com.genersoft.iot.vmp.utils.DateUtil;
 import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
-import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
 import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
+import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
+import com.genersoft.iot.vmp.service.IDeviceService;
 import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.IMediaService;
 import com.genersoft.iot.vmp.service.IPlayService;
@@ -53,8 +32,29 @@ import com.genersoft.iot.vmp.service.bean.PlayBackResult;
 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
+import com.genersoft.iot.vmp.utils.DateUtil;
+import com.genersoft.iot.vmp.utils.redis.RedisUtil;
+import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
 import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
 import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Service;
+import org.springframework.util.ObjectUtils;
+import org.springframework.web.context.request.async.DeferredResult;
+
+import javax.sip.InvalidArgumentException;
+import javax.sip.ResponseEvent;
+import javax.sip.SipException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.text.ParseException;
+import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
 
 @SuppressWarnings(value = {"rawtypes", "unchecked"})
 @Service
@@ -212,6 +212,15 @@ public class PlayServiceImpl implements IPlayService {
             }
             SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false);
             logger.info(JSONObject.toJSONString(ssrcInfo));
+            if (ssrcInfo == null) {
+                WVPResult wvpResult = new WVPResult();
+                wvpResult.setCode(ErrorCode.ERROR100.getCode());
+                wvpResult.setMsg("开启收流失败");
+                msg.setData(wvpResult);
+
+                resultHolder.invokeAllResult(msg);
+                return playResult;
+            }
             play(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInUse, response) -> {
                 if (hookEvent != null) {
                     hookEvent.response(mediaServerItem, response);
@@ -249,45 +258,33 @@ public class PlayServiceImpl implements IPlayService {
                      ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
                      InviteTimeOutCallback timeoutCallback, String uuid) {
 
-        String streamId = null;
-        if (mediaServerItem.isRtpEnable()) {
-            streamId = String.format("%s_%s", device.getDeviceId(), channelId);
-        }
-        if (ssrcInfo == null) {
-            ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false);
-        }
         logger.info("[点播开始] deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
         // 超时处理
         String timeOutTaskKey = UUID.randomUUID().toString();
-        SSRCInfo finalSsrcInfo = ssrcInfo;
         dynamicTask.startDelay(timeOutTaskKey, () -> {
-
-            logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", device.getDeviceId(), channelId, finalSsrcInfo.getPort(), finalSsrcInfo.getSsrc());
-            timeoutCallback.run(1, "收流超时");
-            // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
-            try {
-                cmder.streamByeCmd(device, channelId, finalSsrcInfo.getStream(), null);
-            } catch (InvalidArgumentException | ParseException | SipException e) {
-                logger.error("[点播超时], 发送BYE失败 {}", e.getMessage());
-            } catch (SsrcTransactionNotFoundException e) {
-                timeoutCallback.run(0, "点播超时");
-                mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
-                mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream());
-                streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
+            // 执行超时任务时查询是否已经成功,成功了则不执行超时任务,防止超时任务取消失败的情况
+            if (redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId) == null) {
+                logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", device.getDeviceId(), channelId, ssrcInfo.getPort(), ssrcInfo.getSsrc());
+                // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
+                try {
+                    cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null);
+                } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
+                    logger.error("[点播超时], 发送BYE失败 {}", e.getMessage());
+                } finally {
+                    timeoutCallback.run(1, "收流超时");
+                    mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
+                    mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
+                    streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
+                    mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
+                }
             }
         }, userSetting.getPlayTimeout());
-        final String ssrc = ssrcInfo.getSsrc();
-        final String stream = ssrcInfo.getStream();
-        //端口获取失败的ssrcInfo 没有必要发送点播指令
-        if (ssrcInfo.getPort() <= 0) {
-            logger.info("[点播端口分配异常],deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channelId, ssrcInfo);
-            return;
-        }
+
         try {
             cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
                 logger.info("收到订阅消息: " + response.toJSONString());
                 System.out.println("停止超时任务: " + timeOutTaskKey);
-                dynamicTask.stop(timeOutTaskKey);
+
                 // hook响应
                 onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId, uuid);
                 hookEvent.response(mediaServerItemInuse, response);
@@ -303,18 +300,18 @@ public class PlayServiceImpl implements IPlayService {
                     //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容
                     String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
                     // 查询到ssrc不一致且开启了ssrc校验则需要针对处理
-                    if (ssrc.equals(ssrcInResponse)) {
+                    if (ssrcInfo.getSsrc().equals(ssrcInResponse)) {
                         return;
                     }
                     logger.info("[点播消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
                     if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {
-                        logger.info("[点播消息] SSRC修正 {}->{}", ssrc, ssrcInResponse);
+                        logger.info("[点播消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
 
                         if (!mediaServerItem.getSsrcConfig().checkSsrc(ssrcInResponse)) {
                             // ssrc 不可用
                             // 释放ssrc
-                            mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
-                            streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
+                            mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
+                            streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
                             event.msg = "下级自定义了ssrc,但是此ssrc不可用";
                             event.statusCode = 400;
                             errorEvent.response(event);
@@ -324,7 +321,7 @@ public class PlayServiceImpl implements IPlayService {
                         // 单端口模式streamId也有变化,需要重新设置监听
                         if (!mediaServerItem.isRtpEnable()) {
                             // 添加订阅
-                            HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId());
+                            HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
                             subscribe.removeSubscribe(hookSubscribe);
                             hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
                             subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
@@ -336,30 +333,30 @@ public class PlayServiceImpl implements IPlayService {
                             });
                         }
                         // 关闭rtp server
-                        mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream());
+                        mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
                         // 重新开启ssrc server
-                        mediaServerService.openRTPServer(mediaServerItem, finalSsrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), false, finalSsrcInfo.getPort());
+                        mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), false, ssrcInfo.getPort());
 
                     }
                 }
             }, (event) -> {
                 dynamicTask.stop(timeOutTaskKey);
-                mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream());
+                mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
                 // 释放ssrc
-                mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
+                mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
 
-                streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
+                streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
                 errorEvent.response(event);
             });
         } catch (InvalidArgumentException | SipException | ParseException e) {
 
             logger.error("[命令发送失败] 点播消息: {}", e.getMessage());
             dynamicTask.stop(timeOutTaskKey);
-            mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream());
+            mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
             // 释放ssrc
-            mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
+            mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
 
-            streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
+            streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
             SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(new CmdSendFailEvent(null));
             eventResult.msg = "命令发送失败";
             errorEvent.response(eventResult);

+ 2 - 2
src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformGbStreamMapper.java

@@ -23,10 +23,10 @@ public interface PlatformGbStreamMapper {
 
     @Insert("<script> " +
             "INSERT into platform_gb_stream " +
-            "(gbStreamId, platformId, catalogId,status) " +
+            "(gbStreamId, platformId, catalogId) " +
             "values " +
             "<foreach collection='streamPushItems' index='index' item='item' separator=','> " +
-            "(${item.gbStreamId}, '${item.platformId}', '${item.catalogId}'), '${item.status}')" +
+            "(${item.gbStreamId}, '${item.platformId}', '${item.catalogId}')" +
             "</foreach> " +
             "</script>")
     int batchAdd(List<StreamPushItem> streamPushItems);