Browse Source

处理收到redis推动的推流设备信息内容重复的问题

648540858 1 year ago
parent
commit
e1d476a54a

+ 4 - 0
src/main/java/com/genersoft/iot/vmp/service/IGbStreamService.java

@@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
 import com.github.pagehelper.PageInfo;
 
 import java.util.List;
+import java.util.Map;
 
 /**
  * 级联国标平台关联流业务接口
@@ -71,4 +72,7 @@ public interface IGbStreamService {
     void delAllPlatformInfo(String platformId, String catalogId);
 
     List<GbStream> getGbChannelWithGbid(String gbId);
+
+    Map<String, GbStream> getAllGBId();
+
 }

+ 3 - 0
src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java

@@ -115,4 +115,7 @@ public interface IStreamPushService {
      */
     ResourceBaseInfo getOverview();
 
+    Map<String, StreamPushItem> getAllAppAndStreamMap();
+
+
 }

+ 6 - 1
src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java

@@ -19,11 +19,11 @@ import org.springframework.jdbc.datasource.DataSourceTransactionManager;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.TransactionDefinition;
 import org.springframework.transaction.TransactionStatus;
-import org.springframework.transaction.annotation.Transactional;
 import org.springframework.util.ObjectUtils;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 @Service
 @DS("master")
@@ -268,4 +268,9 @@ public class GbStreamServiceImpl implements IGbStreamService {
     public List<GbStream> getGbChannelWithGbid(String gbId) {
         return gbStreamMapper.selectByGBId(gbId);
     }
+
+    @Override
+    public Map<String, GbStream> getAllGBId() {
+        return gbStreamMapper.getAllGBId();
+    }
 }

+ 5 - 0
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java

@@ -548,4 +548,9 @@ public class StreamPushServiceImpl implements IStreamPushService {
 
         return new ResourceBaseInfo(total, online);
     }
+
+    @Override
+    public Map<String, StreamPushItem> getAllAppAndStreamMap() {
+        return streamPushMapper.getAllAppAndStreamMap();
+    }
 }

+ 15 - 6
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java

@@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.service.redisMsg;
 
 import com.alibaba.fastjson2.JSON;
 import com.alibaba.fastjson2.JSONObject;
+import com.genersoft.iot.vmp.gb28181.bean.GbStream;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
 import com.genersoft.iot.vmp.service.IGbStreamService;
 import com.genersoft.iot.vmp.service.IMediaServerService;
