瀏覽代碼

优化适配zlm的hook保活

648540858 3 年之前
父節點
當前提交
ab81136765
共有 20 個文件被更改,包括 440 次插入78 次删除
  1. 0 2
      src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
  2. 7 1
      src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java
  3. 38 33
      src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
  4. 0 1
      src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java
  5. 15 5
      src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java
  6. 52 0
      src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMKeepliveTimeoutListener.java
  7. 11 0
      src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOfflineEvent.java
  8. 44 0
      src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOfflineEventListener.java
  9. 11 0
      src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOnlineEvent.java
  10. 65 0
      src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOnlineEventListener.java
  11. 4 2
      src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java
  12. 4 2
      src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java
  13. 18 10
      src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
  14. 35 7
      src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
  15. 88 10
      src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
  16. 4 2
      src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
  17. 14 0
      src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java
  18. 4 1
      src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java
  19. 12 0
      src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java
  20. 14 2
      src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java

+ 0 - 2
src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java

@@ -1,9 +1,7 @@
 package com.genersoft.iot.vmp.gb28181;
 
 import com.genersoft.iot.vmp.conf.SipConfig;
-import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
 import com.genersoft.iot.vmp.gb28181.transmit.ISIPProcessorObserver;
-import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
 import gov.nist.javax.sip.SipProviderImpl;
 import gov.nist.javax.sip.SipStackImpl;
 import org.slf4j.Logger;

+ 7 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java

@@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.gb28181.event.offline.OfflineEvent;
 import com.genersoft.iot.vmp.gb28181.event.platformKeepaliveExpire.PlatformKeepaliveExpireEvent;
 import com.genersoft.iot.vmp.gb28181.event.platformNotRegister.PlatformNotRegisterEvent;
 import com.genersoft.iot.vmp.media.zlm.event.ZLMOfflineEvent;
+import com.genersoft.iot.vmp.media.zlm.event.ZLMOnlineEvent;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.ApplicationEventPublisher;
 import org.springframework.stereotype.Component;
@@ -73,5 +74,10 @@ public class EventPublisher {
 		outEvent.setMediaServerId(mediaServerId);
 		applicationEventPublisher.publishEvent(outEvent);
 	}
-	
+
+	public void zlmOnlineEventPublish(String mediaServerId) {
+		ZLMOnlineEvent outEvent = new ZLMOnlineEvent(this);
+		outEvent.setMediaServerId(mediaServerId);
+		applicationEventPublisher.publishEvent(outEvent);
+	}
 }

+ 38 - 33
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java

@@ -179,29 +179,33 @@ public class ZLMHttpHookListener {
 	public ResponseEntity<String> onPublish(@RequestBody JSONObject json) {
 
 		logger.debug("[ ZLM HOOK ]on_publish API调用,参数:" + json.toString());
-
+		JSONObject ret = new JSONObject();
+		ret.put("code", 0);
+		ret.put("msg", "success");
+		ret.put("enableHls", true);
+		ret.put("enableMP4", userSetup.isRecordPushLive());
 		String mediaServerId = json.getString("mediaServerId");
 		ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_publish, json);
 		if (subscribe != null) {
 			MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
 			if (mediaInfo != null) {
 				subscribe.response(mediaInfo, json);
+			}else {
+				ret.put("code", 1);
+				ret.put("msg", "zlm not register");
 			}
 		}
 	 	String app = json.getString("app");
 	 	String stream = json.getString("stream");
 		StreamInfo streamInfo = redisCatchStorage.queryPlaybackByStreamId(stream);
-		JSONObject ret = new JSONObject();
+
 		// 录像回放时不进行录像下载
 		if (streamInfo != null) {
 			ret.put("enableMP4", false);
 		}else {
 			ret.put("enableMP4", userSetup.isRecordPushLive());
 		}
-		ret.put("code", 0);
-		ret.put("msg", "success");
-		ret.put("enableHls", true);
-		ret.put("enableMP4", userSetup.isRecordPushLive());
+
 		return new ResponseEntity<String>(ret.toString(), HttpStatus.OK);
 	}
 	
