648540858 1 год назад
Родитель
Сommit
b57dbeac13

+ 7 - 7
src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java

@@ -15,7 +15,6 @@ import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.data.redis.connection.Message;
 import org.springframework.data.redis.connection.MessageListener;
 import org.springframework.data.redis.core.RedisTemplate;
-import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
 
@@ -216,10 +215,11 @@ public class RedisRpcConfig implements MessageListener {
         return callbacks.size();
     }
 
-    @Scheduled(fixedRate = 1000)   //每1秒执行一次
-    public void execute(){
-        System.out.println("callbacks的长度: " + callbacks.size());
-        System.out.println("队列的长度: " + topicSubscribers.size());
-        System.out.println("HOOK监听的长度: " + hookSubscribe.size());
-    }
+//    @Scheduled(fixedRate = 1000)   //每1秒执行一次
+//    public void execute(){
+//        logger.info("callbacks的长度: " + callbacks.size());
+//        logger.info("队列的长度: " + topicSubscribers.size());
+//        logger.info("HOOK监听的长度: " + hookSubscribe.size());
+//        logger.info("");
+//    }
 }

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java

@@ -238,7 +238,7 @@ public class DeviceChannel {
 	 *  是否含有音频
 	 */
 	@Schema(description = "是否含有音频")
-	private boolean hasAudio;
+	private Boolean hasAudio;
 
 	/**
 	 * 标记通道的类型,0->国标通道 1->直播流通道 2->业务分组/虚拟组织/行政区划

+ 1 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java

@@ -831,6 +831,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
             }
         });
         // 添加回复的拒绝或者错误的通知
+        // redis消息例如: PUBLISH VM_MSG_STREAM_PUSH_RESPONSE  '{"code":1,"msg":"失败","app":"1","stream":"2"}'
         redisPushStreamResponseListener.addEvent(sendRtpItem.getApp(), sendRtpItem.getStream(), response -> {
             if (response.getCode() != 0) {
                 dynamicTask.stop(sendRtpItem.getCallId());

+ 2 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java

@@ -295,9 +295,10 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
 
 	private void executeSaveForUpdate(){
 		if (!updateChannelMap.values().isEmpty()) {
+			logger.info("[存储收到的更新通道], 数量: {}", updateChannelMap.size());
 			ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values());
-			updateChannelMap.clear();
 			deviceChannelService.batchUpdateChannel(deviceChannels);
+			updateChannelMap.clear();
 		}
 
 	}

+ 214 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java

@@ -0,0 +1,214 @@
+package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
+
+import com.genersoft.iot.vmp.conf.CivilCodeFileConf;
+import com.genersoft.iot.vmp.conf.DynamicTask;
+import com.genersoft.iot.vmp.conf.SipConfig;
+import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.gb28181.bean.Device;
+import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
+import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
+import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
+import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
+import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
+import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
+import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
+import com.genersoft.iot.vmp.service.IDeviceChannelService;
+import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
+import com.genersoft.iot.vmp.utils.DateUtil;
+import org.dom4j.DocumentException;
+import org.dom4j.Element;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import org.springframework.util.ObjectUtils;
+
+import javax.sip.RequestEvent;
+import javax.sip.header.FromHeader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * SIP命令类型: NOTIFY请求中的移动位置请求处理
+ */
+@Component
+public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessorParent {
+
+
+    private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForMobilePositionProcessor.class);
+
+	private final Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>();
+
+	private final List<MobilePosition> addMobilePositionList = new CopyOnWriteArrayList();
+
+
+	@Autowired
+	private UserSetting userSetting;
+
+	@Autowired
+	private EventPublisher eventPublisher;
+
+	@Autowired
+	private IRedisCatchStorage redisCatchStorage;
+
+	@Autowired
+	private IDeviceChannelService deviceChannelService;
+
+	@Autowired
+	private DynamicTask dynamicTask;
+
+	@Autowired
+	private CivilCodeFileConf civilCodeFileConf;
+
+	@Autowired
+	private SipConfig sipConfig;
+
+	private final static String talkKey = "notify-request-for-mobile-position-task";
+
+	public void process(RequestEvent evt) {
+		try {
+			FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
+			String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
+
+			// 回复 200 OK
+			Element rootElement = getRootElement(evt);
+			if (rootElement == null) {
+				logger.error("处理MobilePosition移动位置Notify时未获取到消息体,{}", evt.getRequest());
+				return;
+			}
+
+			MobilePosition mobilePosition = new MobilePosition();
+			mobilePosition.setCreateTime(DateUtil.getNow());
+
+			Element deviceIdElement = rootElement.element("DeviceID");
+			String channelId = deviceIdElement.getTextTrim().toString();
+			Device device = redisCatchStorage.getDevice(deviceId);
+
+			if (device == null) {
+				device = redisCatchStorage.getDevice(channelId);
+				if (device == null) {
+					// 根据通道id查询设备Id
+					List<Device> deviceList = deviceChannelService.getDeviceByChannelId(channelId);
+					if (deviceList.size() > 0) {
+						device = deviceList.get(0);
+					}
+				}
+			}
+			if (device == null) {
+				logger.warn("[mobilePosition移动位置Notify] 未找到通道{}所属的设备", channelId);
+				return;
+			}
+			// 兼容设备部分设备上报是通道编号与设备编号一致的情况
+			if (deviceId.equals(channelId)) {
+				List<DeviceChannel> deviceChannels = deviceChannelService.queryChaneListByDeviceId(deviceId);
+				if (deviceChannels.size() == 1) {
+					channelId = deviceChannels.get(0).getChannelId();
+				}
+			}
+			if (!ObjectUtils.isEmpty(device.getName())) {
+				mobilePosition.setDeviceName(device.getName());
+			}
+
+			mobilePosition.setDeviceId(device.getDeviceId());
+			mobilePosition.setChannelId(channelId);
+			String time = XmlUtil.getText(rootElement, "Time");
+			if (ObjectUtils.isEmpty(time)) {
+				mobilePosition.setTime(DateUtil.getNow());
+			} else {
+				mobilePosition.setTime(SipUtils.parseTime(time));
+			}
+
+			mobilePosition.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude")));
+			mobilePosition.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude")));
+			if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Speed"))) {
+				mobilePosition.setSpeed(Double.parseDouble(XmlUtil.getText(rootElement, "Speed")));
+			} else {
+				mobilePosition.setSpeed(0.0);
+			}
+			if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Direction"))) {
+				mobilePosition.setDirection(Double.parseDouble(XmlUtil.getText(rootElement, "Direction")));
+			} else {
+				mobilePosition.setDirection(0.0);
+			}
+			if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Altitude"))) {
+				mobilePosition.setAltitude(Double.parseDouble(XmlUtil.getText(rootElement, "Altitude")));
+			} else {
+				mobilePosition.setAltitude(0.0);
+			}
+			logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
+					mobilePosition.getLongitude(), mobilePosition.getLatitude());
+			mobilePosition.setReportSource("Mobile Position");
+
+			// 更新device channel 的经纬度
+			DeviceChannel deviceChannel = new DeviceChannel();
+			deviceChannel.setDeviceId(device.getDeviceId());
+			deviceChannel.setChannelId(channelId);
+			deviceChannel.setLongitude(mobilePosition.getLongitude());
+			deviceChannel.setLatitude(mobilePosition.getLatitude());
+			deviceChannel.setGpsTime(mobilePosition.getTime());
+			updateChannelMap.put(channelId, deviceChannel);
+			addMobilePositionList.add(mobilePosition);
+			if(updateChannelMap.size() > 300) {
+				executeSaveChannel();
+			}
+			if (userSetting.isSavePositionHistory()) {
+				if(addMobilePositionList.size() > 300) {
+					executeSaveMobilePosition();
+				}
+			}
+
+//			deviceChannel = deviceChannelService.updateGps(deviceChannel, device);
+//
+//			mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
+//			mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
+//			mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
+//			mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
+
+//			deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
+
+			if (!dynamicTask.contains(talkKey)) {
+				dynamicTask.startDelay(talkKey, this::executeSave, 1000);
+			}
+
+		} catch (DocumentException e) {
+			logger.error("未处理的异常 ", e);
+		}
+
+
+	}
+
+	private void executeSave(){
+		executeSaveChannel();
+		executeSaveMobilePosition();
+		dynamicTask.stop(talkKey);
+	}
+
+	private void executeSaveChannel(){
+		try {
+			logger.info("[移动位置订阅]更新通道位置: {}", updateChannelMap.size());
+			ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values());
+			deviceChannelService.batchUpdateChannelGPS(deviceChannels);
+			updateChannelMap.clear();
+		}catch (Exception e) {
+
+		}
+	}
+
+	private void executeSaveMobilePosition(){
+		if (userSetting.isSavePositionHistory()) {
+			try {
+				logger.info("[移动位置订阅] 添加通道轨迹点位: {}", addMobilePositionList.size());
+				deviceChannelService.batchAddMobilePosition(addMobilePositionList);
+				addMobilePositionList.clear();
+			}catch (Exception e) {
+				logger.info("[移动位置订阅] b添加通道轨迹点位保存失败: {}", addMobilePositionList.size());
+			}
+		}
+	}
+
+
+
+}