@@ -19,6 +20,7 @@ import org.springframework.stereotype.Component;
 import javax.annotation.Resource;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 /**
@@ -57,7 +59,8 @@ public class RedisPushStreamStatusListMsgListener implements MessageListener {
                     try {
                         List<StreamPushItem> streamPushItems = JSON.parseArray(new String(msg.getBody()), StreamPushItem.class);
                         //查询全部的app+stream 用于判断是添加还是修改
-                        List<String> allAppAndStream = streamPushService.getAllAppAndStream();
+                        Map<String, StreamPushItem> allAppAndStream = streamPushService.getAllAppAndStreamMap();
+                        Map<String, GbStream> allGBId = gbStreamService.getAllGBId();
 
                         /**
                          * 用于存储更具APP+Stream过滤后的数据,可以直接存入stream_push表与gb_stream表
@@ -67,9 +70,15 @@ public class RedisPushStreamStatusListMsgListener implements MessageListener {
                         for (StreamPushItem streamPushItem : streamPushItems) {
                             String app = streamPushItem.getApp();
                             String stream = streamPushItem.getStream();
-                            boolean contains = allAppAndStream.contains(app + stream);
+                            boolean contains = allAppAndStream.containsKey(app + stream);
                             //不存在就添加
                             if (!contains) {
+                                if (allGBId.containsKey(streamPushItem.getGbId())) {
+                                    GbStream gbStream = allGBId.get(streamPushItem.getGbId());
+                                    logger.warn("[REDIS消息-推流设备列表更新] 国标编号重复: {}, 已分配给{}/{}",
+                                            streamPushItem.getGbId(), gbStream.getApp(), gbStream.getStream());
+                                    continue;
+                                }
                                 streamPushItem.setStreamType("push");
                                 streamPushItem.setCreateTime(DateUtil.getNow());
                                 streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId());
@@ -77,25 +86,25 @@ public class RedisPushStreamStatusListMsgListener implements MessageListener {
                                 streamPushItem.setOriginTypeStr("rtsp_push");
                                 streamPushItem.setTotalReaderCount("0");
                                 streamPushItemForSave.add(streamPushItem);
+                                allGBId.put(streamPushItem.getGbId(), streamPushItem);
                             } else {
                                 //存在就只修改 name和gbId
                                 streamPushItemForUpdate.add(streamPushItem);
                             }
                         }
-                        if (streamPushItemForSave.size() > 0) {
-
+                        if (!streamPushItemForSave.isEmpty()) {
                             logger.info("添加{}条",streamPushItemForSave.size());
                             logger.info(JSONObject.toJSONString(streamPushItemForSave));
                             streamPushService.batchAdd(streamPushItemForSave);
 
                         }
-                        if(streamPushItemForUpdate.size()>0){
+                        if(!streamPushItemForUpdate.isEmpty()){
                             logger.info("修改{}条",streamPushItemForUpdate.size());
                             logger.info(JSONObject.toJSONString(streamPushItemForUpdate));
                             gbStreamService.updateGbIdOrName(streamPushItemForUpdate);
                         }
                     }catch (Exception e) {
-                        logger.warn("[REDIS消息-推流设备列表更新] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
+                        logger.warn("[REDIS消息-推流设备列表更新] 发现未处理的异常, \r\n{}", new String(message.getBody()));
                         logger.error("[REDIS消息-推流设备列表更新] 异常内容: ", e);
                     }
                 }

+ 4 - 0
src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java

@@ -10,6 +10,7 @@ import org.apache.ibatis.annotations.Param;
 import org.springframework.stereotype.Repository;
 
 import java.util.List;
+import java.util.Map;
 
 @Mapper
 @Repository
@@ -170,4 +171,7 @@ public interface GbStreamMapper {
     @Select("SELECT status FROM wvp_stream_push WHERE app=#{app} AND stream=#{stream}")
     Boolean selectStatusForPush(@Param("app") String app, @Param("stream") String stream);
 
+    @MapKey("gbId")
+    @Select("SELECT * from wvp_gb_stream")
+    Map<String, GbStream> getAllGBId();
 }

+ 9 - 0
src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java

@@ -7,6 +7,7 @@ import org.apache.ibatis.annotations.*;
 import org.springframework.stereotype.Repository;
 
 import java.util.List;
+import java.util.Map;
 
 @Mapper
 @Repository
@@ -195,4 +196,12 @@ public interface StreamPushMapper {
             "</foreach>" +
             "</script>")
     List<StreamPushItem> getListIn(List<StreamPushItem> streamPushItems);
+
+    @MapKey("vhost")
+    @Select("SELECT CONCAT(wsp.app, wsp.stream) as vhost, wsp.app, wsp.stream, wgs.gb_id, wgs.name " +
+            " from wvp_stream_push wsp " +
+            " left join wvp_gb_stream  wgs  on wgs.app = wsp.app and wgs.stream = wsp.stream")
+    Map<String, StreamPushItem> getAllAppAndStreamMap();
+
+
 }

+ 4 - 3
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/gbStream/GbStreamController.java

@@ -3,10 +3,9 @@ package com.genersoft.iot.vmp.vmanager.gb28181.gbStream;
 import com.genersoft.iot.vmp.conf.exception.ControllerException;
 import com.genersoft.iot.vmp.conf.security.JwtUtils;
 import com.genersoft.iot.vmp.gb28181.bean.GbStream;
-import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
 import com.genersoft.iot.vmp.service.IGbStreamService;
 import com.genersoft.iot.vmp.service.IPlatformService;
-import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
+import com.genersoft.iot.vmp.service.IStreamPushService;
 import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
 import com.genersoft.iot.vmp.vmanager.gb28181.gbStream.bean.GbStreamParam;
 import com.github.pagehelper.PageInfo;
@@ -20,7 +19,6 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.util.ObjectUtils;
 import org.springframework.web.bind.annotation.*;
 
-import java.util.ArrayList;
 import java.util.List;
 
 @Tag(name  = "视频流关联到级联平台")
@@ -34,6 +32,9 @@ public class GbStreamController {
     @Autowired
     private IGbStreamService gbStreamService;
 
+    @Autowired
+    private IStreamPushService service;
+
     @Autowired
     private IPlatformService platformService;