Quellcode durchsuchen

优化通道刷新的时接收下级消息的效率

648540858 vor 1 Jahr
Ursprung
Commit
d2fc2df77b

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/conf/MediaStatusTimerTask.java

@@ -8,7 +8,7 @@ import org.springframework.scheduling.annotation.Scheduled;
 public class MediaStatusTimerTask {
 
 
-    @Scheduled(fixedRate = 2 * 1000)   //每3秒执行一次
+//    @Scheduled(fixedRate = 2 * 1000)   //每3秒执行一次
     public void execute(){
 
     }

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/conf/WVPTimerTask.java

@@ -19,7 +19,7 @@ public class WVPTimerTask {
     @Autowired
     private SipConfig sipConfig;
 
-    @Scheduled(fixedRate = 2 * 1000)   //每3秒执行一次
+    @Scheduled(fixedDelay = 2 * 1000)   //每3秒执行一次
     public void execute(){
         JSONObject jsonObject = new JSONObject();
         jsonObject.put("ip", sipConfig.getShowIp());

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java

@@ -135,7 +135,7 @@ public class CatalogDataCatch {
         return !catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end);
     }
 
-    @Scheduled(fixedRate = 5 * 1000)   //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时
+    @Scheduled(fixedDelay = 5 * 1000)   //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时
     private void timerTask(){
         Set<String> keys = data.keySet();
 

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java

@@ -62,7 +62,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
 		taskQueue.offer(new HandlerCatchData(evt, null, null));
 	}
 
-	@Scheduled(fixedRate = 400)   //每400毫秒执行一次
+	@Scheduled(fixedDelay = 400)   //每400毫秒执行一次
 	public void executeTaskQueue(){
 		if (taskQueue.isEmpty()) {
 			return;

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java

@@ -61,7 +61,7 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor
 		taskQueue.offer(new HandlerCatchData(evt, null, null));
 	}
 
-	@Scheduled(fixedRate = 200) //每200毫秒执行一次
+	@Scheduled(fixedDelay = 200) //每200毫秒执行一次
 	public void executeTaskQueue() {
 		if (taskQueue.isEmpty()) {
 			return;

+ 104 - 89
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java

@@ -2,13 +2,13 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.respon
 
 import com.genersoft.iot.vmp.conf.SipConfig;
 import com.genersoft.iot.vmp.gb28181.bean.*;
+import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
 import com.genersoft.iot.vmp.gb28181.service.IGroupService;
 import com.genersoft.iot.vmp.gb28181.service.IRegionService;
 import com.genersoft.iot.vmp.gb28181.session.CatalogDataCatch;
 import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
 import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
 import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler;
-import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
 import gov.nist.javax.sip.message.SIPRequest;
 import lombok.extern.slf4j.Slf4j;
 import org.dom4j.DocumentException;
@@ -16,6 +16,7 @@ import org.dom4j.Element;
 import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
 import org.springframework.transaction.annotation.Transactional;
@@ -80,100 +81,114 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
         } catch (SipException | InvalidArgumentException | ParseException e) {
             log.error("[命令发送失败] 目录查询回复: {}", e.getMessage());
         }
-        // 已经开启消息处理则跳过
-        if (processing.compareAndSet(false, true)) {
-            taskExecutor.execute(() -> {
-                while (!taskQueue.isEmpty()) {
-                    // 全局异常捕获,保证下一条可以得到处理
-                    try {
-                        HandlerCatchData take = taskQueue.poll();
-                        Element rootElement = null;
-                        try {
-                            rootElement = getRootElement(take.getEvt(), take.getDevice().getCharset());
-                        } catch (DocumentException e) {
-                            log.error("[xml解析] 失败: ", e);
-                            continue;
-                        }
-                        if (rootElement == null) {
-                            log.warn("[ 收到通道 ] content cannot be null, {}", evt.getRequest());
-                            continue;
-                        }
-                        Element deviceListElement = rootElement.element("DeviceList");
-                        Element sumNumElement = rootElement.element("SumNum");
-                        Element snElement = rootElement.element("SN");
-                        int sumNum = Integer.parseInt(sumNumElement.getText());
-
-                        if (sumNum == 0) {
-                            log.info("[收到通道]设备:{}的: 0个", take.getDevice().getDeviceId());
-                            // 数据已经完整接收
-                            deviceChannelService.cleanChannelsForDevice(take.getDevice().getId());
-                            catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null);
-                        } else {
-                            Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
-                            if (deviceListIterator != null) {
-                                List<DeviceChannel> channelList = new ArrayList<>();
-                                List<Region> regionList = new ArrayList<>();
-                                List<Group> groupList = new ArrayList<>();
-                                // 遍历DeviceList
-                                while (deviceListIterator.hasNext()) {
-                                    Element itemDevice = deviceListIterator.next();
-                                    Element channelDeviceElement = itemDevice.element("DeviceID");
-                                    if (channelDeviceElement == null) {
-                                        continue;
-                                    }
-                                    DeviceChannel channel = DeviceChannel.decode(itemDevice);
-                                    if (channel.getDeviceId() == null) {
-                                        log.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent()));
-                                        continue;
-                                    }
-                                    channel.setDeviceDbId(device.getId());
-                                    if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) {
-                                        channel.setParentId(null);
-                                    }
-                                    // 解析通道类型
-                                    if (channel.getDeviceId().length() <= 8) {
-                                        // 行政区划
-                                        Region region = Region.getInstance(channel);
-                                        regionList.add(region);
-                                        channel.setChannelType(1);
-                                    }else if (channel.getDeviceId().length() == 20){
-                                        // 业务分组/虚拟组织
-                                        Group group = Group.getInstance(channel);
-                                        if (group != null) {
-                                            channel.setParental(1);
-                                            channel.setChannelType(2);
-                                            groupList.add(group);
-                                        }
-                                    }
-                                    channelList.add(channel);
-                                }
-                                int sn = Integer.parseInt(snElement.getText());
-                                catalogDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, take.getDevice(),
-                                        channelList, regionList, groupList);
-                                log.info("[收到通道]设备: {} -> {}个,{}/{}", take.getDevice().getDeviceId(), channelList.size(), catalogDataCatch.getDeviceChannelList(take.getDevice().getDeviceId()) == null ? 0 : catalogDataCatch.getDeviceChannelList(take.getDevice().getDeviceId()).size(), sumNum);
-                                if (catalogDataCatch.getDeviceChannelList(take.getDevice().getDeviceId()).size() == sumNum) {
-                                    // 数据已经完整接收, 此时可能存在某个设备离线变上线的情况,但是考虑到性能,此处不做处理,
-                                    // 目前支持设备通道上线通知时和设备上线时向上级通知
-                                    boolean resetChannelsResult = saveData(device);
-                                    if (!resetChannelsResult) {
-                                        String errorMsg = "接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.getDeviceChannelList(take.getDevice().getDeviceId()).size() + "条";
-                                        catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), errorMsg);
-                                    } else {
-                                        catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null);
-                                    }
+    }
+
+    @Scheduled(fixedDelay = 200)
+    public void executeTaskQueue(){
+        if (taskQueue.isEmpty()) {
+            return;
+        }
+        List<HandlerCatchData> handlerCatchDataList = new ArrayList<>();
+        int size = taskQueue.size();
+        for (int i = 0; i < size; i++) {
+            HandlerCatchData poll = taskQueue.poll();
+            if (poll != null) {
+                handlerCatchDataList.add(poll);
+            }
+        }
+        if (handlerCatchDataList.isEmpty()) {
+            return;
+        }
+        for (HandlerCatchData take : handlerCatchDataList) {
+            if (take == null) {
+                continue;
+            }
+            RequestEvent evt = take.getEvt();
+            // 全局异常捕获,保证下一条可以得到处理
+            try {
+                Element rootElement = null;
+                try {
+                    rootElement = getRootElement(take.getEvt(), take.getDevice().getCharset());
+                } catch (DocumentException e) {
+                    log.error("[xml解析] 失败: ", e);
+                    continue;
+                }
+                if (rootElement == null) {
+                    log.warn("[ 收到通道 ] content cannot be null, {}", evt.getRequest());
+                    continue;
+                }
+                Element deviceListElement = rootElement.element("DeviceList");
+                Element sumNumElement = rootElement.element("SumNum");
+                Element snElement = rootElement.element("SN");
+                int sumNum = Integer.parseInt(sumNumElement.getText());
+
+                if (sumNum == 0) {
+                    log.info("[收到通道]设备:{}的: 0个", take.getDevice().getDeviceId());
+                    // 数据已经完整接收
+                    deviceChannelService.cleanChannelsForDevice(take.getDevice().getId());
+                    catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null);
+                } else {
+                    Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
+                    if (deviceListIterator != null) {
+                        List<DeviceChannel> channelList = new ArrayList<>();
+                        List<Region> regionList = new ArrayList<>();
+                        List<Group> groupList = new ArrayList<>();
+                        // 遍历DeviceList
+                        while (deviceListIterator.hasNext()) {
+                            Element itemDevice = deviceListIterator.next();
+                            Element channelDeviceElement = itemDevice.element("DeviceID");
+                            if (channelDeviceElement == null) {
+                                continue;
+                            }
+                            DeviceChannel channel = DeviceChannel.decode(itemDevice);
+                            if (channel.getDeviceId() == null) {
+                                log.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent()));
+                                continue;
+                            }
+                            channel.setDeviceDbId(take.getDevice().getId());
+                            if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) {
+                                channel.setParentId(null);
+                            }
+                            // 解析通道类型
+                            if (channel.getDeviceId().length() <= 8) {
+                                // 行政区划
+                                Region region = Region.getInstance(channel);
+                                regionList.add(region);
+                                channel.setChannelType(1);
+                            }else if (channel.getDeviceId().length() == 20){
+                                // 业务分组/虚拟组织
+                                Group group = Group.getInstance(channel);
+                                if (group != null) {
+                                    channel.setParental(1);
+                                    channel.setChannelType(2);
+                                    groupList.add(group);
                                 }
                             }
-
+                            channelList.add(channel);
+                        }
+                        int sn = Integer.parseInt(snElement.getText());
+                        catalogDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, take.getDevice(),
+                                channelList, regionList, groupList);
+                        log.info("[收到通道]设备: {} -> {}个,{}/{}", take.getDevice().getDeviceId(), channelList.size(), catalogDataCatch.getDeviceChannelList(take.getDevice().getDeviceId()) == null ? 0 : catalogDataCatch.getDeviceChannelList(take.getDevice().getDeviceId()).size(), sumNum);
+                        if (catalogDataCatch.getDeviceChannelList(take.getDevice().getDeviceId()).size() == sumNum) {
+                            // 数据已经完整接收, 此时可能存在某个设备离线变上线的情况,但是考虑到性能,此处不做处理,
+                            // 目前支持设备通道上线通知时和设备上线时向上级通知
+                            boolean resetChannelsResult = saveData(take.getDevice());
+                            if (!resetChannelsResult) {
+                                String errorMsg = "接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.getDeviceChannelList(take.getDevice().getDeviceId()).size() + "条";
+                                catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), errorMsg);
+                            } else {
+                                catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null);
+                            }
                         }