+ 6 - 6
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java

@@ -228,12 +228,12 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
 			deviceChannel.setLongitude(mobilePosition.getLongitude());
 			deviceChannel.setLatitude(mobilePosition.getLatitude());
 			deviceChannel.setGpsTime(mobilePosition.getTime());
-			deviceChannel = deviceChannelService.updateGps(deviceChannel, device);
-
-			mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
-			mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
-			mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
-			mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
+//			deviceChannel = deviceChannelService.updateGps(deviceChannel, device);
+//
+//			mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
+//			mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
+//			mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
+//			mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
 
 			deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
 

+ 4 - 3
src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java

@@ -532,16 +532,17 @@ public class XmlUtil {
                     String status = getText(itemDevice, "Status");
                     if (status != null) {
                         // ONLINE OFFLINE HIKVISION DS-7716N-E4 NVR的兼容性处理
-                        if (status.equals("ON") || status.equals("On") || status.equals("ONLINE") || status.equals("OK")) {
+                        if (status.equalsIgnoreCase("ON") || status.equalsIgnoreCase("On") || status.equalsIgnoreCase("ONLINE") || status.equalsIgnoreCase("OK")) {
                             deviceChannel.setStatus(true);
                         }
-                        if (status.equals("OFF") || status.equals("Off") || status.equals("OFFLINE")) {
+                        if (status.equalsIgnoreCase("OFF") || status.equalsIgnoreCase("Off") || status.equalsIgnoreCase("OFFLINE")) {
                             deviceChannel.setStatus(false);
                         }
                     }else {
                         deviceChannel.setStatus(true);
                     }
-
+//                    logger.info("状态字符串: {}", status);
+//                    logger.info("状态结果: {}", deviceChannel.isStatus());
                     // 经度
                     String longitude = getText(itemDevice, "Longitude");
                     if (NumericUtil.isDouble(longitude)) {

+ 4 - 0
src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java

@@ -98,4 +98,8 @@ public interface IDeviceChannelService {
 
     void updateChannelGPS(Device device, DeviceChannel deviceChannel, MobilePosition mobilePosition);
 
+    void batchUpdateChannelGPS(List<DeviceChannel> channelList);
+
+    void batchAddMobilePosition(List<MobilePosition> addMobilePositionList);
+
 }

+ 10 - 0
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java

@@ -353,4 +353,14 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
             redisCatchStorage.sendMobilePositionMsg(jsonObject);
         }
     }
+
+    @Override
+    public void batchUpdateChannelGPS(List<DeviceChannel> channelList) {
+
+    }
+
+    @Override
+    public void batchAddMobilePosition(List<MobilePosition> mobilePositions) {
+
+    }
 }

+ 3 - 0
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java

@@ -565,6 +565,7 @@ public class DeviceServiceImpl implements IDeviceService {
                     removeMobilePositionSubscribe(deviceInStore, result->{
                         // 开启订阅
                         deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition());
+                        deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval());
                         addMobilePositionSubscribe(deviceInStore);
                         // 因为是异步执行,需要在这里更新下数据
                         deviceMapper.updateCustom(deviceInStore);
@@ -573,12 +574,14 @@ public class DeviceServiceImpl implements IDeviceService {
                 }else {
                     // 开启订阅
                     deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition());
+                    deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval());
                     addMobilePositionSubscribe(deviceInStore);
                 }
 
             }else if (device.getSubscribeCycleForMobilePosition() == 0) {
                 // 取消订阅
                 deviceInStore.setSubscribeCycleForMobilePosition(0);
+                deviceInStore.setMobilePositionSubmissionInterval(0);
                 removeMobilePositionSubscribe(deviceInStore, null);
             }
         }

+ 1 - 1
src/main/resources/application.yml

@@ -2,4 +2,4 @@ spring:
   application:
     name: wvp
   profiles:
-    active: local2
+    active: local