فهرست منبع

根据redis消息更新推流列表

jiang 3 سال پیش
والد
کامیت
8f77d0c25c

+ 4 - 0
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java

@@ -91,6 +91,10 @@ public class VideoManagerConstants {
 	 * 接收推流设备的GPS变化通知
 	 */
 	public static final String VM_MSG_PUSH_STREAM_STATUS_CHANGE = "VM_MSG_PUSH_STREAM_STATUS_CHANGE";
+	/**
+	 * 接收推流设备列表更新变化通知
+	 */
+	public static final String VM_MSG_PUSH_STREAM_LIST_CHANGE = "VM_MSG_PUSH_STREAM_LIST_CHANGE";
 
 	/**
 	 * redis 消息通知设备推流到平台

+ 4 - 0
src/main/java/com/genersoft/iot/vmp/conf/RedisConfig.java

@@ -43,6 +43,9 @@ public class RedisConfig extends CachingConfigurerSupport {
 	@Autowired
 	private RedisPushStreamStatusMsgListener redisPushStreamStatusMsgListener;
 
+	@Autowired
+	private RedisPushStreamListMsgListener redisPushStreamListMsgListener;
+
 	@Bean
 	public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
 		RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
@@ -80,6 +83,7 @@ public class RedisConfig extends CachingConfigurerSupport {
 		container.addMessageListener(redisStreamMsgListener, new PatternTopic(VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + "PUSH"));
 		container.addMessageListener(redisGbPlayMsgListener, new PatternTopic(RedisGbPlayMsgListener.WVP_PUSH_STREAM_KEY));
 		container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE));
+		container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE));
         return container;
     }
 

+ 8 - 0
src/main/java/com/genersoft/iot/vmp/service/IGbStreamService.java

@@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.service;
 import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
 import com.genersoft.iot.vmp.gb28181.bean.GbStream;
 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
+import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
 import com.github.pagehelper.PageInfo;
 
 import java.util.List;
@@ -45,4 +46,11 @@ public interface IGbStreamService {
 
     void sendCatalogMsg(GbStream gbStream, String type);
     void sendCatalogMsgs(List<GbStream> gbStreams, String type);
+
+    /**
+     * 修改gbId或name
+     * @param streamPushItemForUpdate
+     * @return
+     */
+    int updateGbIdOrName(List<StreamPushItem> streamPushItemForUpdate);
 }

+ 6 - 0
src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java

@@ -100,4 +100,10 @@ public interface IStreamPushService {
      * 增加推流
      */
     boolean add(StreamPushItem stream);
+
+    /**
+     * 获取全部的app+Streanm 用于判断推流列表是新增还是修改
+     * @return
+     */
+    List<String> getAllAppAndStream();
 }

+ 6 - 2
src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java

@@ -1,10 +1,9 @@
 package com.genersoft.iot.vmp.service.impl;
 
-import com.genersoft.iot.vmp.conf.SipConfig;
 import com.genersoft.iot.vmp.gb28181.bean.*;
 import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
 import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
-import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
+import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
 import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
 import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper;
 import com.genersoft.iot.vmp.storager.dao.PlatformCatalogMapper;
@@ -183,4 +182,9 @@ public class GbStreamServiceImpl implements IGbStreamService {
             }
         }
     }
+
+    @Override
+    public int updateGbIdOrName(List<StreamPushItem> streamPushItemForUpdate) {
+        return gbStreamMapper.updateGbIdOrName(streamPushItemForUpdate);
+    }
 }

+ 83 - 0
src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamListMsgListener.java

