panlinlin 1 år sedan
förälder
incheckning
9c0e69c6c5

+ 3 - 0
src/main/java/com/genersoft/iot/vmp/media/service/IMediaNodeServerService.java

@@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.common.StreamInfo;
 import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
 import com.genersoft.iot.vmp.media.bean.MediaInfo;
 import com.genersoft.iot.vmp.media.bean.MediaServer;
+import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
 import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
 
 import java.util.List;
@@ -62,4 +63,6 @@ public interface IMediaNodeServerService {
     void startSendRtpStream(MediaServer mediaServer, SendRtpItem sendRtpItem);
 
     Long updateDownloadProcess(MediaServer mediaServer, String app, String stream);
+
+    StreamInfo startProxy(MediaServer mediaServer, StreamProxy streamProxy);
 }

+ 3 - 0
src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java

@@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.media.bean.MediaInfo;
 import com.genersoft.iot.vmp.media.bean.MediaServer;
 import com.genersoft.iot.vmp.service.bean.MediaServerLoad;
 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
+import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
 import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
 
 import java.util.List;
@@ -152,4 +153,6 @@ public interface IMediaServerService {
     MediaServer getMediaServerByAppAndStream(String app, String stream);
 
     Long updateDownloadProcess(MediaServer mediaServerItem, String app, String stream);
+
+    StreamInfo startProxy(MediaServer mediaServer, StreamProxy streamProxy);
 }

+ 11 - 0
src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java

@@ -25,6 +25,7 @@ import com.genersoft.iot.vmp.service.bean.MediaServerLoad;
 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.dao.MediaServerMapper;
+import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
 import com.genersoft.iot.vmp.utils.DateUtil;
 import com.genersoft.iot.vmp.utils.JsonUtil;
 import com.genersoft.iot.vmp.utils.redis.RedisUtil;
@@ -919,4 +920,14 @@ public class MediaServerServiceImpl implements IMediaServerService {
         }
         return mediaNodeServerService.updateDownloadProcess(mediaServer, app, stream);
     }
+
+    @Override
+    public StreamInfo startProxy(MediaServer mediaServer, StreamProxy streamProxy) {
+        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
+        if (mediaNodeServerService == null) {
+            logger.info("[startProxy] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType());
+            throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到mediaServer对应的实现类");
+        }
+        return mediaNodeServerService.startProxy(mediaServer, streamProxy);
+    }
 }

+ 6 - 0
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java

@@ -11,6 +11,7 @@ import com.genersoft.iot.vmp.media.bean.MediaInfo;
 import com.genersoft.iot.vmp.media.bean.MediaServer;
 import com.genersoft.iot.vmp.media.service.IMediaNodeServerService;
 import com.genersoft.iot.vmp.media.zlm.dto.ZLMServerConfig;
+import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
 import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
 import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
 import org.slf4j.Logger;
@@ -390,4 +391,9 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
         }
         return mediaInfo.getDuration();
     }
+
+    @Override
+    public StreamInfo startProxy(MediaServer mediaServer, StreamProxy streamProxy) {
+        return null;
+    }
 }

+ 0 - 1
src/main/java/com/genersoft/iot/vmp/streamProxy/bean/StreamProxy.java