@@ -340,37 +344,38 @@ public class ZLMHttpHookListener {
 				if (!"rtp".equals(app)){
 					String type = OriginType.values()[item.getOriginType()].getType();
 					MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
-					if (regist) {
-						StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, streamId, tracks);
-						redisCatchStorage.addStream(mediaServerItem, type, app, streamId, streamInfo);
-						if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal()
-								|| item.getOriginType() == OriginType.RTMP_PUSH.ordinal()
-								|| item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {
-							zlmMediaListManager.addPush(item);
-						}
-					}else {
-						// 兼容流注销时类型错误的问题,等zlm更新后删除
-						StreamPushItem streamPushItem = streamPushService.getPush(app, streamId);
-						if (streamPushItem != null) {
-							type = "PUSH";
+					if (mediaServerItem != null){
+						if (regist) {
+							StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, streamId, tracks);
+							redisCatchStorage.addStream(mediaServerItem, type, app, streamId, streamInfo);
+							if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal()
+									|| item.getOriginType() == OriginType.RTMP_PUSH.ordinal()
+									|| item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {
+								zlmMediaListManager.addPush(item);
+							}
 						}else {
-							StreamProxyItem streamProxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(app, streamId);
-							if (streamProxyByAppAndStream != null) {
-								type = "PULL";
+							// 兼容流注销时类型错误的问题,等zlm更新后删除
+							StreamPushItem streamPushItem = streamPushService.getPush(app, streamId);
+							if (streamPushItem != null) {
+								type = "PUSH";
+							}else {
+								StreamProxyItem streamProxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(app, streamId);
+								if (streamProxyByAppAndStream != null) {
+									type = "PULL";
+								}
 							}
+							zlmMediaListManager.removeMedia(app, streamId);
+							redisCatchStorage.removeStream(mediaServerItem.getId(), type, app, streamId);
 						}
-						zlmMediaListManager.removeMedia(app, streamId);
-						redisCatchStorage.removeStream(mediaServerItem, type, app, streamId);
+						// 发送流变化redis消息
+						JSONObject jsonObject = new JSONObject();
+						jsonObject.put("serverId", userSetup.getServerId());
+						jsonObject.put("app", app);
+						jsonObject.put("stream", streamId);
+						jsonObject.put("register", regist);
+						jsonObject.put("mediaServerId", mediaServerId);
+						redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
 					}
-
-					// 发送流变化redis消息
-					JSONObject jsonObject = new JSONObject();
-					jsonObject.put("serverId", userSetup.getServerId());
-					jsonObject.put("app", app);
-					jsonObject.put("stream", streamId);
-					jsonObject.put("register", regist);
-					jsonObject.put("mediaServerId", mediaServerId);
-					redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
 				}
 			}
 		}

+ 0 - 1
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java

@@ -141,7 +141,6 @@ public class ZLMMediaListManager {
             }else {
                 gbStreamMapper.add(transform);
             }
-
         }
     }
 

+ 15 - 5
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java

@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.genersoft.iot.vmp.conf.MediaConfig;
+import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.IStreamProxyService;
@@ -17,6 +18,7 @@ import org.springframework.core.annotation.Order;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
 
 import java.util.*;
 
