Ver Fonte

优化Catalog接收,提升大量设备同时接入性能

648540858 há 1 ano atrás
pai
commit
dd6beba843

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java

@@ -37,7 +37,6 @@ import org.springframework.util.CollectionUtils;
 import org.springframework.util.ObjectUtils;
 
 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * @author lin
@@ -470,6 +469,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
         if (CollectionUtils.isEmpty(deviceChannelList)) {
             return false;
         }
+        System.out.println("size: " + deviceChannelList.size());
         List<DeviceChannel> allChannels = channelMapper.queryAllChannelsForRefresh(deviceDbId);
         Map<String,DeviceChannel> allChannelMap = new HashMap<>();
         if (!allChannels.isEmpty()) {

+ 2 - 2
src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java

@@ -335,12 +335,12 @@ public class DeviceServiceImpl implements IDeviceService {
         try {
             sipCommander.catalogQuery(device, sn, event -> {
                 String errorMsg = String.format("同步通道失败,错误码: %s, %s", event.statusCode, event.msg);
-                catalogResponseMessageHandler.setChannelSyncEnd(device.getDeviceId(), errorMsg);
+                catalogResponseMessageHandler.setChannelSyncEnd(device.getDeviceId(), sn, errorMsg);
             });
         } catch (SipException | InvalidArgumentException | ParseException e) {
             log.error("[同步通道], 信令发送失败:{}", e.getMessage() );
             String errorMsg = String.format("同步通道失败,信令发送失败: %s", e.getMessage());
-            catalogResponseMessageHandler.setChannelSyncEnd(device.getDeviceId(), errorMsg);
+            catalogResponseMessageHandler.setChannelSyncEnd(device.getDeviceId(), sn, errorMsg);
         }
     }
 

+ 53 - 39
src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataManager.java

@@ -1,7 +1,5 @@
 package com.genersoft.iot.vmp.gb28181.session;
 
-import com.genersoft.iot.vmp.common.InviteInfo;
-import com.genersoft.iot.vmp.common.VideoManagerConstants;
 import com.genersoft.iot.vmp.gb28181.bean.*;
 import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
 import com.genersoft.iot.vmp.gb28181.service.IGroupService;
@@ -9,16 +7,16 @@ import com.genersoft.iot.vmp.gb28181.service.IRegionService;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.CommandLineRunner;
-import org.springframework.data.redis.core.Cursor;
 import org.springframework.data.redis.core.RedisTemplate;
-import org.springframework.data.redis.core.ScanOptions;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
 import java.time.Instant;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.DelayQueue;
 import java.util.concurrent.TimeUnit;
 
 @Slf4j
@@ -41,8 +39,12 @@ public class CatalogDataManager implements CommandLineRunner {
 
     private final String key = "VMP_CATALOG_DATA";
 
+    public String buildMapKey(String deviceId, int sn ) {
+        return deviceId + "_" + sn;
+    }
+
     public void addReady(Device device, int sn ) {
-        CatalogData catalogData = dataMap.get(device.getDeviceId());
+        CatalogData catalogData = dataMap.get(buildMapKey(device.getDeviceId(),sn));
         if (catalogData != null) {
             Set<String> redisKeysForChannel = catalogData.getRedisKeysForChannel();
             if (redisKeysForChannel != null && !redisKeysForChannel.isEmpty()) {
@@ -62,19 +64,19 @@ public class CatalogDataManager implements CommandLineRunner {
                     redisTemplate.opsForHash().delete(key, deleteKey);
                 }
             }
-            dataMap.remove(device.getDeviceId());
+            dataMap.remove(buildMapKey(device.getDeviceId(),sn));
         }
         catalogData = new CatalogData();
         catalogData.setDevice(device);
         catalogData.setSn(sn);
         catalogData.setStatus(CatalogData.CatalogDataStatus.ready);
         catalogData.setTime(Instant.now());
-        dataMap.put(device.getDeviceId(), catalogData);
+        dataMap.put(buildMapKey(device.getDeviceId(),sn), catalogData);
     }
 
     public void put(String deviceId, int sn, int total, Device device, List<DeviceChannel> deviceChannelList,
                     List<Region> regionList, List<Group> groupList) {
-        CatalogData catalogData = dataMap.get(device.getDeviceId());
+        CatalogData catalogData = dataMap.get(buildMapKey(device.getDeviceId(),sn));
         if (catalogData == null ) {
             log.warn("[缓存-Catalog] 未找到缓存对象,可能已经结束");
             return;
@@ -108,9 +110,9 @@ public class CatalogDataManager implements CommandLineRunner {
         }
     }
 
-    public List<DeviceChannel> getDeviceChannelList(String deviceId) {
+    public List<DeviceChannel> getDeviceChannelList(String deviceId, int sn) {
         List<DeviceChannel> result = new ArrayList<>();
-        CatalogData catalogData = dataMap.get(deviceId);
+        CatalogData catalogData = dataMap.get(buildMapKey(deviceId,sn));
         if (catalogData == null ) {
             log.warn("[Redis-Catalog] 未找到缓存对象,可能已经结束");
             return result;
@@ -124,9 +126,9 @@ public class CatalogDataManager implements CommandLineRunner {
         return result;
     }
 
-    public List<Region> getRegionList(String deviceId) {
+    public List<Region> getRegionList(String deviceId, int sn) {
         List<Region> result = new ArrayList<>();
-        CatalogData catalogData = dataMap.get(deviceId);
+        CatalogData catalogData = dataMap.get(buildMapKey(deviceId,sn));
         if (catalogData == null ) {
             log.warn("[Redis-Catalog] 未找到缓存对象,可能已经结束");
             return result;
@@ -140,9 +142,9 @@ public class CatalogDataManager implements CommandLineRunner {
         return result;
     }
 
-    public List<Group> getGroupList(String deviceId) {
+    public List<Group> getGroupList(String deviceId, int sn) {
         List<Group> result = new ArrayList<>();
-        CatalogData catalogData = dataMap.get(deviceId);
+        CatalogData catalogData = dataMap.get(buildMapKey(deviceId,sn));
         if (catalogData == null ) {
             log.warn("[Redis-Catalog] 未找到缓存对象,可能已经结束");
             return result;
@@ -157,28 +159,40 @@ public class CatalogDataManager implements CommandLineRunner {
     }
 
     public SyncStatus getSyncStatus(String deviceId) {
-        CatalogData catalogData = dataMap.get(deviceId);
-        if (catalogData == null) {
+        if (dataMap.isEmpty()) {
             return null;
         }
-        SyncStatus syncStatus = new SyncStatus();
-        syncStatus.setCurrent(catalogData.getRedisKeysForChannel().size());
-        syncStatus.setTotal(catalogData.getTotal());
-        syncStatus.setErrorMsg(catalogData.getErrorMsg());
-        if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end)) {
-            syncStatus.setSyncIng(false);
-        }else {
-            syncStatus.setSyncIng(true);
+        Set<String> keySet = dataMap.keySet();
+        for (String key : keySet) {
+            CatalogData catalogData = dataMap.get(key);
+            if (catalogData != null && deviceId.equals(catalogData.getDevice().getDeviceId())) {
+                SyncStatus syncStatus = new SyncStatus();
+                syncStatus.setCurrent(catalogData.getRedisKeysForChannel().size());
+                syncStatus.setTotal(catalogData.getTotal());
+                syncStatus.setErrorMsg(catalogData.getErrorMsg());
+                if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end)) {
+                    syncStatus.setSyncIng(false);
+                }else {
+                    syncStatus.setSyncIng(true);
+                }
+                return syncStatus;
+            }
         }
-        return syncStatus;
+        return null;
     }
 
     public boolean isSyncRunning(String deviceId) {
-        CatalogData catalogData = dataMap.get(deviceId);
-        if (catalogData == null) {
+        if (dataMap.isEmpty()) {
             return false;
         }
-        return !catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end);
+        Set<String> keySet = dataMap.keySet();
+        for (String key : keySet) {
+            CatalogData catalogData = dataMap.get(key);
+            if (catalogData != null && deviceId.equals(catalogData.getDevice().getDeviceId())) {
+                return !catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end);
+            }
+        }
+        return false;
     }
 
     @Override
@@ -202,17 +216,17 @@ public class CatalogDataManager implements CommandLineRunner {
                 if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.runIng)) {
                     String deviceId = catalogData.getDevice().getDeviceId();
                     int sn = catalogData.getSn();
-                    List<DeviceChannel> deviceChannelList = getDeviceChannelList(deviceId);
+                    List<DeviceChannel> deviceChannelList = getDeviceChannelList(deviceId, sn);
                     if (catalogData.getTotal() == deviceChannelList.size()) {
                         deviceChannelService.resetChannels(catalogData.getDevice().getId(), deviceChannelList);
                     }else {
                         deviceChannelService.updateChannels(catalogData.getDevice(), deviceChannelList);
                     }
-                    List<Region> regionList = getRegionList(deviceId);
+                    List<Region> regionList = getRegionList(deviceId, sn);
                     if ( regionList!= null && !regionList.isEmpty()) {
                         regionService.batchAdd(regionList);
                     }
-                    List<Group> groupList = getGroupList(deviceId);
+                    List<Group> groupList = getGroupList(deviceId, sn);
                     if (groupList != null && !groupList.isEmpty()) {
                         groupService.batchAdd(groupList);
                     }
@@ -248,8 +262,8 @@ public class CatalogDataManager implements CommandLineRunner {
     }
 
 
-    public void setChannelSyncEnd(String deviceId, String errorMsg) {
-        CatalogData catalogData = dataMap.get(deviceId);
+    public void setChannelSyncEnd(String deviceId, int sn, String errorMsg) {
+        CatalogData catalogData = dataMap.get(buildMapKey(deviceId,sn));
         if (catalogData == null) {
             return;
         }
@@ -258,16 +272,16 @@ public class CatalogDataManager implements CommandLineRunner {
         catalogData.setTime(Instant.now());
     }
 
-    public int size(String deviceId) {
-        CatalogData catalogData = dataMap.get(deviceId);
+    public int size(String deviceId, int sn) {
+        CatalogData catalogData = dataMap.get(buildMapKey(deviceId,sn));
         if (catalogData == null) {
             return 0;
         }
         return catalogData.getRedisKeysForChannel().size();
     }
 
-    public int sumNum(String deviceId) {
-        CatalogData catalogData = dataMap.get(deviceId);
+    public int sumNum(String deviceId, int sn) {
+        CatalogData catalogData = dataMap.get(buildMapKey(deviceId,sn));
         if (catalogData == null) {
             return 0;
         }

+ 16 - 14
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java

@@ -96,6 +96,7 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
                 continue;
             }
             RequestEvent evt = take.getEvt();
+            int sn = 0;
             // 全局异常捕获,保证下一条可以得到处理
             try {
                 Element rootElement = null;
@@ -118,7 +119,7 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
                     log.info("[收到通道]设备:{}的: 0个", take.getDevice().getDeviceId());
                     // 数据已经完整接收
                     deviceChannelService.cleanChannelsForDevice(take.getDevice().getId());
-                    catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null);
+                    catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), sn, null);
                 } else {
                     Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
                     if (deviceListIterator != null) {
@@ -132,6 +133,7 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
                             if (channelDeviceElement == null) {
                                 continue;
                             }
+                            // 从xml解析内容到 DeviceChannel 对象
                             DeviceChannel channel = DeviceChannel.decode(itemDevice);
                             if (channel.getDeviceId() == null) {
                                 log.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent()));
@@ -158,25 +160,25 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
                             }
                             channelList.add(channel);
                         }
-                        int sn = Integer.parseInt(snElement.getText());
+                        sn = Integer.parseInt(snElement.getText());
                         catalogDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, take.getDevice(),
                                 channelList, regionList, groupList);
-                        log.info("[收到通道]设备: {} -> {}个,{}/{}", take.getDevice().getDeviceId(), channelList.size(), catalogDataCatch.size(take.getDevice().getDeviceId()), sumNum);
+                        log.info("[收到通道]设备: {} -> {}个,{}/{}", take.getDevice().getDeviceId(), channelList.size(), catalogDataCatch.size(take.getDevice().getDeviceId(), sn), sumNum);
                     }
                 }
             } catch (Exception e) {
                 log.warn("[收到通道] 发现未处理的异常, \r\n{}", evt.getRequest());
                 log.error("[收到通道] 异常内容: ", e);
             } finally {
-                if (catalogDataCatch.size(take.getDevice().getDeviceId()) == catalogDataCatch.sumNum(take.getDevice().getDeviceId())) {
+                if (catalogDataCatch.size(take.getDevice().getDeviceId(), sn) == catalogDataCatch.sumNum(take.getDevice().getDeviceId(), sn)) {
                     // 数据已经完整接收, 此时可能存在某个设备离线变上线的情况,但是考虑到性能,此处不做处理,
                     // 目前支持设备通道上线通知时和设备上线时向上级通知
-                    boolean resetChannelsResult = saveData(take.getDevice());
+                    boolean resetChannelsResult = saveData(take.getDevice(), sn);
                     if (!resetChannelsResult) {
-                        String errorMsg = "接收成功,写入失败,共" + catalogDataCatch.sumNum(take.getDevice().getDeviceId()) + "条,已接收" + catalogDataCatch.getDeviceChannelList(take.getDevice().getDeviceId()).size() + "条";
-                        catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), errorMsg);
+                        String errorMsg = "接收成功,写入失败,共" + catalogDataCatch.sumNum(take.getDevice().getDeviceId(), sn) + "条,已接收" + catalogDataCatch.getDeviceChannelList(take.getDevice().getDeviceId(), sn).size() + "条";
+                        catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), sn, errorMsg);
                     } else {
-                        catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null);
+                        catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), sn, null);
                     }
                 }
             }