@@ -34,7 +34,6 @@ public class StreamProxy extends CommonGBChannel {
     @Schema(description = "拉流地址")
     private String srcUrl;
 
-
     @Schema(description = "超时时间")
     private int timeout;
 

+ 41 - 31
src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java

@@ -136,21 +136,22 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
 
 
     @Override
-    public void save(StreamProxy param, GeneralCallback<StreamInfo> callback) {
+    public void save(StreamProxy streamProxy, GeneralCallback<StreamInfo> callback) {
         MediaServer mediaServer;
-        if (ObjectUtils.isEmpty(param.getMediaServerId()) || "auto".equals(param.getMediaServerId())){
+        if (ObjectUtils.isEmpty(streamProxy.getMediaServerId()) || "auto".equals(streamProxy.getMediaServerId())){
             mediaServer = mediaServerService.getMediaServerForMinimumLoad(null);
         }else {
-            mediaServer = mediaServerService.getOne(param.getMediaServerId());
+            mediaServer = mediaServerService.getOne(streamProxy.getMediaServerId());
         }
         if (mediaServer == null) {
             log.warn("保存代理未找到在线的ZLM...");
             throw new ControllerException(ErrorCode.ERROR100.getCode(), "保存代理未找到在线的ZLM");
         }
+
         String dstUrl;
-        if ("ffmpeg".equalsIgnoreCase(param.getType())) {
+        if ("ffmpeg".equalsIgnoreCase(streamProxy.getType())) {
 
-            String ffmpegCmd = mediaServerService.getFfmpegCmd(mediaServer, param.getFfmpegCmdKey());
+            String ffmpegCmd = mediaServerService.getFfmpegCmd(mediaServer, streamProxy.getFfmpegCmdKey());
 
             if (ffmpegCmd == null) {
                 throw new ControllerException(ErrorCode.ERROR100.getCode(), "ffmpeg拉流代理无法获取ffmpeg cmd");
@@ -172,37 +173,47 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
                 schemaForUri = schema;
             }
 
-            dstUrl = String.format("%s://%s:%s/%s/%s", schemaForUri, "127.0.0.1", port, param.getApp(),
-                    param.getStream());
+            dstUrl = String.format("%s://%s:%s/%s/%s", schemaForUri, "127.0.0.1", port, streamProxy.getApp(),
+                    streamProxy.getStream());
         }else {
-            dstUrl = String.format("rtsp://%s:%s/%s/%s", "127.0.0.1", mediaServer.getRtspPort(), param.getApp(),
-                    param.getStream());
+            dstUrl = String.format("rtsp://%s:%s/%s/%s", "127.0.0.1", mediaServer.getRtspPort(), streamProxy.getApp(),
+                    streamProxy.getStream());
         }
-        param.setDstUrl(dstUrl);
+        streamProxy.setDstUrl(dstUrl);
         log.info("[拉流代理] 输出地址为:{}", dstUrl);
-        param.setMediaServerId(mediaServer.getId());
+        streamProxy.setMediaServerId(mediaServer.getId());
         boolean saveResult;
         // 更新
-        if (streamProxyMapper.selectOne(param.getApp(), param.getStream()) != null) {
-            saveResult = updateStreamProxy(param);
+        if (streamProxyMapper.selectOne(streamProxy.getApp(), streamProxy.getStream()) != null) {
+            saveResult = updateStreamProxy(streamProxy);
         }else { // 新增
-            saveResult = addStreamProxy(param);
+            saveResult = addStreamProxy(streamProxy);
         }
         if (!saveResult) {
             callback.run(ErrorCode.ERROR100.getCode(), "保存失败", null);
             return;
         }
-        Hook hook = Hook.getInstance(HookType.on_media_arrival, param.getApp(), param.getStream(), mediaServer.getId());
-        hookSubscribe.addSubscribe(hook, (hookData) -> {
-            StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(
-                    mediaServer, param.getApp(), param.getStream(), null, null);
-            callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
-        });
-        if (param.isEnable()) {
+
+        if (streamProxy.isEnable()) {
+            StreamInfo streamInfo = mediaServerService.startProxy(streamProxy);
+            if (streamInfo != null) {
+                callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
+            }else {
+                callback.run(ErrorCode.ERROR100.getCode(), "记录已保存,启用失败", null);
+            }
+
+
+
+            Hook hook = Hook.getInstance(HookType.on_media_arrival, streamProxy.getApp(), streamProxy.getStream(), mediaServer.getId());
+            hookSubscribe.addSubscribe(hook, (hookData) -> {
+                StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(
+                        mediaServer, streamProxy.getApp(), streamProxy.getStream(), null, null);
+                callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
+            });
             String talkKey = UUID.randomUUID().toString();
             String delayTalkKey = UUID.randomUUID().toString();
             dynamicTask.startDelay(delayTalkKey, ()->{
-                StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaServer.getId(), false);
+                StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStreamWithCheck(streamProxy.getApp(), streamProxy.getStream(), mediaServer.getId(), false);
                 if (streamInfo != null) {
                     callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
                 }else {
@@ -210,20 +221,20 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
                     callback.run(ErrorCode.ERROR100.getCode(), "超时", null);
                 }
             }, 7000);
-            WVPResult<String> result = addStreamProxyToZlm(param);
+            WVPResult<String> result = addStreamProxyToZlm(streamProxy);
             if (result != null && result.getCode() == 0) {
                 hookSubscribe.removeSubscribe(hook);
                 dynamicTask.stop(talkKey);
                 StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(
-                        mediaServer, param.getApp(), param.getStream(), null, null);
+                        mediaServer, streamProxy.getApp(), streamProxy.getStream(), null, null);
                 callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
             }else {
-                param.setEnable(false);
+                streamProxy.setEnable(false);
                 // 直接移除
-                if (param.isEnableRemoveNoneReader()) {
-                    del(param.getApp(), param.getStream());
+                if (streamProxy.isEnableRemoveNoneReader()) {
+                    del(streamProxy.getApp(), streamProxy.getStream());
                 }else {
-                    updateStreamProxy(param);
+                    updateStreamProxy(streamProxy);
                 }
                 if (result == null){
                     callback.run(ErrorCode.ERROR100.getCode(), "记录已保存,启用失败", null);
@@ -231,10 +242,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
                     callback.run(ErrorCode.ERROR100.getCode(), result.getMsg(), null);
                 }
             }
-        }
-        else{
+        }else{
             StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(
-                    mediaServer, param.getApp(), param.getStream(), null, null);
+                    mediaServer, streamProxy.getApp(), streamProxy.getStream(), null, null);
             callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
         }
     }