| 
					
				 | 
			
			
				@@ -2,12 +2,16 @@ package com.genersoft.iot.vmp.service.impl; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.alibaba.fastjson2.JSONArray; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.alibaba.fastjson2.JSONObject; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.genersoft.iot.vmp.common.GeneralCallback; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.genersoft.iot.vmp.common.StreamInfo; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.genersoft.iot.vmp.conf.UserSetting; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.genersoft.iot.vmp.conf.exception.ControllerException; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.genersoft.iot.vmp.gb28181.event.EventPublisher; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -85,6 +89,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     @Autowired 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     private IMediaServerService mediaServerService; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    @Autowired 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private ZlmHttpHookSubscribe hookSubscribe; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     @Autowired 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     DataSourceTransactionManager dataSourceTransactionManager; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -93,7 +100,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     @Override 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    public StreamInfo save(StreamProxyItem param) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    public void save(StreamProxyItem param, GeneralCallback<StreamInfo> callback) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         MediaServerItem mediaInfo; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         if (ObjectUtils.isEmpty(param.getMediaServerId()) || "auto".equals(param.getMediaServerId())){ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             mediaInfo = mediaServerService.getMediaServerForMinimumLoad(null); 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -104,10 +111,43 @@ public class StreamProxyServiceImpl implements IStreamProxyService { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             logger.warn("保存代理未找到在线的ZLM..."); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             throw new ControllerException(ErrorCode.ERROR100.getCode(), "保存代理未找到在线的ZLM"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        String dstUrl = String.format("rtmp://%s:%s/%s/%s", "127.0.0.1", mediaInfo.getRtmpPort(), param.getApp(), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                param.getStream() ); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        param.setDst_url(dstUrl); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        StringBuffer resultMsg = new StringBuffer(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        String dstUrl; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        if ("ffmpeg".equalsIgnoreCase(param.getType())) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            JSONObject jsonObject = zlmresTfulUtils.getMediaServerConfig(mediaInfo); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            if (jsonObject.getInteger("code") != 0) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                throw new ControllerException(ErrorCode.ERROR100.getCode(), "获取流媒体配置失败"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            JSONArray dataArray = jsonObject.getJSONArray("data"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            JSONObject mediaServerConfig = dataArray.getJSONObject(0); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            String ffmpegCmd = mediaServerConfig.getString(param.getFfmpegCmdKey()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            String schema = getSchemaFromFFmpegCmd(ffmpegCmd); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            if (schema == null) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                throw new ControllerException(ErrorCode.ERROR100.getCode(), "ffmpeg拉流代理无法从ffmpeg cmd中获取到输出格式"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            int port; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            String schemaForUri; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            if (schema.equalsIgnoreCase("rtsp")) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                port = mediaInfo.getRtspPort(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                schemaForUri = schema; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            }else if (schema.equalsIgnoreCase("flv")) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                port = mediaInfo.getHttpPort(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                schemaForUri = "http"; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            }else if (schema.equalsIgnoreCase("rtmp")) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                port = mediaInfo.getRtmpPort(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                schemaForUri = schema; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            }else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                port = mediaInfo.getRtmpPort(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                schemaForUri = schema; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            dstUrl = String.format("%s://%s:%s/%s/%s", schemaForUri, "127.0.0.1", port, param.getApp(), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    param.getStream()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        }else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            dstUrl = String.format("rtmp://%s:%s/%s/%s", "127.0.0.1", mediaInfo.getRtmpPort(), param.getApp(), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    param.getStream()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        param.setDstUrl(dstUrl); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        logger.info("[拉流代理] 输出地址为:{}", dstUrl); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         param.setMediaServerId(mediaInfo.getId()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         boolean saveResult; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         // 更新 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -117,29 +157,60 @@ public class StreamProxyServiceImpl implements IStreamProxyService { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             saveResult = addStreamProxy(param); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         if (!saveResult) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            throw new ControllerException(ErrorCode.ERROR100.getCode(),"保存失败"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            callback.run(ErrorCode.ERROR100.getCode(), "保存失败", null); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            return; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        StreamInfo resultForStreamInfo = null; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        resultMsg.append("保存成功"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaInfo.getId()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    mediaInfo, param.getApp(), param.getStream(), null, null); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        }); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         if (param.isEnable()) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             JSONObject jsonObject = addStreamProxyToZlm(param); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            if (jsonObject == null || jsonObject.getInteger("code") != 0) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                resultMsg.append(", 但是启用失败,请检查流地址是否可用"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            if (jsonObject != null && jsonObject.getInteger("code") == 0) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        mediaInfo, param.getApp(), param.getStream(), null, null); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            }else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                 param.setEnable(false); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                 // 直接移除 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                if (param.isEnable_remove_none_reader()) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                if (param.isEnableRemoveNoneReader()) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                     del(param.getApp(), param.getStream()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                 }else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                     updateStreamProxy(param); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                 } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                if (jsonObject == null){ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    callback.run(ErrorCode.ERROR100.getCode(), "记录已保存,启用失败", null); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    return; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                }else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    callback.run(ErrorCode.ERROR100.getCode(), jsonObject.getString("msg"), null); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    return; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            }else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                resultForStreamInfo = mediaService.getStreamInfoByAppAndStream( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                        mediaInfo, param.getApp(), param.getStream(), null, null); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private String getSchemaFromFFmpegCmd(String ffmpegCmd) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        ffmpegCmd = ffmpegCmd.replaceAll(" + ", " "); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        String[] paramArray = ffmpegCmd.split(" "); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        if (paramArray.length == 0) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            return null; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        for (int i = 0; i < paramArray.length; i++) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            if (paramArray[i].equalsIgnoreCase("-f")) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                if (i + 1 < paramArray.length - 1) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    return paramArray[i+1]; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                }else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    return null; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        return resultForStreamInfo; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        return null; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     /** 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -228,11 +299,11 @@ public class StreamProxyServiceImpl implements IStreamProxyService { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         if ("default".equals(param.getType())){ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl(), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    param.isEnable_audio(), param.isEnable_mp4(), param.getRtp_type()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    param.isEnableAudio(), param.isEnableMp4(), param.getRtpType()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         }else if ("ffmpeg".equals(param.getType())) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            result = zlmresTfulUtils.addFFmpegSource(mediaServerItem, param.getSrc_url(), param.getDst_url(), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    param.getTimeout_ms() + "", param.isEnable_audio(), param.isEnable_mp4(), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    param.getFfmpeg_cmd_key()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            result = zlmresTfulUtils.addFFmpegSource(mediaServerItem, param.getSrcUrl(), param.getDstUrl(), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    param.getTimeoutMs() + "", param.isEnableAudio(), param.isEnableMp4(), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    param.getFfmpegCmdKey()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         return result; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     } 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -286,7 +357,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                 updateStreamProxy(streamProxy); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             }else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                 logger.info("启用代理失败: {}/{}->{}({})", app, stream, jsonObject.getString("msg"), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                        streamProxy.getSrc_url() == null? streamProxy.getUrl():streamProxy.getSrc_url()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        streamProxy.getSrcUrl() == null? streamProxy.getUrl():streamProxy.getSrcUrl()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         return result; 
			 |