Kaynağa Gözat

合并271 notify消息优化

648540858 1 yıl önce
ebeveyn
işleme
249bdf69be

+ 2 - 10
src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceChannelMapper.java

@@ -688,18 +688,10 @@ public interface DeviceChannelMapper {
 
     @Update({"<script>" +
             "<foreach collection='channels' item='item' separator=';'>" +
-            "UPDATE wvp_device_channel SET status='ON' WHERE device_id=#{item.deviceId}" +
+            "UPDATE wvp_device_channel SET status=#{item.status} WHERE device_id=#{item.deviceId}" +
             "</foreach>" +
             "</script>"})
-    int batchOnlineForNotify(List<DeviceChannel> channels);
-
-    @Update({"<script>" +
-            "<foreach collection='channels' item='item' separator=';'>" +
-            "UPDATE wvp_device_channel SET status='OFF' WHERE device_id=#{item.deviceId}" +
-            "</foreach>" +
-            "</script>"})
-    int batchOfflineForNotify(List<DeviceChannel> channels);
-
+    int batchUpdateStatus(List<DeviceChannel> channels);
 
     @Select("select count(1) from wvp_device_channel where status = 'ON'")
     int getOnlineCount();

+ 2 - 10
src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceChannelService.java

@@ -3,8 +3,8 @@ package com.genersoft.iot.vmp.gb28181.service;
 import com.genersoft.iot.vmp.gb28181.bean.Device;
 import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
 import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
-import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
 import com.genersoft.iot.vmp.gb28181.controller.bean.ChannelReduce;
+import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
 import com.genersoft.iot.vmp.web.gb28181.dto.DeviceChannelExtend;
 import com.github.pagehelper.PageInfo;
 
@@ -47,15 +47,7 @@ public interface IDeviceChannelService {
      */
     int deleteChannelsForNotify(List<DeviceChannel> deleteChannelList);
 
-    /**
-     * 批量上线
-     */
-    int channelsOnlineForNotify(List<DeviceChannel> channels);
-
-    /**
-     * 批量下线
-     */
-    int channelsOfflineForNotify(List<DeviceChannel> channels);
+    int updateChannelsStatus(List<DeviceChannel> channels);
 
     /**
      *  获取一个通道

+ 57 - 11
src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java

@@ -193,13 +193,45 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
     }
 
     @Override
-    public int deleteChannelsForNotify(List<DeviceChannel> deleteChannelList) {
-       return channelMapper.batchDelForNotify(deleteChannelList);
+    @Transactional
+    public int deleteChannelsForNotify(List<DeviceChannel> channels) {
+        int limitCount = 1000;
+        int result = 0;
+        if (!channels.isEmpty()) {
+            if (channels.size() > limitCount) {
+                for (int i = 0; i < channels.size(); i += limitCount) {
+                    int toIndex = i + limitCount;
+                    if (i + limitCount > channels.size()) {
+                        toIndex = channels.size();
+                    }
+                    result += channelMapper.batchDel(channels.subList(i, toIndex));
+                }
+            }else {
+                result += channelMapper.batchDel(channels);
+            }
+        }
+        return result;
     }
 
+    @Transactional
     @Override
-    public int channelsOnlineForNotify(List<DeviceChannel> channels) {
-        return channelMapper.batchOnlineForNotify(channels);
+    public int updateChannelsStatus(List<DeviceChannel> channels) {
+        int limitCount = 1000;
+        int result = 0;
+        if (!channels.isEmpty()) {
+            if (channels.size() > limitCount) {
+                for (int i = 0; i < channels.size(); i += limitCount) {
+                    int toIndex = i + limitCount;
+                    if (i + limitCount > channels.size()) {
+                        toIndex = channels.size();
+                    }
+                    result += channelMapper.batchUpdateStatus(channels.subList(i, toIndex));
+                }
+            }else {
+                result += channelMapper.batchUpdateStatus(channels);
+            }
+        }
+        return result;
     }
 
     @Override
@@ -207,12 +239,6 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
         channelMapper.online(channel.getId());
     }
 
-    @Override
-    public int channelsOfflineForNotify(List<DeviceChannel> channels) {
-        return channelMapper.batchOfflineForNotify(channels);
-    }
-
-
     @Override
     public void offline(DeviceChannel channel) {
         channelMapper.offline(channel.getId());
@@ -242,6 +268,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
     }
 
     @Override
+    @Transactional
     public synchronized void batchUpdateChannelForNotify(List<DeviceChannel> channels) {
         String now = DateUtil.getNow();
         for (DeviceChannel channel : channels) {
@@ -264,8 +291,27 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
     }
 
     @Override
+    @Transactional
     public void batchAddChannel(List<DeviceChannel> channels) {
-        channelMapper.batchAdd(channels);
+        String now = DateUtil.getNow();
+        for (DeviceChannel channel : channels) {
+            channel.setUpdateTime(now);
+            channel.setCreateTime(now);
+        }
+        int limitCount = 1000;
+        if (!channels.isEmpty()) {
+            if (channels.size() > limitCount) {
+                for (int i = 0; i < channels.size(); i += limitCount) {
+                    int toIndex = i + limitCount;
+                    if (i + limitCount > channels.size()) {
+                        toIndex = channels.size();
+                    }
+                    channelMapper.batchAdd(channels.subList(i, toIndex));
+                }
+            }else {
+                channelMapper.batchAdd(channels);
+            }
+        }
         for (DeviceChannel channel : channels) {
             if (channel.getParentId() != null) {
                 channelMapper.updateChannelSubCount(channel.getDeviceDbId(), channel.getParentId());

+ 33 - 32
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java

@@ -39,8 +39,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 @Component
 public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent {
 
-	private final List<DeviceChannel> updateChannelOnlineList = new CopyOnWriteArrayList<>();
-	private final List<DeviceChannel> updateChannelOfflineList = new CopyOnWriteArrayList<>();
+	private final List<DeviceChannel> updateChannelForStatusChange = new CopyOnWriteArrayList<>();
 	private final Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>();
 
 	private final Map<String, DeviceChannel> addChannelMap = new ConcurrentHashMap<>();
@@ -60,6 +59,10 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
 	@Autowired
 	private IDeviceChannelService deviceChannelService;
 
+//	@Scheduled(fixedRate = 2000)   //每400毫秒执行一次
+//	public void showSize(){
+//		log.warn("[notify-目录订阅] 待处理消息数量: {}", taskQueue.size() );
+//	}
 
 	@Transactional
 	public void process(RequestEvent evt) {
@@ -75,7 +78,14 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
 		if (taskQueue.isEmpty()) {
 			return;
 		}
-		for (HandlerCatchData take : taskQueue) {
+		List<HandlerCatchData> handlerCatchDataList = new ArrayList<>();
+		while (!taskQueue.isEmpty()) {
+			handlerCatchDataList.add(taskQueue.poll());
+		}
+		if (handlerCatchDataList.isEmpty()) {
+			return;
+		}
+		for (HandlerCatchData take : handlerCatchDataList) {
 			if (take == null) {
 				continue;
 			}
@@ -119,14 +129,17 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
                             log.error("[解析CatalogChannelEvent]失败原文: \n{}", new String(evt.getRequest().getRawContent(), Charset.forName(device.getCharset())));
 							continue;
                         }
-
-                        log.info("[收到目录订阅]:{}/{}-{}", device.getDeviceId(),
-								catalogChannelEvent.getChannel().getDeviceId(), catalogChannelEvent.getEvent());
+						if (log.isDebugEnabled()){
+							log.debug("[收到目录订阅]:{}/{}-{}", device.getDeviceId(),
+									catalogChannelEvent.getChannel().getDeviceId(), catalogChannelEvent.getEvent());
+						}
+						DeviceChannel channel = catalogChannelEvent.getChannel();
 						switch (catalogChannelEvent.getEvent()) {
 							case CatalogEvent.ON:
 								// 上线
 								log.info("[收到通道上线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId());
-								updateChannelOnlineList.add(catalogChannelEvent.getChannel());
+								channel.setStatus("ON");
+								updateChannelForStatusChange.add(channel);
 								if (userSetting.getDeviceStatusNotify()) {
 									// 发送redis消息
 									redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId(), true);
@@ -138,7 +151,8 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
 								if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
 									log.info("[收到通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId());
 								} else {
-									updateChannelOfflineList.add(catalogChannelEvent.getChannel());
+									channel.setStatus("OFF");
+									updateChannelForStatusChange.add(channel);
 									if (userSetting.getDeviceStatusNotify()) {
 										// 发送redis消息
 										redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId(), false);
@@ -151,7 +165,8 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
 								if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
 									log.info("[收到通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId());
 								} else {
-									updateChannelOfflineList.add(catalogChannelEvent.getChannel());
+									channel.setStatus("OFF");
+									updateChannelForStatusChange.add(channel);
 									if (userSetting.getDeviceStatusNotify()) {
 										// 发送redis消息
 										redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId(), false);
@@ -164,7 +179,8 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
 								if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
 									log.info("[收到通道视频故障通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId());
 								} else {
-									updateChannelOfflineList.add(catalogChannelEvent.getChannel());
+									channel.setStatus("OFF");
+									updateChannelForStatusChange.add(channel);
 									if (userSetting.getDeviceStatusNotify()) {
 										// 发送redis消息
 										redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId(), false);
@@ -178,7 +194,6 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
 								DeviceChannel deviceChannel = deviceChannelService.getOne(deviceId, catalogChannelEvent.getChannel().getDeviceId());
 								if (deviceChannel != null) {
 									log.info("[增加通道] 已存在,不发送通知只更新,设备: {}, 通道 {}", device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId());
-									DeviceChannel channel = catalogChannelEvent.getChannel();
 									channel.setId(deviceChannel.getId());
 									channel.setHasAudio(deviceChannel.isHasAudio());
 									channel.setUpdateTime(DateUtil.getNow());
@@ -210,7 +225,6 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
 								// 判断此通道是否存在
 								DeviceChannel deviceChannelForUpdate = deviceChannelService.getOne(deviceId, catalogChannelEvent.getChannel().getDeviceId());
 								if (deviceChannelForUpdate != null) {
-									DeviceChannel channel = catalogChannelEvent.getChannel();
 									channel.setId(deviceChannelForUpdate.getId());
 									channel.setHasAudio(deviceChannelForUpdate.isHasAudio());
 									channel.setUpdateTime(DateUtil.getNow());
@@ -242,8 +256,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
 		taskQueue.clear();
 		if (!updateChannelMap.keySet().isEmpty()
 				|| !addChannelMap.keySet().isEmpty()
-				|| !updateChannelOnlineList.isEmpty()
-				|| !updateChannelOfflineList.isEmpty()
+				|| !updateChannelForStatusChange.isEmpty()
 				|| !deleteChannelList.isEmpty()) {
 			executeSave();
 		}
@@ -256,14 +269,9 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
 			log.error("[存储收到的增加通道] 异常: ", e );
 		}
 		try {
-			executeSaveForOnline();
+			executeSaveForStatus();
 		} catch (Exception e) {
-			log.error("[存储收到的通道上线] 异常: ", e );
-		}
-		try {
-			executeSaveForOffline();
-		} catch (Exception e) {
-			log.error("[存储收到的通道离线] 异常: ", e );
+			log.error("[存储收到的通道状态变化] 异常: ", e );
 		}
 		try {
 			executeSaveForUpdate();
@@ -301,17 +309,10 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
 		}
 	}
 
-	private void executeSaveForOnline(){
-		if (!updateChannelOnlineList.isEmpty()) {
-			deviceChannelService.channelsOnlineForNotify(updateChannelOnlineList);
-			updateChannelOnlineList.clear();
-		}
-	}
-
-	private void executeSaveForOffline(){
-		if (!updateChannelOfflineList.isEmpty()) {
-			deviceChannelService.channelsOfflineForNotify(updateChannelOfflineList);
-			updateChannelOfflineList.clear();
+	private void executeSaveForStatus(){
+		if (!updateChannelForStatusChange.isEmpty()) {
+			deviceChannelService.updateChannelsStatus(updateChannelForStatusChange);
+			updateChannelForStatusChange.clear();
 		}
 	}
 }

+ 9 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java

@@ -24,6 +24,7 @@ import org.springframework.util.ObjectUtils;
 
 import javax.sip.RequestEvent;
 import javax.sip.header.FromHeader;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
@@ -65,7 +66,14 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor
 		if (taskQueue.isEmpty()) {
 			return;
 		}
-		for (HandlerCatchData take : taskQueue) {
+		List<HandlerCatchData> handlerCatchDataList = new ArrayList<>();
+		while (!taskQueue.isEmpty()) {
+			handlerCatchDataList.add(taskQueue.poll());
+		}
+		if (handlerCatchDataList.isEmpty()) {
+			return;
+		}
+		for (HandlerCatchData take : handlerCatchDataList) {
 			if (take == null) {
 				continue;
 			}