Bläddra i källkod

优化拉流代理

648540858 2 år sedan
förälder
incheckning
483d5e04a7

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java

@@ -93,7 +93,7 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
                     }
                     if (event.getGbStreams() != null && event.getGbStreams().size() > 0){
                         for (GbStream gbStream : event.getGbStreams()) {
-                            if (gbStream.getStreamType().equals("push") && !userSetting.isUsePushingAsStatus()) {
+                            if (gbStream != null && gbStream.getStreamType().equals("push") && !userSetting.isUsePushingAsStatus()) {
                                 continue;
                             }
                             DeviceChannel deviceChannelByStream = gbStreamService.getDeviceChannelListByStream(gbStream, gbStream.getCatalogId(), parentPlatform);

+ 16 - 5
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java

@@ -32,13 +32,20 @@ public class ZLMRESTfulUtils {
     }
 
     private OkHttpClient getClient(){
+        return getClient(null);
+    }
+
+    private OkHttpClient getClient(Integer readTimeOut){
         if (client == null) {
+            if (readTimeOut == null) {
+                readTimeOut = 10;
+            }
             OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder();
             //todo 暂时写死超时时间 均为5s
             // 设置连接超时时间
-            httpClientBuilder.connectTimeout(5,TimeUnit.SECONDS);
+            httpClientBuilder.connectTimeout(8,TimeUnit.SECONDS);
             // 设置读取超时时间
-            httpClientBuilder.readTimeout(10,TimeUnit.SECONDS);
+            httpClientBuilder.readTimeout(readTimeOut,TimeUnit.SECONDS);
             // 设置连接池
             httpClientBuilder.connectionPool(new ConnectionPool(16, 5, TimeUnit.MINUTES));
             if (logger.isDebugEnabled()) {
@@ -55,9 +62,13 @@ public class ZLMRESTfulUtils {
 
     }
 
-
     public JSONObject sendPost(MediaServerItem mediaServerItem, String api, Map<String, Object> param, RequestCallback callback) {
-        OkHttpClient client = getClient();
+        return sendPost(mediaServerItem, api, param, callback, null);
+    }
+
+
+    public JSONObject sendPost(MediaServerItem mediaServerItem, String api, Map<String, Object> param, RequestCallback callback, Integer readTimeOut) {
+        OkHttpClient client = getClient(readTimeOut);
 
         if (mediaServerItem == null) {
             return null;
@@ -310,7 +321,7 @@ public class ZLMRESTfulUtils {
         param.put("enable_mp4", enable_mp4?1:0);
         param.put("enable_audio", enable_audio?1:0);
         param.put("rtp_type", rtp_type);
-        return sendPost(mediaServerItem, "addStreamProxy",param, null);
+        return sendPost(mediaServerItem, "addStreamProxy",param, null, 20);
     }
 
     public JSONObject closeStreams(MediaServerItem mediaServerItem, String app, String stream) {

+ 16 - 8
src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java

@@ -11,6 +11,8 @@ import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
 import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
 import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
 import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
 import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
@@ -160,15 +162,20 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
             callback.run(ErrorCode.ERROR100.getCode(), "保存失败", null);
             return;
         }
-
+        HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaInfo.getId());
+        hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> {
+            StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
+                    mediaInfo, param.getApp(), param.getStream(), null, null);
+            callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
+        });
         if (param.isEnable()) {
             String talkKey = UUID.randomUUID().toString();
-            dynamicTask.startCron(talkKey, ()->{
-                StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaInfo.getId(), false);
-                if (streamInfo != null) {
-                    callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
-                }
-            }, 1000);
+//            dynamicTask.startCron(talkKey, ()->{
+//                StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaInfo.getId(), false);
+//                if (streamInfo != null) {
+//                    callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
+//                }
+//            }, 3000);
             String delayTalkKey = UUID.randomUUID().toString();
             dynamicTask.startDelay(delayTalkKey, ()->{
                 StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaInfo.getId(), false);
@@ -178,9 +185,10 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
                     dynamicTask.stop(talkKey);
                     callback.run(ErrorCode.ERROR100.getCode(), "超时", null);
                 }
-            }, 5000);
+            }, 7000);
             JSONObject jsonObject = addStreamProxyToZlm(param);
             if (jsonObject != null && jsonObject.getInteger("code") == 0) {
+                hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
                 dynamicTask.stop(talkKey);
                 StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
                         mediaInfo, param.getApp(), param.getStream(), null, null);

+ 3 - 0
src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java

@@ -80,6 +80,9 @@ public class StreamProxyController {
         if (ObjectUtils.isEmpty(param.getType())) {
             param.setType("default");
         }
+        if (ObjectUtils.isEmpty(param.getRtpType())) {
+            param.setRtpType("1");
+        }
         if (ObjectUtils.isEmpty(param.getGbId())) {
             param.setGbId(null);
         }