소스 검색

支持设备/通道状态变化时发送redis通知

648540858 2 년 전
부모
커밋
0f50904992

+ 1 - 0
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java

@@ -122,6 +122,7 @@ public class VideoManagerConstants {
 	 */
 	public static final String VM_MSG_SUBSCRIBE_ALARM = "alarm";
 
+
 	/**
 	 * 报警通知的发送 (收到redis发出的通知,转发给其他平台)
 	 */

+ 10 - 0
src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java

@@ -52,6 +52,8 @@ public class UserSetting {
 
     private Boolean refuseChannelStatusChannelFormNotify = Boolean.FALSE;
 
+    private Boolean deviceStatusNotify = Boolean.FALSE;
+
     private String serverId = "000000";
 
     private String recordPath = null;
@@ -267,4 +269,12 @@ public class UserSetting {
     public void setMaxNotifyCountQueue(int maxNotifyCountQueue) {
         this.maxNotifyCountQueue = maxNotifyCountQueue;
     }
+
+    public Boolean getDeviceStatusNotify() {
+        return deviceStatusNotify;
+    }
+
+    public void setDeviceStatusNotify(Boolean deviceStatusNotify) {
+        this.deviceStatusNotify = deviceStatusNotify;
+    }
 }

+ 17 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java

@@ -108,6 +108,11 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
 							if (updateChannelOnlineList.size() > 300) {
 								executeSaveForOnline();
 							}
+							if (userSetting.getDeviceStatusNotify()) {
+								// 发送redis消息
+								redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true);
+							}
+
 							break;
 						case CatalogEvent.OFF :
 							// 离线
@@ -117,6 +122,10 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
 								if (updateChannelOfflineList.size() > 300) {
 									executeSaveForOffline();
 								}
+								if (userSetting.getDeviceStatusNotify()) {
+									// 发送redis消息
+									redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
+								}
 							}else {
 								logger.info("[收到通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
 							}
@@ -129,6 +138,10 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
 								if (updateChannelOfflineList.size() > 300) {
 									executeSaveForOffline();
 								}
+								if (userSetting.getDeviceStatusNotify()) {
+									// 发送redis消息
+									redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
+								}
 							}else {
 								logger.info("[收到通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
 							}
@@ -141,6 +154,10 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
 								if (updateChannelOfflineList.size() > 300) {
 									executeSaveForOffline();
 								}
+								if (userSetting.getDeviceStatusNotify()) {
+									// 发送redis消息
+									redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
+								}
 							}else {
 								logger.info("[收到通道视频故障通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
 							}

+ 10 - 0
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java

@@ -165,6 +165,11 @@ public class DeviceServiceImpl implements IDeviceService {
         String registerExpireTaskKey = VideoManagerConstants.REGISTER_EXPIRE_TASK_KEY_PREFIX + device.getDeviceId();
         // 如果第一次注册那么必须在60 * 3时间内收到一个心跳,否则设备离线
         dynamicTask.startDelay(registerExpireTaskKey, ()-> offline(device.getDeviceId(), "首次注册后未能收到心跳"), device.getKeepaliveIntervalTime() * 1000 * 3);
+        if (userSetting.getDeviceStatusNotify()) {
+            // 发送redis消息
+            redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), null, true);
+        }
+
     }
 
     @Override
@@ -193,6 +198,11 @@ public class DeviceServiceImpl implements IDeviceService {
         // 移除订阅
         removeCatalogSubscribe(device);
         removeMobilePositionSubscribe(device);
+        if (userSetting.getDeviceStatusNotify()) {
+            // 发送redis消息
+            redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), null, false);
+        }
+
     }
 
     @Override

+ 2 - 0
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java

@@ -261,4 +261,6 @@ public interface IRedisCatchStorage {
     List<Device> getAllDevices();
 
     void removeAllDevice();
+
+    void sendDeviceOrChannelStatus(String deviceId, String channelId, boolean online);
 }

+ 14 - 0
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java

@@ -902,4 +902,18 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
                 + userSetting.getServerId() + "_*_" + id + "_*";
         return RedisUtil.scan(redisTemplate, key).size();
     }
+
+    @Override
+    public void sendDeviceOrChannelStatus(String deviceId, String channelId, boolean online) {
+        String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_DEVICE_STATUS;
+        logger.info("[redis通知] 推送设备/通道状态, {}/{}-{}", deviceId, channelId, online);
+        StringBuilder msg = new StringBuilder();
+        msg.append(deviceId);
+        if (channelId != null) {
+            msg.append(":").append(channelId);
+        }
+        msg.append(" ").append(online? "ON":"OFF");
+
+        redisTemplate.convertAndSend(key, msg.toString());
+    }
 }

+ 2 - 0
src/main/resources/all-application.yml

@@ -180,6 +180,8 @@ user-settings:
     refuse-channel-status-channel-form-notify: false
     # 设置notify缓存队列最大长度,超过此长度的数据将返回486 BUSY_HERE,消息丢弃, 默认10000
     max-notify-count-queue: 10000
+    # 设备/通道状态变化时发送消息
+    device-status-notify: false
     # 跨域配置,配置你访问前端页面的地址即可, 可以配置多个
     allowed-origins:
         - http://localhost:8008