-                    } catch (Exception e) {
-                        log.warn("[收到通道] 发现未处理的异常, \r\n{}", evt.getRequest());
-                        log.error("[收到通道] 异常内容: ", e);
                     }
+
                 }
-                processing.set(false);
-            });
+            } catch (Exception e) {
+                log.warn("[收到通道] 发现未处理的异常, \r\n{}", evt.getRequest());
+                log.error("[收到通道] 异常内容: ", e);
+            }
         }
-
     }
 
     @Transactional

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/service/impl/MobilePositionServiceImpl.java

@@ -99,7 +99,7 @@ public class MobilePositionServiceImpl implements IMobilePositionService {
         channelMapper.updateStreamGPS(gpsMsgInfoList);
     }
 
-    @Scheduled(fixedRate = 1000)
+    @Scheduled(fixedDelay = 1000)
     @Transactional
     public void executeTaskQueue() {
         int countLimit = 3000;

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java

@@ -65,7 +65,7 @@ public class RedisGpsMsgListener implements MessageListener {
     /**
      * 定时将经纬度更新到数据库
      */
-    @Scheduled(fixedRate = 2 * 1000)   //每2秒执行一次
+    @Scheduled(fixedDelay = 2 * 1000)   //每2秒执行一次
     public void execute(){
         List<GPSMsgInfo> gpsMsgInfoList = redisCatchStorage.getAllGpsMsgInfo();
         if (!gpsMsgInfoList.isEmpty()) {