| 
					
				 | 
			
			
				@@ -2,12 +2,16 @@ package com.genersoft.iot.vmp.service.impl; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.alibaba.fastjson2.JSONArray; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.alibaba.fastjson2.JSONObject; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.genersoft.iot.vmp.common.GeneralCallback; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.genersoft.iot.vmp.common.StreamInfo; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.genersoft.iot.vmp.conf.UserSetting; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.genersoft.iot.vmp.conf.exception.ControllerException; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.genersoft.iot.vmp.gb28181.event.EventPublisher; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -85,6 +89,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     @Autowired 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     private IMediaServerService mediaServerService; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    @Autowired 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private ZlmHttpHookSubscribe hookSubscribe; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     @Autowired 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     DataSourceTransactionManager dataSourceTransactionManager; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -93,7 +100,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     @Override 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    public StreamInfo save(StreamProxyItem param) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    public void save(StreamProxyItem param, GeneralCallback<StreamInfo> callback) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         MediaServerItem mediaInfo; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         if (ObjectUtils.isEmpty(param.getMediaServerId()) || "auto".equals(param.getMediaServerId())){ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             mediaInfo = mediaServerService.getMediaServerForMinimumLoad(null); 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -107,7 +114,6 @@ public class StreamProxyServiceImpl implements IStreamProxyService { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         String dstUrl = String.format("rtmp://%s:%s/%s/%s", "127.0.0.1", mediaInfo.getRtmpPort(), param.getApp(), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                 param.getStream() ); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         param.setDst_url(dstUrl); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        StringBuffer resultMsg = new StringBuffer(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         param.setMediaServerId(mediaInfo.getId()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         boolean saveResult; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         // 更新 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -117,14 +123,25 @@ public class StreamProxyServiceImpl implements IStreamProxyService { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             saveResult = addStreamProxy(param); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         if (!saveResult) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            throw new ControllerException(ErrorCode.ERROR100.getCode(),"保存失败"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            callback.run(ErrorCode.ERROR100.getCode(), "保存失败", null); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            return; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        StreamInfo resultForStreamInfo = null; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        resultMsg.append("保存成功"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaInfo.getId()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    mediaInfo, param.getApp(), param.getStream(), null, null); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        }); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         if (param.isEnable()) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             JSONObject jsonObject = addStreamProxyToZlm(param); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            if (jsonObject == null || jsonObject.getInteger("code") != 0) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                resultMsg.append(", 但是启用失败,请检查流地址是否可用"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            if (jsonObject != null && jsonObject.getInteger("code") == 0) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        mediaInfo, param.getApp(), param.getStream(), null, null); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            }else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                 param.setEnable(false); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                 // 直接移除 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                 if (param.isEnable_remove_none_reader()) { 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -132,14 +149,15 @@ public class StreamProxyServiceImpl implements IStreamProxyService { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                 }else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                     updateStreamProxy(param); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                 } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            }else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                resultForStreamInfo = mediaService.getStreamInfoByAppAndStream( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                        mediaInfo, param.getApp(), param.getStream(), null, null); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                if (jsonObject == null){ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    callback.run(ErrorCode.ERROR100.getCode(), "记录已保存,启用失败", null); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    return; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                }else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    callback.run(ErrorCode.ERROR100.getCode(), jsonObject.getString("msg"), null); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    return; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        return resultForStreamInfo; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     /** 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -233,6 +251,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             result = zlmresTfulUtils.addFFmpegSource(mediaServerItem, param.getSrc_url(), param.getDst_url(), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                     param.getTimeout_ms() + "", param.isEnable_audio(), param.isEnable_mp4(), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                     param.getFfmpeg_cmd_key()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            System.out.println(result); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         return result; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     } 
			 |