浏览代码

优化拉流代理逻辑,修复ffmpeg拉流代理鉴权

648540858 2 年之前
父节点
当前提交
130dc5d82d

+ 3 - 0
sql/2.6.9更新.sql

@@ -3,3 +3,6 @@ alter table wvp_device_channel
 
 alter table wvp_platform
     add auto_push_channel bool default false
+
+alter table wvp_stream_proxy
+    add stream_key varying(255)

+ 1 - 0
sql/初始化.sql

@@ -244,6 +244,7 @@ create table wvp_stream_proxy (
                                   create_time character varying(50),
                                   name character varying(255),
                                   update_time character varying(50),
+                                  stream_key character varying(255),
                                   enable_disable_none_reader bool default false,
                                   constraint uk_stream_proxy_app_stream unique (app, stream)
 );

+ 4 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java

@@ -93,7 +93,10 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
                     }
                     if (event.getGbStreams() != null && event.getGbStreams().size() > 0){
                         for (GbStream gbStream : event.getGbStreams()) {
-                            if (gbStream != null && gbStream.getStreamType().equals("push") && !userSetting.isUsePushingAsStatus()) {
+                            if (gbStream != null
+                                    && gbStream.getStreamType() != null
+                                    && gbStream.getStreamType().equals("push")
+                                    && !userSetting.isUsePushingAsStatus()) {
                                 continue;
                             }
                             DeviceChannel deviceChannelByStream = gbStreamService.getDeviceChannelListByStream(gbStream, gbStream.getCatalogId(), parentPlatform);

+ 7 - 0
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java

@@ -199,6 +199,13 @@ public class ZLMHttpHookListener {
         }
         // 推流鉴权的处理
         if (!"rtp".equals(param.getApp())) {
+            StreamProxyItem stream = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream());
+            if (stream != null) {
+                HookResultForOnPublish result = HookResultForOnPublish.SUCCESS();
+                result.setEnable_audio(stream.isEnableAudio());
+                result.setEnable_mp4(stream.isEnableMp4());
+                return result;
+            }
             if (userSetting.getPushAuthority()) {
                 // 推流鉴权
                 if (param.getParams() == null) {

+ 6 - 0
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java

@@ -272,6 +272,12 @@ public class ZLMRESTfulUtils {
         return sendPost(mediaServerItem, "delFFmpegSource",param, null);
     }
 
+    public JSONObject delStreamProxy(MediaServerItem mediaServerItem, String key){
+        Map<String, Object> param = new HashMap<>();
+        param.put("key", key);
+        return sendPost(mediaServerItem, "delStreamProxy",param, null);
+    }
+
     public JSONObject getMediaServerConfig(MediaServerItem mediaServerItem){
         return sendPost(mediaServerItem, "getServerConfig",null, null);
     }

+ 9 - 0
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java

@@ -41,6 +41,9 @@ public class StreamProxyItem extends GbStream {
     @Schema(description = "是否 无人观看时自动停用")
     private boolean enableDisableNoneReader;
 
+    @Schema(description = "拉流代理时zlm返回的key,用于停止拉流代理")
+    private String streamKey;
+
     public String getType() {
         return type;
     }
@@ -167,5 +170,11 @@ public class StreamProxyItem extends GbStream {
         this.enableAudio = enable_audio;
     }
 
+    public String getStreamKey() {
+        return streamKey;
+    }
 
+    public void setStreamKey(String streamKey) {
+        this.streamKey = streamKey;
+    }
 }

+ 42 - 21
src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java

@@ -10,6 +10,7 @@ import com.genersoft.iot.vmp.conf.exception.ControllerException;
 import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
 import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
 import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
+import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
 import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
@@ -61,6 +62,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
     @Autowired
     private ZLMRESTfulUtils zlmresTfulUtils;
 
+    @Autowired
+    private ZLMServerFactory zlmServerFactory;
+
     @Autowired
     private StreamProxyMapper streamProxyMapper;
 
@@ -145,7 +149,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
             dstUrl = String.format("%s://%s:%s/%s/%s", schemaForUri, "127.0.0.1", port, param.getApp(),
                     param.getStream());
         }else {
-            dstUrl = String.format("rtmp://%s:%s/%s/%s", "127.0.0.1", mediaInfo.getRtmpPort(), param.getApp(),
+            dstUrl = String.format("rtsp://%s:%s/%s/%s", "127.0.0.1", mediaInfo.getRtspPort(), param.getApp(),
                     param.getStream());
         }
         param.setDstUrl(dstUrl);
@@ -170,12 +174,6 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
         });
         if (param.isEnable()) {
             String talkKey = UUID.randomUUID().toString();
-//            dynamicTask.startCron(talkKey, ()->{
-//                StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaInfo.getId(), false);
-//                if (streamInfo != null) {
-//                    callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
-//                }
-//            }, 3000);
             String delayTalkKey = UUID.randomUUID().toString();
             dynamicTask.startDelay(delayTalkKey, ()->{
                 StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaInfo.getId(), false);
@@ -318,13 +316,32 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
         if (mediaServerItem == null) {
             return null;
         }
-        if ("default".equals(param.getType())){
-            result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl().trim(),
-                    param.isEnableAudio(), param.isEnableMp4(), param.getRtpType());
-        }else if ("ffmpeg".equals(param.getType())) {
+        if (zlmServerFactory.isStreamReady(mediaServerItem, param.getApp(), param.getStream())) {
+            zlmresTfulUtils.closeStreams(mediaServerItem, param.getApp(), param.getStream());
+        }
+        if ("ffmpeg".equalsIgnoreCase(param.getType())){
             result = zlmresTfulUtils.addFFmpegSource(mediaServerItem, param.getSrcUrl().trim(), param.getDstUrl(),
                     param.getTimeoutMs() + "", param.isEnableAudio(), param.isEnableMp4(),
                     param.getFfmpegCmdKey());
+        }else {
+            result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl().trim(),
+                    param.isEnableAudio(), param.isEnableMp4(), param.getRtpType());
+        }
+        System.out.println("addStreamProxyToZlm====");
+        System.out.println(result);
+        if (result != null && result.getInteger("code") == 0) {
+            JSONObject data = result.getJSONObject("data");
+            if (data == null) {
+                logger.warn("[获取拉流代理的结果数据Data] 失败: {}", result );
+                return result;
+            }
+            String key = data.getString("key");
+            if (key == null) {
+                logger.warn("[获取拉流代理的结果数据Data中的KEY] 失败: {}", result );
+                return result;
+            }
+            param.setStreamKey(key);
+            streamProxyMapper.update(param);
         }
         return result;
     }
@@ -335,7 +352,12 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
             return null;
         }
         MediaServerItem mediaServerItem = mediaServerService.getOne(param.getMediaServerId());
-        JSONObject result = zlmresTfulUtils.closeStreams(mediaServerItem, param.getApp(), param.getStream());
+        JSONObject result = null;
+        if ("ffmpeg".equalsIgnoreCase(param.getType())){
+            result = zlmresTfulUtils.delFFmpegSource(mediaServerItem, param.getStreamKey());
+        }else {
+            result = zlmresTfulUtils.delStreamProxy(mediaServerItem, param.getStreamKey());
+        }
         return result;
     }
 
@@ -350,19 +372,18 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
         if (streamProxyItem != null) {
             gbStreamService.sendCatalogMsg(streamProxyItem, CatalogEvent.DEL);
 
+            // 如果关联了国标那么移除关联
+            platformGbStreamMapper.delByAppAndStream(app, stream);
+            gbStreamMapper.del(app, stream);
+            videoManagerStorager.deleteStreamProxy(app, stream);
+            redisCatchStorage.removeStream(streamProxyItem.getMediaServerId(), "PULL", app, stream);
             JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyItem);
             if (jsonObject != null && jsonObject.getInteger("code") == 0) {
-                // 如果关联了国标那么移除关联
-                int i = platformGbStreamMapper.delByAppAndStream(app, stream);
-                gbStreamMapper.del(app, stream);
-                System.out.println();
-                // TODO 如果关联的推流, 那么状态设置为离线
+                logger.info("[移除代理]: 代理: {}/{}, 从zlm移除成功", app, stream);
+            }else {
+                logger.info("[移除代理]: 代理: {}/{}, 从zlm移除失败", app, stream);
             }
-            videoManagerStorager.deleteStreamProxy(app, stream);
-            redisCatchStorage.removeStream(streamProxyItem.getMediaServerId(), "PULL", app, stream);
         }