@@ -0,0 +1,83 @@
+package com.genersoft.iot.vmp.service.impl;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
+import com.genersoft.iot.vmp.service.IGbStreamService;
+import com.genersoft.iot.vmp.service.IMediaServerService;
+import com.genersoft.iot.vmp.service.IStreamPushService;
+import com.genersoft.iot.vmp.utils.DateUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.util.*;
+
+/**
+ * @Auther: JiangFeng
+ * @Date: 2022/8/16 11:32
+ * @Description: 接收redis发送的推流设备列表更新通知
+ */
+@Component
+public class RedisPushStreamListMsgListener implements MessageListener {
+
+    private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamListMsgListener.class);
+    @Resource
+    private IMediaServerService mediaServerService;
+
+    @Resource
+    private IStreamPushService streamPushService;
+    @Resource
+    private IGbStreamService gbStreamService;
+
+    @Override
+    public void onMessage(Message message, byte[] bytes) {
+        //
+        logger.warn("[REDIS消息-推流设备列表更新]: {}", new String(message.getBody()));
+        List<StreamPushItem> streamPushItems = JSON.parseArray(new String(message.getBody()), StreamPushItem.class);
+        //查询全部的app+stream 用于判断是添加还是修改
+        List<String> allAppAndStream = streamPushService.getAllAppAndStream();
+
+        /**
+         * 用于存储更具APP+Stream过滤后的数据,可以直接存入stream_push表与gb_stream表
+         */
+        List<StreamPushItem> streamPushItemForSave = new ArrayList<>();
+        List<StreamPushItem> streamPushItemForUpdate = new ArrayList<>();
+        for (StreamPushItem streamPushItem : streamPushItems) {
+            String app = streamPushItem.getApp();
+            String stream = streamPushItem.getStream();
+            boolean contains = allAppAndStream.contains(app + stream);
+            //不存在就添加
+            if (!contains) {
+                streamPushItem.setStatus(false);
+                streamPushItem.setStreamType("push");
+                streamPushItem.setCreateTime(DateUtil.getNow());
+                streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId());
+                streamPushItem.setOriginType(2);
+                streamPushItem.setOriginTypeStr("rtsp_push");
+                streamPushItem.setTotalReaderCount("0");
+                streamPushItemForSave.add(streamPushItem);
+            } else {
+                //存在就只修改 name和gbId
+                streamPushItemForUpdate.add(streamPushItem);
+            }
+        }
+        if (streamPushItemForSave.size() > 0) {
+
+            logger.info("添加{}条",streamPushItemForSave.size());
+            logger.info(JSONObject.toJSONString(streamPushItemForSave));
+            streamPushService.batchAdd(streamPushItemForSave);
+
+        }
+        if(streamPushItemForUpdate.size()>0){
+            logger.info("修改{}条",streamPushItemForUpdate.size());
+            logger.info(JSONObject.toJSONString(streamPushItemForUpdate));
+            gbStreamService.updateGbIdOrName(streamPushItemForUpdate);
+        }
+
+    }
+}

+ 6 - 0
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java

@@ -340,6 +340,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
         gbStreamMapper.batchAdd(streamPushItems);
     }
 
+
     @Override
     public void batchAddForUpload(List<StreamPushItem> streamPushItems, Map<String, List<String[]>> streamPushItemsForAll ) {
         // 存储数据到stream_push表
@@ -503,4 +504,9 @@ public class StreamPushServiceImpl implements IStreamPushService {
         }
         return result;
     }
+
+    @Override
+    public List<String> getAllAppAndStream() {
+        return streamPushMapper.getAllAppAndStream();
+    }
 }

+ 10 - 0
src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java

@@ -148,4 +148,14 @@ public interface GbStreamMapper {
             "SET mediaServerId=#{mediaServerId}" +
             "WHERE app=#{app} AND stream=#{stream}")
     void updateMediaServer(String app, String stream, String mediaServerId);
+
+    @Update("<script> "+
+                " <foreach collection='list' item='item' index='index' separator=';'>"+
+                    "UPDATE gb_stream " +
+                    " SET name=#{item.name},"+
+                    " gbId=#{item.gbId}"+
+                    " WHERE app=#{item.app} and stream=#{item.stream}"+
+                "</foreach>"+
+            "</script>")
+    int updateGbIdOrName(List<StreamPushItem> streamPushItemForUpdate);
 }

+ 3 - 0
src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java

@@ -168,4 +168,7 @@ public interface StreamPushMapper {
 
     @Update("UPDATE stream_push SET status=0")
     void setAllStreamOffline();
+
+    @Select("SELECT CONCAT(app,stream) FROM gb_stream")
+    List<String> getAllAppAndStream();
 }