@@ -184,20 +186,20 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
     }
 
     @Transactional
-    public boolean saveData(Device device) {
+    public boolean saveData(Device device, int sn) {
 
         boolean result = true;
-        List<DeviceChannel> deviceChannelList = catalogDataCatch.getDeviceChannelList(device.getDeviceId());
+        List<DeviceChannel> deviceChannelList = catalogDataCatch.getDeviceChannelList(device.getDeviceId(), sn);
         if (deviceChannelList != null && !deviceChannelList.isEmpty()) {
             result &= deviceChannelService.resetChannels(device.getId(), deviceChannelList);
         }
 
-        List<Region> regionList = catalogDataCatch.getRegionList(device.getDeviceId());
+        List<Region> regionList = catalogDataCatch.getRegionList(device.getDeviceId(), sn);
         if ( regionList!= null && !regionList.isEmpty()) {
             result &= regionService.batchAdd(regionList);
         }
 
-        List<Group> groupList = catalogDataCatch.getGroupList(device.getDeviceId());
+        List<Group> groupList = catalogDataCatch.getGroupList(device.getDeviceId(), sn);
         if (groupList != null && !groupList.isEmpty()) {
             result &= groupService.batchAdd(groupList);
         }
@@ -221,7 +223,7 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
         catalogDataCatch.addReady(device, sn);
     }
 
-    public void setChannelSyncEnd(String deviceId, String errorMsg) {
-        catalogDataCatch.setChannelSyncEnd(deviceId, errorMsg);
+    public void setChannelSyncEnd(String deviceId, int sn, String errorMsg) {
+        catalogDataCatch.setChannelSyncEnd(deviceId, sn, errorMsg);
     }
 }