@@ -37,6 +39,9 @@ public class ZLMRunner implements CommandLineRunner {
     @Autowired
     private IStreamProxyService streamProxyService;
 
+    @Autowired
+    private EventPublisher publisher;
+
     @Autowired
     private IMediaServerService mediaServerService;
 
@@ -117,7 +122,7 @@ public class ZLMRunner implements CommandLineRunner {
 
     @Async
     public void connectZlmServer(MediaServerItem mediaServerItem){
-        ZLMServerConfig zlmServerConfig = getMediaServerConfig(mediaServerItem);
+        ZLMServerConfig zlmServerConfig = getMediaServerConfig(mediaServerItem, 1);
         if (zlmServerConfig != null) {
             zlmServerConfig.setIp(mediaServerItem.getIp());
             zlmServerConfig.setHttpPort(mediaServerItem.getHttpPort());
@@ -126,7 +131,7 @@ public class ZLMRunner implements CommandLineRunner {
         }
     }
 
-    public ZLMServerConfig getMediaServerConfig(MediaServerItem mediaServerItem) {
+    public ZLMServerConfig getMediaServerConfig(MediaServerItem mediaServerItem, int index) {
         if (startGetMedia == null) { return null;}
         if (!mediaServerItem.isDefaultServer() && mediaServerService.getOne(mediaServerItem.getId()) == null) {
             return null;
@@ -143,14 +148,19 @@ public class ZLMRunner implements CommandLineRunner {
                 ZLMServerConfig.setIp(mediaServerItem.getIp());
             }
         } else {
-            logger.error("[ {} ]-[ {}:{} ]主动连接失败失败, 2s后重试",
-                    mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
+            logger.error("[ {} ]-[ {}:{} ]第{}次主动连接失败, 2s后重试",
+                    mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort(), index);
+            if (index == 1 && !StringUtils.isEmpty(mediaServerItem.getId())) {
+                logger.info("[ {} ]-[ {}:{} ]第{}次主动连接失败, 开始清理相关资源",
+                        mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort(), index);
+                publisher.zlmOfflineEventPublish(mediaServerItem.getId());
+            }
             try {
                 Thread.sleep(2000);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
-            ZLMServerConfig = getMediaServerConfig(mediaServerItem);
+            ZLMServerConfig = getMediaServerConfig(mediaServerItem, index += 1);
         }
         return ZLMServerConfig;
 

+ 52 - 0
src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMKeepliveTimeoutListener.java

@@ -0,0 +1,52 @@
+package com.genersoft.iot.vmp.media.zlm.event;
+
+import com.genersoft.iot.vmp.common.VideoManagerConstants;
+import com.genersoft.iot.vmp.conf.UserSetup;
+import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
+import org.springframework.data.redis.listener.RedisMessageListenerContainer;
+import org.springframework.stereotype.Component;
+
+/**    
+ * @description:设备心跳超时监听,借助redis过期特性,进行监听,监听到说明设备心跳超时,发送离线事件
+ * @author: swwheihei
+ * @date:   2020年5月6日 上午11:35:46     
+ */
+@Component
+public class ZLMKeepliveTimeoutListener extends KeyExpirationEventMessageListener {
+
+    private Logger logger = LoggerFactory.getLogger(ZLMKeepliveTimeoutListener.class);
+
+	@Autowired
+	private EventPublisher publisher;
+
+	@Autowired
+	private UserSetup userSetup;
+
+	public ZLMKeepliveTimeoutListener(RedisMessageListenerContainer listenerContainer) {
+		super(listenerContainer);
+	}
+
+	/**
+     * 监听失效的key,key格式为keeplive_deviceId
+     * @param message
+     * @param pattern
+     */
+    @Override
+    public void onMessage(Message message, byte[] pattern) {
+        //  获取失效的key
+        String expiredKey = message.toString();
+        String KEEPLIVEKEY_PREFIX = VideoManagerConstants.MEDIA_SERVER_KEEPALIVE_PREFIX + userSetup.getServerId() + "_";
+        if(!expiredKey.startsWith(KEEPLIVEKEY_PREFIX)){
+        	return;
+        }
+        
+        String mediaServerId = expiredKey.substring(KEEPLIVEKEY_PREFIX.length(),expiredKey.length());
+
+        publisher.zlmOfflineEventPublish(mediaServerId);
+    }
+}

+ 11 - 0
src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOfflineEvent.java

@@ -0,0 +1,11 @@
+package com.genersoft.iot.vmp.media.zlm.event;
+
+/**
+ * zlm离线事件类
+ */
+public class ZLMOfflineEvent extends ZLMEventAbstract {
+
+	public ZLMOfflineEvent(Object source) {
+		super(source);
+	}
+}

+ 44 - 0
src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOfflineEventListener.java

@@ -0,0 +1,44 @@
+package com.genersoft.iot.vmp.media.zlm.event;
+
+import com.genersoft.iot.vmp.conf.UserSetup;
+import com.genersoft.iot.vmp.service.IMediaServerService;
+import com.genersoft.iot.vmp.service.IStreamProxyService;
+import com.genersoft.iot.vmp.service.IStreamPushService;
+import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
+import com.genersoft.iot.vmp.utils.redis.RedisUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationListener;
+import org.springframework.stereotype.Component;
+
+/**
+ *
+ */
+@Component
+public class ZLMOfflineEventListener implements ApplicationListener<ZLMOfflineEvent> {
+
+	private final static Logger logger = LoggerFactory.getLogger(ZLMOfflineEventListener.class);
+
+	@Autowired
+    private IMediaServerService mediaServerService;
+
+	@Autowired
+    private IStreamPushService streamPushService;
+
+	@Autowired
+    private IStreamProxyService streamProxyService;
+
+	@Override
+	public void onApplicationEvent(ZLMOfflineEvent event) {
+		
+		if (logger.isDebugEnabled()) {
+			logger.debug("ZLM离线事件触发,ID:" + event.getMediaServerId());
+		}
+		// 处理ZLM离线
+		mediaServerService.zlmServerOffline(event.getMediaServerId());
+		streamProxyService.zlmServerOffline(event.getMediaServerId());
+		streamPushService.zlmServerOffline(event.getMediaServerId());
+		// TODO 处理对国标的影响
+	}
+}

+ 11 - 0
src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOnlineEvent.java

@@ -0,0 +1,11 @@
+package com.genersoft.iot.vmp.media.zlm.event;
+
+/**
+ * zlm在线事件
+ */
+public class ZLMOnlineEvent extends ZLMEventAbstract {
+
+	public ZLMOnlineEvent(Object source) {
+		super(source);
+	}
+}

+ 65 - 0
src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOnlineEventListener.java

@@ -0,0 +1,65 @@
+package com.genersoft.iot.vmp.media.zlm.event;
+
+import com.genersoft.iot.vmp.conf.SipConfig;
+import com.genersoft.iot.vmp.conf.UserSetup;
+import com.genersoft.iot.vmp.service.IMediaServerService;
+import com.genersoft.iot.vmp.service.IStreamProxyService;
+import com.genersoft.iot.vmp.service.IStreamPushService;
+import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
+import com.genersoft.iot.vmp.utils.redis.RedisUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationListener;
+import org.springframework.stereotype.Component;
+
+import java.text.SimpleDateFormat;
+
+/**
+ * @description: 在线事件监听器,监听到离线后,修改设备离在线状态。 设备在线有两个来源:
+ *               1、设备主动注销,发送注销指令
+ *               2、设备未知原因离线,心跳超时
+ * @author: swwheihei
+ * @date: 2020年5月6日 下午1:51:23
+ */
+@Component
+public class ZLMOnlineEventListener implements ApplicationListener<ZLMOnlineEvent> {
+	
+	private final static Logger logger = LoggerFactory.getLogger(ZLMOnlineEventListener.class);
+
+	@Autowired
+	private IVideoManagerStorager storager;
+	
+	@Autowired
+    private RedisUtil redis;
+
+	@Autowired
+    private SipConfig sipConfig;
+
+	@Autowired
+    private UserSetup userSetup;
+
+	@Autowired
+	private IMediaServerService mediaServerService;
+
+	@Autowired
+	private IStreamPushService streamPushService;
+
+	@Autowired
+	private IStreamProxyService streamProxyService;
+
+	private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+	@Override
+	public void onApplicationEvent(ZLMOnlineEvent event) {
+		
+		if (logger.isDebugEnabled()) {
+			logger.debug("ZLM上线事件触发,ID:" + event.getMediaServerId());
+		}
+		streamPushService.zlmServerOnline(event.getMediaServerId());
+		streamProxyService.zlmServerOnline(event.getMediaServerId());
+
+
+
+	}
+}

+ 4 - 2
src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java

@@ -78,10 +78,10 @@ public interface IStreamProxyService {
 
     /**
      * 新的节点加入
-     * @param zlmServerConfig
+     * @param mediaServerId
      * @return
      */
-    void zlmServerOnline(ZLMServerConfig zlmServerConfig);
+    void zlmServerOnline(String mediaServerId);
 
     /**
      * 节点离线
@@ -89,4 +89,6 @@ public interface IStreamProxyService {
      * @return
      */
     void zlmServerOffline(String mediaServerId);
+
+    void clean();
 }

+ 4 - 2
src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java

@@ -34,6 +34,7 @@ public interface IStreamPushService {
      * @return
      */
     PageInfo<StreamPushItem> getPushList(Integer page, Integer count);
+    List<StreamPushItem> getPushList(String mediaSererId);
 
     StreamPushItem transform(MediaItem item);
 
@@ -49,10 +50,10 @@ public interface IStreamPushService {
 
     /**
      * 新的节点加入
-     * @param zlmServerConfig
+     * @param mediaServerId
      * @return
      */
-    void zlmServerOnline(ZLMServerConfig zlmServerConfig);
+    void zlmServerOnline(String mediaServerId);
 
     /**
      * 节点离线
@@ -61,4 +62,5 @@ public interface IStreamPushService {
      */
     void zlmServerOffline(String mediaServerId);
 
+    void clean();
 }

+ 18 - 10
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java

@@ -4,10 +4,10 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.genersoft.iot.vmp.common.VideoManagerConstants;
-import com.genersoft.iot.vmp.conf.MediaConfig;
 import com.genersoft.iot.vmp.conf.SipConfig;
 import com.genersoft.iot.vmp.conf.UserSetup;
 import com.genersoft.iot.vmp.gb28181.bean.Device;
+import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
 import com.genersoft.iot.vmp.gb28181.session.SsrcConfig;
 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
 import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
@@ -70,6 +70,9 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR
     @Autowired
     private RedisUtil redisUtil;
 
+    @Autowired
+    private EventPublisher publisher;
+
     @Autowired
     JedisUtil jedisUtil;
 
@@ -312,8 +315,6 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR
         return mediaServerMapper.update(mediaSerItem);
     }
 
-
-
     /**
      * 处理zlm上线
      * @param zlmServerConfig zlm上线携带的参数
@@ -353,28 +354,31 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR
         if (serverItem.getRtpProxyPort() == 0) {
             serverItem.setRtpProxyPort(zlmServerConfig.getRtpProxyPort());
         }
-        if (StringUtils.isEmpty(serverItem.getId())) {
-            serverItem.setId(zlmServerConfig.getGeneralMediaServerId());
-        }
         serverItem.setStatus(true);
+
         if (StringUtils.isEmpty(serverItem.getId())) {
             serverItem.setId(zlmServerConfig.getGeneralMediaServerId());
             mediaServerMapper.updateByHostAndPort(serverItem);
         }else {
             mediaServerMapper.update(serverItem);
         }
-        String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetup.getServerId() + "_" + serverItem.getId();
+        String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetup.getServerId() + "_" + zlmServerConfig.getGeneralMediaServerId();
         if (redisUtil.get(key) == null) {
-            SsrcConfig ssrcConfig = new SsrcConfig(serverItem.getId(), null, sipConfig.getDomain());
+            SsrcConfig ssrcConfig = new SsrcConfig(zlmServerConfig.getGeneralMediaServerId(), null, sipConfig.getDomain());
             serverItem.setSsrcConfig(ssrcConfig);
-            redisUtil.set(key, serverItem);
+        }else {
+            MediaServerItem mediaServerItemInRedis = (MediaServerItem)redisUtil.get(key);
+            serverItem.setSsrcConfig(mediaServerItemInRedis.getSsrcConfig());
         }
-
+        redisUtil.set(key, serverItem);
         resetOnlineServerItem(serverItem);
         updateMediaServerKeepalive(serverItem.getId(), null);
         setZLMConfig(serverItem);
+        publisher.zlmOnlineEventPublish(serverItem.getId());
+
     }
 
+
     @Override
     public void zlmServerOffline(String mediaServerId) {
         delete(mediaServerId);
@@ -567,6 +571,10 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR
     @Override
     public void updateMediaServerKeepalive(String mediaServerId, JSONObject data) {
         MediaServerItem mediaServerItem = getOne(mediaServerId);
+        if (mediaServerItem == null) {
+            logger.warn("[更新ZLM 保活信息]失败,未找到流媒体信息");
+            return;
+        }
         String key = VideoManagerConstants.MEDIA_SERVER_KEEPALIVE_PREFIX + userSetup.getServerId() + "_" + mediaServerId;
         int hookAliveInterval = mediaServerItem.getHookAliveInterval() + 2;
         redisUtil.set(key, data, hookAliveInterval);

+ 35 - 7
src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java

@@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.service.impl;
 
 import com.alibaba.fastjson.JSONObject;
 import com.genersoft.iot.vmp.common.StreamInfo;
+import com.genersoft.iot.vmp.conf.UserSetup;
 import com.genersoft.iot.vmp.gb28181.bean.GbStream;
 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
 import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
@@ -28,8 +29,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.util.StringUtils;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;
 
 /**
  * 视频代理业务
@@ -54,6 +54,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
     @Autowired
     private IRedisCatchStorage redisCatchStorage;
 
+    @Autowired
+    private UserSetup userSetup;
+
     @Autowired
     private GbStreamMapper gbStreamMapper;
 
@@ -160,6 +163,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
         }else {
             mediaServerItem = mediaServerService.getOne(param.getMediaServerId());
         }
+        if (mediaServerItem == null) {
+            return null;
+        }
         if ("default".equals(param.getType())){
             result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl(),
                     param.isEnable_hls(), param.isEnable_mp4(), param.getRtp_type());
@@ -244,7 +250,6 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
                 }
             }
         }
-
         return result;
     }
 
@@ -255,18 +260,41 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
     }
 
     @Override
-    public void zlmServerOnline(ZLMServerConfig zlmServerConfig) {
-
+    public void zlmServerOnline(String mediaServerId) {
+        zlmServerOffline(mediaServerId);
     }
 
     @Override
     public void zlmServerOffline(String mediaServerId) {
         // 移除开启了无人观看自动移除的流
+        List<StreamProxyItem> streamProxyItemList = streamProxyMapper.selecAutoRemoveItemByMediaServerId(mediaServerId);
+        if (streamProxyItemList.size() > 0) {
+            gbStreamMapper.batchDel(streamProxyItemList);
+        }
         streamProxyMapper.deleteAutoRemoveItemByMediaServerId(mediaServerId);
         // 其他的流设置未启用
         streamProxyMapper.updateStatus(false, mediaServerId);
-        // 移除redis内流的信息
-        redisCatchStorage.removeStream(mediaServerId, "PULL");
+        String type = "PULL";
+
+        // 发送redis消息
+        List<StreamInfo> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type);
+        if (streamInfoList.size() > 0) {
+            for (StreamInfo streamInfo : streamInfoList) {
+                JSONObject jsonObject = new JSONObject();
+                jsonObject.put("serverId", userSetup.getServerId());
+                jsonObject.put("app", streamInfo.getApp());
+                jsonObject.put("stream", streamInfo.getStreamId());
+                jsonObject.put("register", false);
+                jsonObject.put("mediaServerId", mediaServerId);
+                redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
+                // 移除redis内流的信息
+                redisCatchStorage.removeStream(mediaServerId, type, streamInfo.getApp(), streamInfo.getStreamId());
+            }
+        }
+    }
+
+    @Override
+    public void clean() {
 
     }
 }

+ 88 - 10
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java

@@ -3,11 +3,15 @@ package com.genersoft.iot.vmp.service.impl;
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.TypeReference;
+import com.genersoft.iot.vmp.common.StreamInfo;
+import com.genersoft.iot.vmp.conf.UserSetup;
 import com.genersoft.iot.vmp.gb28181.bean.GbStream;
+import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
 import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
 import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
+import com.genersoft.iot.vmp.media.zlm.dto.OriginType;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
 import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.IStreamPushService;
@@ -20,10 +24,7 @@ import com.github.pagehelper.PageInfo;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 @Service
 public class StreamPushServiceImpl implements IStreamPushService {
@@ -43,6 +44,9 @@ public class StreamPushServiceImpl implements IStreamPushService {
     @Autowired
     private IRedisCatchStorage redisCatchStorage;
 
+    @Autowired
+    private UserSetup userSetup;
+
     @Autowired
     private IMediaServerService mediaServerService;
 
@@ -56,7 +60,9 @@ public class StreamPushServiceImpl implements IStreamPushService {
         for (MediaItem item : mediaItems) {
 
             // 不保存国标推理以及拉流代理的流
-            if (item.getOriginType() == 1 || item.getOriginType() == 2 || item.getOriginType() == 8) {
+            if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal()
+                    || item.getOriginType() == OriginType.RTMP_PUSH.ordinal()
+                    || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {
                 String key = item.getApp() + "_" + item.getStream();
                 StreamPushItem streamPushItem = result.get(key);
                 if (streamPushItem == null) {
@@ -97,6 +103,11 @@ public class StreamPushServiceImpl implements IStreamPushService {
         return new PageInfo<>(all);
     }
 
+    @Override
+    public List<StreamPushItem> getPushList(String mediaServerId) {
+        return streamPushMapper.selectAllByMediaServerId(mediaServerId);
+    }
+
     @Override
     public boolean saveToGB(GbStream stream) {
         stream.setStreamType("push");
@@ -137,17 +148,84 @@ public class StreamPushServiceImpl implements IStreamPushService {
     }
 
     @Override
-    public void zlmServerOnline(ZLMServerConfig zlmServerConfig) {
-        // 似乎没啥需要做的
+    public void zlmServerOnline(String mediaServerId) {
+        // 同步zlm推流信息
+        MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
+        if (mediaServerItem == null) {
+            return;
+        }
+        List<StreamPushItem> pushList = getPushList(mediaServerId);
+        if (pushList.size() > 0) {
+            Map<String, StreamPushItem> pushItemMap = new HashMap<>();
+            for (StreamPushItem streamPushItem : pushList) {
+                pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem);
+            }
+            zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{
+                if (mediaList == null) return;
+                String dataStr = mediaList.getString("data");
+
+                Integer code = mediaList.getInteger("code");
+                List<StreamPushItem> streamPushItems = null;
+                if (code == 0 ) {
+                    if (dataStr != null) {
+                        streamPushItems = handleJSON(dataStr, mediaServerItem);
+                    }
+                }
+
+                if (streamPushItems != null) {
+                    for (StreamPushItem streamPushItem : streamPushItems) {
+                        pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
+                    }
+                }
+                Collection<StreamPushItem> offlinePushItems = pushItemMap.values();
+                if (offlinePushItems.size() > 0) {
+                    String type = "PUSH";
+                    streamPushMapper.delAll(new ArrayList<>(offlinePushItems));
+                    for (StreamPushItem offlinePushItem : offlinePushItems) {
+                        JSONObject jsonObject = new JSONObject();
+                        jsonObject.put("serverId", userSetup.getServerId());
+                        jsonObject.put("app", offlinePushItem.getApp());
+                        jsonObject.put("stream", offlinePushItem.getStream());
+                        jsonObject.put("register", false);
+                        jsonObject.put("mediaServerId", mediaServerId);
+                        redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
+                        // 移除redis内流的信息
+                        redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlinePushItem.getApp(), offlinePushItem.getStream());
+                    }
+                }
+            }));
+        }
     }
 
     @Override
     public void zlmServerOffline(String mediaServerId) {
-        // 移除没有serverId的推流
+        List<StreamPushItem> streamPushItems = streamPushMapper.selectAllByMediaServerId(mediaServerId);
+        // 移除没有GBId的推流
         streamPushMapper.deleteWithoutGBId(mediaServerId);
+        gbStreamMapper.deleteWithoutGBId("push", mediaServerId);
         // 其他的流设置未启用
         gbStreamMapper.updateStatusByMediaServerId(mediaServerId, false);
-        // 移除redis内流的信息
-        redisCatchStorage.removeStream(mediaServerId, "PUSH");
+        // 发送流停止消息
+        String type = "PUSH";
+        // 发送redis消息
+        List<StreamInfo> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type);
+        if (streamInfoList.size() > 0) {
+            for (StreamInfo streamInfo : streamInfoList) {
+                JSONObject jsonObject = new JSONObject();
+                jsonObject.put("serverId", userSetup.getServerId());
+                jsonObject.put("app", streamInfo.getApp());
+                jsonObject.put("stream", streamInfo.getStreamId());
+                jsonObject.put("register", false);
+                jsonObject.put("mediaServerId", mediaServerId);
+                redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
+                // 移除redis内流的信息
+                redisCatchStorage.removeStream(mediaServerId, type, streamInfo.getApp(), streamInfo.getStreamId());
+            }
+        }
+    }
+
+    @Override
+    public void clean() {
+
     }
 }

+ 4 - 2
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java

@@ -140,11 +140,11 @@ public interface IRedisCatchStorage {
 
     /**
      * 移除流信息从redis
-     * @param mediaServerItem
+     * @param mediaServerId
      * @param app
      * @param streamId
      */
-    void removeStream(MediaServerItem mediaServerItem, String type, String app, String streamId);
+    void removeStream(String mediaServerId, String type, String app, String streamId);
 
 
     /**
@@ -167,4 +167,6 @@ public interface IRedisCatchStorage {
      * @return
      */
     ThirdPartyGB queryMemberNoGBId(String queryKey);
+
+    List<StreamInfo> getStreams(String mediaServerId, String pull);
 }

+ 14 - 0
src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java

@@ -65,4 +65,18 @@ public interface GbStreamMapper {
             "SET status=${status} " +
             "WHERE mediaServerId=#{mediaServerId} ")
     void updateStatusByMediaServerId(String mediaServerId, boolean status);
+
+    @Select("SELECT * FROM gb_stream WHERE mediaServerId=#{mediaServerId}")
+    void delByMediaServerId(String mediaServerId);
+
+    @Delete("DELETE FROM gb_stream WHERE streamType=#{type} AND gbId=NULL AND mediaServerId=#{mediaServerId}")
+    void deleteWithoutGBId(String type, String mediaServerId);
+
+    @Delete("<script> "+
+            "DELETE FROM gb_stream where " +
+            "<foreach collection='streamProxyItemList' item='item' separator='or'>" +
+            "(app=#{item.app} and stream=#{item.stream}) " +
+            "</foreach>" +
+            "</script>")
+    void batchDel(List<StreamProxyItem> streamProxyItemList);
 }

+ 4 - 1
src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java

@@ -62,6 +62,9 @@ public interface StreamProxyMapper {
             "WHERE mediaServerId=#{mediaServerId}")
     void updateStatus(boolean status, String mediaServerId);
 
-    @Delete("DELETE FROM stream_proxy WHERE mediaServerId=#{mediaServerId}")
+    @Delete("DELETE FROM stream_proxy WHERE enable_remove_none_reader=true AND mediaServerId=#{mediaServerId}")
     void deleteAutoRemoveItemByMediaServerId(String mediaServerId);
+
+    @Select("SELECT st.*, pgs.gbId, pgs.name, pgs.longitude, pgs.latitude FROM stream_proxy st LEFT JOIN gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream WHERE st.enable_remove_none_reader=true AND st.mediaServerId=#{mediaServerId} order by st.createTime desc")
+    List<StreamProxyItem> selecAutoRemoveItemByMediaServerId(String mediaServerId);
 }

+ 12 - 0
src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java

@@ -4,6 +4,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
 import org.apache.ibatis.annotations.*;
 import org.springframework.stereotype.Repository;
 
+import java.util.Collection;
 import java.util.List;
 
 @Mapper
@@ -31,6 +32,14 @@ public interface StreamPushMapper {
     @Delete("DELETE FROM stream_push WHERE app=#{app} AND stream=#{stream}")
     int del(String app, String stream);
 
+    @Delete("<script> "+
+            "DELETE FROM stream_push where " +
+            "<foreach collection='streamPushItems' item='item' separator='or'>" +
+            "(app=#{item.app} and stream=#{item.stream}) " +
+            "</foreach>" +
+            "</script>")
+    int delAll(List<StreamPushItem> streamPushItems);
+
     @Select("SELECT st.*, pgs.gbId, pgs.status, pgs.name, pgs.longitude, pgs.latitude FROM stream_push st LEFT JOIN gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream")
     List<StreamPushItem> selectAll();
 
@@ -56,4 +65,7 @@ public interface StreamPushMapper {
     @Delete("DELETE FROM stream_push WHERE mediaServerId=#{mediaServerId}")
     void deleteWithoutGBId(String mediaServerId);
 
+    @Select("SELECT * FROM stream_push WHERE mediaServerId=#{mediaServerId}")
+    List<StreamPushItem> selectAllByMediaServerId(String mediaServerId);
+
 }

+ 14 - 2
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java

@@ -338,8 +338,8 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
     }
 
     @Override
-    public void removeStream(MediaServerItem mediaServerItem, String type, String app, String streamId) {
-        String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetup.getServerId() + "_" + type + "_"  + app + "_" + streamId + "_" + mediaServerItem.getId();
+    public void removeStream(String mediaServerId, String type, String app, String streamId) {
+        String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetup.getServerId() + "_" + type + "_"  + app + "_" + streamId + "_" + mediaServerId;
         redis.del(key);
     }
 
@@ -365,4 +365,16 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
             redis.del((String) stream);
         }
     }
+
+    @Override
+    public List<StreamInfo> getStreams(String mediaServerId, String type) {
+        List<StreamInfo> result = new ArrayList<>();
+        String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetup.getServerId() + "_" + type + "_*_*_" + mediaServerId;
+        List<Object> streams = redis.scan(key);
+        for (Object stream : streams) {
+            StreamInfo streamInfo = (StreamInfo)redis.get((String) stream);
+            result.add(streamInfo);
+        }
+        return result;
+    }
 }