-
-
     }
 
     @Override

+ 4 - 3
src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java

@@ -12,9 +12,9 @@ import java.util.List;
 public interface StreamProxyMapper {
 
     @Insert("INSERT INTO wvp_stream_proxy (type, name, app, stream,media_server_id, url, src_url, dst_url, " +
-            "timeout_ms, ffmpeg_cmd_key, rtp_type, enable_audio, enable_mp4, enable, status, enable_remove_none_reader, enable_disable_none_reader, create_time) VALUES" +
+            "timeout_ms, ffmpeg_cmd_key, rtp_type, enable_audio, enable_mp4, enable, status, stream_key, enable_remove_none_reader, enable_disable_none_reader, create_time) VALUES" +
             "(#{type}, #{name}, #{app}, #{stream}, #{mediaServerId}, #{url}, #{srcUrl}, #{dstUrl}, " +
-            "#{timeoutMs}, #{ffmpegCmdKey}, #{rtpType}, #{enableAudio}, #{enableMp4}, #{enable}, #{status}, " +
+            "#{timeoutMs}, #{ffmpegCmdKey}, #{rtpType}, #{enableAudio}, #{enableMp4}, #{enable}, #{status}, #{streamKey}, " +
             "#{enableRemoveNoneReader}, #{enableDisableNoneReader}, #{createTime} )")
     int add(StreamProxyItem streamProxyDto);
 
@@ -33,6 +33,7 @@ public interface StreamProxyMapper {
             "enable_audio=#{enableAudio}, " +
             "enable=#{enable}, " +
             "status=#{status}, " +
+            "stream_key=#{streamKey}, " +
             "enable_remove_none_reader=#{enableRemoveNoneReader}, " +
             "enable_disable_none_reader=#{enableDisableNoneReader}, " +
             "enable_mp4=#{enableMp4} " +
@@ -45,7 +46,7 @@ public interface StreamProxyMapper {
     @Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude FROM wvp_stream_proxy st LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream order by st.create_time desc")
     List<StreamProxyItem> selectAll();
 
-    @Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude FROM wvp_stream_proxy st LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream WHERE st.enable=#{enable} order by st.create_time desc")
+    @Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude, 'proxy' as streamType FROM wvp_stream_proxy st LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream WHERE st.enable=#{enable} order by st.create_time desc")
     List<StreamProxyItem> selectForEnable(boolean enable);
 
     @Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude FROM wvp_stream_proxy st LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream WHERE st.app=#{app} AND st.stream=#{stream} order by st.create_time desc")

+ 14 - 0
src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java

@@ -67,6 +67,16 @@ public class StreamProxyController {
         return streamProxyService.getAll(page, count);
     }
 
+    @Operation(summary = "查询流代理")
+    @Parameter(name = "app", description = "应用名")
+    @Parameter(name = "stream", description = "流Id")
+    @GetMapping(value = "/one")
+    @ResponseBody
+    public StreamProxyItem one(String app, String stream){
+
+        return streamProxyService.getStreamProxyByAppAndStream(app, stream);
+    }
+
     @Operation(summary = "保存代理", parameters = {
             @Parameter(name = "param", description = "代理参数", required = true),
     })
@@ -86,6 +96,10 @@ public class StreamProxyController {
         if (ObjectUtils.isEmpty(param.getGbId())) {
             param.setGbId(null);
         }
+        StreamProxyItem streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream());
+        if (streamProxyItem  != null) {
+            streamProxyService.del(param.getApp(), param.getStream());
+        }
 
         RequestMessage requestMessage = new RequestMessage();
         String key = DeferredResultHolder.CALLBACK_CMD_PROXY + param.getApp() + param.getStream();