Bladeren bron

优化notify消息中目录和移动位置的处理

648540858 1 jaar geleden
bovenliggende
commit
66a681d679

+ 86 - 254
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java

@@ -1,7 +1,5 @@
 package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
 
-import com.genersoft.iot.vmp.conf.CivilCodeFileConf;
-import com.genersoft.iot.vmp.conf.DynamicTask;
 import com.genersoft.iot.vmp.conf.SipConfig;
 import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.gb28181.bean.Device;
@@ -20,10 +18,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
 
 import javax.sip.RequestEvent;
 import javax.sip.header.FromHeader;
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -39,8 +37,6 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
 
     private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForCatalogProcessor.class);
 
-	private final List<DeviceChannel> updateChannelOnlineList = new CopyOnWriteArrayList<>();
-	private final List<DeviceChannel> updateChannelOfflineList = new CopyOnWriteArrayList<>();
 	private final Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>();
 
 	private final Map<String, DeviceChannel> addChannelMap = new ConcurrentHashMap<>();
@@ -59,275 +55,111 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
 	@Autowired
 	private IDeviceChannelService deviceChannelService;
 
-	@Autowired
-	private DynamicTask dynamicTask;
-
-	@Autowired
-	private CivilCodeFileConf civilCodeFileConf;
-
 	@Autowired
 	private SipConfig sipConfig;
 
-	private final static String talkKey = "notify-request-for-catalog-task";
-
-	public void process(RequestEvent evt) {
-		try {
-			long start = System.currentTimeMillis();
-			FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
-			String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
-
-			Device device = redisCatchStorage.getDevice(deviceId);
-			if (device == null || !device.isOnLine()) {
-				logger.warn("[收到目录订阅]:{}, 但是设备已经离线", (device != null ? device.getDeviceId():"" ));
-				return;
-			}
-			Element rootElement = getRootElement(evt, device.getCharset());
-			if (rootElement == null) {
-				logger.warn("[ 收到目录订阅 ] content cannot be null, {}", evt.getRequest());
-				return;
-			}
-			Element deviceListElement = rootElement.element("DeviceList");
-			if (deviceListElement == null) {
-				return;
-			}
-			Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
-			if (deviceListIterator != null) {
-
-				// 遍历DeviceList
-				while (deviceListIterator.hasNext()) {
-					Element itemDevice = deviceListIterator.next();
-					Element eventElement = itemDevice.element("Event");
-					String event;
-					if (eventElement == null) {
-						logger.warn("[收到目录订阅]:{}, 但是Event为空, 设为默认值 ADD", (device != null ? device.getDeviceId():"" ));
-						event = CatalogEvent.ADD;
-					}else {
-						event = eventElement.getText().toUpperCase();
-					}
-					DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event);
-					if (channel == null) {
-						logger.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent()));
-						continue;
-					}
-					if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) {
-						channel.setParentId(null);
-					}
-					channel.setDeviceId(device.getDeviceId());
-					logger.info("[收到目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId());
-					switch (event) {
-						case CatalogEvent.ON:
-							// 上线
-							logger.info("[收到通道上线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
-							updateChannelOnlineList.add(channel);
-							if (updateChannelOnlineList.size() > 300) {
-								executeSaveForOnline();
-							}
-							if (userSetting.getDeviceStatusNotify()) {
-								// 发送redis消息
-								redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true);
-							}
-
-							break;
-						case CatalogEvent.OFF :
-							// 离线
-							logger.info("[收到通道离线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
-							if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
-								logger.info("[收到通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
-							}else {
-								updateChannelOfflineList.add(channel);
-								if (updateChannelOfflineList.size() > 300) {
-									executeSaveForOffline();
-								}
-								if (userSetting.getDeviceStatusNotify()) {
-									// 发送redis消息
-									redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
-								}
-							}
-							break;
-						case CatalogEvent.VLOST:
-							// 视频丢失
-							logger.info("[收到通道视频丢失通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
-							if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
-								logger.info("[收到通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
-							}else {
-								updateChannelOfflineList.add(channel);
-								if (updateChannelOfflineList.size() > 300) {
-									executeSaveForOffline();
-								}
-								if (userSetting.getDeviceStatusNotify()) {
-									// 发送redis消息
-									redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
-								}
-							}
-							break;
-						case CatalogEvent.DEFECT:
-							// 故障
-							logger.info("[收到通道视频故障通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
-							if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
-								logger.info("[收到通道视频故障通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
-							}else {
-								updateChannelOfflineList.add(channel);
-								if (updateChannelOfflineList.size() > 300) {
-									executeSaveForOffline();
-								}
+	@Transactional
+	public void process(List<RequestEvent> evtList) {
+		if (evtList.isEmpty()) {
+			return;
+		}
+		for (RequestEvent evt : evtList) {
+			try {
+				long start = System.currentTimeMillis();
+				FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
+				String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
+
+				Device device = redisCatchStorage.getDevice(deviceId);
+				if (device == null || !device.isOnLine()) {
+					logger.warn("[收到目录订阅]:{}, 但是设备已经离线", (device != null ? device.getDeviceId():"" ));
+					return;
+				}
+				Element rootElement = getRootElement(evt, device.getCharset());
+				if (rootElement == null) {
+					logger.warn("[ 收到目录订阅 ] content cannot be null, {}", evt.getRequest());
+					return;
+				}
+				Element deviceListElement = rootElement.element("DeviceList");
+				if (deviceListElement == null) {
+					return;
+				}
+				Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
+				if (deviceListIterator != null) {
+
+					// 遍历DeviceList
+					while (deviceListIterator.hasNext()) {
+						Element itemDevice = deviceListIterator.next();
+						Element eventElement = itemDevice.element("Event");
+						String event;
+						if (eventElement == null) {
+							logger.warn("[收到目录订阅]:{}, 但是Event为空, 设为默认值 ADD", (device != null ? device.getDeviceId():"" ));
+							event = CatalogEvent.ADD;
+						}else {
+							event = eventElement.getText().toUpperCase();
+						}
+						DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event);
+						if (channel == null) {
+							logger.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent()));
+							continue;
+						}
+						if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) {
+							channel.setParentId(null);
+						}
+						channel.setDeviceId(device.getDeviceId());
+						logger.info("[收到目录订阅]:{}, {}/{}",event, device.getDeviceId(), channel.getChannelId());
+						switch (event) {
+							case CatalogEvent.ON:
+								// 上线
+								deviceChannelService.online(channel);
 								if (userSetting.getDeviceStatusNotify()) {
 									// 发送redis消息
-									redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
+									redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true);
 								}
-							}
-							break;
-						case CatalogEvent.ADD:
-							// 增加
-							logger.info("[收到增加通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
-							// 判断此通道是否存在
-							DeviceChannel deviceChannel = deviceChannelService.getOne(deviceId, channel.getChannelId());
-							if (deviceChannel != null) {
-								logger.info("[增加通道] 已存在,不发送通知只更新,设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
-								channel.setId(deviceChannel.getId());
-								updateChannelMap.put(channel.getChannelId(), channel);
-								if (updateChannelMap.keySet().size() > 300) {
-									executeSaveForUpdate();
+
+								break;
+							case CatalogEvent.OFF :
+							case CatalogEvent.VLOST:
+							case CatalogEvent.DEFECT:
+								// 离线
+								if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
+									logger.info("[目录订阅] 离线 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
+								}else {
+									deviceChannelService.offline(channel);
+									if (userSetting.getDeviceStatusNotify()) {
+										// 发送redis消息
+										redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
+									}
 								}
-							}else {
-								addChannelMap.put(channel.getChannelId(), channel);
+								break;
+							case CatalogEvent.DEL:
+								// 删除
+								deviceChannelService.delete(channel);
 								if (userSetting.getDeviceStatusNotify()) {
 									// 发送redis消息
-									redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true);
+									redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false);
 								}
-
-								if (addChannelMap.keySet().size() > 300) {
-									executeSaveForAdd();
-								}
-							}
-
-							break;
-						case CatalogEvent.DEL:
-							// 删除
-							logger.info("[收到删除通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
-							deleteChannelList.add(channel);
-							if (userSetting.getDeviceStatusNotify()) {
-								// 发送redis消息
-								redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false);
-							}
-							if (deleteChannelList.size() > 300) {
-								executeSaveForDelete();
-							}
-							break;
-						case CatalogEvent.UPDATE:
-							// 更新
-							logger.info("[收到更新通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
-							// 判断此通道是否存在
-							DeviceChannel deviceChannelForUpdate = deviceChannelService.getOne(deviceId, channel.getChannelId());
-							if (deviceChannelForUpdate != null) {
-								channel.setId(deviceChannelForUpdate.getId());
+								break;
+							case CatalogEvent.ADD:
+							case CatalogEvent.UPDATE:
+								// 更新
 								channel.setUpdateTime(DateUtil.getNow());
-								updateChannelMap.put(channel.getChannelId(), channel);
-								if (updateChannelMap.keySet().size() > 300) {
-									executeSaveForUpdate();
-								}
-							}else {
-								addChannelMap.put(channel.getChannelId(), channel);
-								if (addChannelMap.keySet().size() > 300) {
-									executeSaveForAdd();
-								}
+								deviceChannelService.updateChannel(deviceId,channel);
 								if (userSetting.getDeviceStatusNotify()) {
 									// 发送redis消息
 									redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true);
 								}
-							}
-							break;
-						default:
-							logger.warn("[ NotifyCatalog ] event not found : {}", event );
+								break;
+							default:
+								logger.warn("[ NotifyCatalog ] event not found : {}", event );
 
-					}
-					// 转发变化信息
-					eventPublisher.catalogEventPublish(null, channel, event);
-
-					if (!updateChannelMap.keySet().isEmpty()
-							|| !addChannelMap.keySet().isEmpty()
-							|| !updateChannelOnlineList.isEmpty()
-							|| !updateChannelOfflineList.isEmpty()
-							|| !deleteChannelList.isEmpty()) {
-
-						if (!dynamicTask.contains(talkKey)) {
-							dynamicTask.startDelay(talkKey, this::executeSave, 1000);
 						}
+						// 转发变化信息
+						eventPublisher.catalogEventPublish(null, channel, event);
 					}
 				}
+			} catch (DocumentException e) {
+				logger.error("未处理的异常 ", e);
 			}
-		} catch (DocumentException e) {
-			logger.error("未处理的异常 ", e);
-		}
-	}
-
-	// TODO 同一个通道如果先发送更新再发送离线可能无法正常离线
-	private void executeSave(){
-		try {
-			executeSaveForAdd();
-		} catch (Exception e) {
-			logger.error("[存储收到的增加通道] 异常: ", e );
-		}
-		try {
-			executeSaveForOnline();
-		} catch (Exception e) {
-			logger.error("[存储收到的通道上线] 异常: ", e );
-		}
-		try {
-			executeSaveForOffline();
-		} catch (Exception e) {
-			logger.error("[存储收到的通道离线] 异常: ", e );
-		}
-		try {
-			executeSaveForUpdate();
-		} catch (Exception e) {
-			logger.error("[存储收到的更新通道] 异常: ", e );
-		}
-		try {
-			executeSaveForDelete();
-		} catch (Exception e) {
-			logger.error("[存储收到的删除通道] 异常: ", e );
-		}
-
-		dynamicTask.stop(talkKey);
-	}
-
-	private void executeSaveForUpdate(){
-		if (!updateChannelMap.values().isEmpty()) {
-			logger.info("[存储收到的更新通道], 数量: {}", updateChannelMap.size());
-			ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values());
-			deviceChannelService.batchUpdateChannel(deviceChannels);
-			updateChannelMap.clear();
-		}
-
-	}
-
-	private void executeSaveForAdd(){
-		if (!addChannelMap.values().isEmpty()) {
-			ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(addChannelMap.values());
-			addChannelMap.clear();
-			deviceChannelService.batchAddChannel(deviceChannels);
-		}
-	}
-
-	private void executeSaveForDelete(){
-		if (!deleteChannelList.isEmpty()) {
-			deviceChannelService.deleteChannels(deleteChannelList);
-			deleteChannelList.clear();
-		}
-	}
-
-	private void executeSaveForOnline(){
-		if (!updateChannelOnlineList.isEmpty()) {
-			deviceChannelService.channelsOnline(updateChannelOnlineList);
-			updateChannelOnlineList.clear();
 		}
 	}
-
-	private void executeSaveForOffline(){
-		if (!updateChannelOfflineList.isEmpty()) {
-			deviceChannelService.channelsOffline(updateChannelOfflineList);
-			updateChannelOfflineList.clear();
-		}
-	}
-
 }

+ 128 - 165
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java

@@ -1,8 +1,6 @@
 package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
 
-import com.genersoft.iot.vmp.conf.CivilCodeFileConf;
-import com.genersoft.iot.vmp.conf.DynamicTask;
-import com.genersoft.iot.vmp.conf.SipConfig;
+import com.alibaba.fastjson2.JSONObject;
 import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.gb28181.bean.Device;
 import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
@@ -19,8 +17,8 @@ import org.dom4j.Element;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
 import org.springframework.util.ObjectUtils;
 
 import javax.sip.RequestEvent;
@@ -29,7 +27,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
 
 /**
  * SIP命令类型: NOTIFY请求中的移动位置请求处理
@@ -40,10 +37,6 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor
 
     private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForMobilePositionProcessor.class);
 
-	private final Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>();
-
-	private final List<MobilePosition> addMobilePositionList = new CopyOnWriteArrayList();
-
 
 	@Autowired
 	private UserSetting userSetting;
@@ -57,180 +50,150 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor
 	@Autowired
 	private IDeviceChannelService deviceChannelService;
 
-	@Autowired
-	private DynamicTask dynamicTask;
-
-	@Autowired
-	private CivilCodeFileConf civilCodeFileConf;
-
-	@Autowired
-	private SipConfig sipConfig;
-
-	private final static String talkKey = "notify-request-for-mobile-position-task";
-
-	@Async("taskExecutor")
-	public void process(RequestEvent evt) {
-		try {
-			FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
-			String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
-			long startTime = System.currentTimeMillis();
-			// 回复 200 OK
-			Element rootElement = getRootElement(evt);
-			if (rootElement == null) {
-				logger.error("处理MobilePosition移动位置Notify时未获取到消息体,{}", evt.getRequest());
-				return;
-			}
-			Device device = redisCatchStorage.getDevice(deviceId);
-			MobilePosition mobilePosition = new MobilePosition();
-			mobilePosition.setCreateTime(DateUtil.getNow());
-			List<Element> elements = rootElement.elements();
-			String channelId = null;
-			for (Element element : elements) {
-				switch (element.getName()){
-					case "DeviceID":
-						channelId = element.getStringValue();
-						if (device == null) {
-							device = redisCatchStorage.getDevice(channelId);
-							if (device == null) {
-								// 根据通道id查询设备Id
-								List<Device> deviceList = deviceChannelService.getDeviceByChannelId(channelId);
-								if (!deviceList.isEmpty()) {
-									device = deviceList.get(0);
-								}
+	@Transactional
+	public void process(List<RequestEvent> eventList) {
+		if (eventList.isEmpty()) {
+			return;
+		}
+		Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>();
+		List<MobilePosition> addMobilePositionList = new ArrayList<>();
+		for (RequestEvent evt : eventList) {
+			try {
+				FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
+				String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
+				long startTime = System.currentTimeMillis();
+				// 回复 200 OK
+				Element rootElement = getRootElement(evt);
+				if (rootElement == null) {
+					logger.error("处理MobilePosition移动位置Notify时未获取到消息体,{}", evt.getRequest());
+					return;
+				}
+				Device device = redisCatchStorage.getDevice(deviceId);
+				if (device == null) {
+					logger.error("处理MobilePosition移动位置Notify时未获取到device,{}", deviceId);
+					return;
+				}
+				MobilePosition mobilePosition = new MobilePosition();
+				mobilePosition.setDeviceId(device.getDeviceId());
+				mobilePosition.setDeviceName(device.getName());
+				mobilePosition.setCreateTime(DateUtil.getNow());
+				List<Element> elements = rootElement.elements();
+				for (Element element : elements) {
+					switch (element.getName()){
+						case "DeviceID":
+							String channelId = element.getStringValue();
+							if (!deviceId.equals(channelId)) {
+								mobilePosition.setChannelId(channelId);
 							}
-						}
-						if (device == null) {
-							logger.warn("[mobilePosition移动位置Notify] 未找到通道{}所属的设备", channelId);
-							return;
-						}
-						mobilePosition.setDeviceId(device.getDeviceId());
-						mobilePosition.setChannelId(channelId);
-						// 兼容设备部分设备上报是通道编号与设备编号一致的情况
-						if (deviceId.equals(channelId)) {
-							List<DeviceChannel> deviceChannels = deviceChannelService.queryChaneListByDeviceId(deviceId);
-							if (deviceChannels.size() == 1) {
-								channelId = deviceChannels.get(0).getChannelId();
+							continue;
+						case "Time":
+							String timeVal = element.getStringValue();
+							if (ObjectUtils.isEmpty(timeVal)) {
+								mobilePosition.setTime(DateUtil.getNow());
+							} else {
+								mobilePosition.setTime(SipUtils.parseTime(timeVal));
 							}
-						}
-						if (!ObjectUtils.isEmpty(device.getName())) {
-							mobilePosition.setDeviceName(device.getName());
-						}
-						mobilePosition.setDeviceId(device.getDeviceId());
-						mobilePosition.setChannelId(channelId);
-						continue;
-					case "Time":
-						String timeVal = element.getStringValue();
-						if (ObjectUtils.isEmpty(timeVal)) {
-							mobilePosition.setTime(DateUtil.getNow());
-						} else {
-							mobilePosition.setTime(SipUtils.parseTime(timeVal));
-						}
-						continue;
-					case "Longitude":
-						mobilePosition.setLongitude(Double.parseDouble(element.getStringValue()));
-						continue;
-					case "Latitude":
-						mobilePosition.setLatitude(Double.parseDouble(element.getStringValue()));
-						continue;
-					case "Speed":
-						String speedVal = element.getStringValue();
-						if (NumericUtil.isDouble(speedVal)) {
-							mobilePosition.setSpeed(Double.parseDouble(speedVal));
-						} else {
-							mobilePosition.setSpeed(0.0);
-						}
-						continue;
-					case "Direction":
-						String directionVal = element.getStringValue();
-						if (NumericUtil.isDouble(directionVal)) {
-							mobilePosition.setDirection(Double.parseDouble(directionVal));
-						} else {
-							mobilePosition.setDirection(0.0);
-						}
-						continue;
-					case "Altitude":
-						String altitudeVal = element.getStringValue();
-						if (NumericUtil.isDouble(altitudeVal)) {
-							mobilePosition.setAltitude(Double.parseDouble(altitudeVal));
-						} else {
-							mobilePosition.setAltitude(0.0);
-						}
-						continue;
+							continue;
+						case "Longitude":
+							mobilePosition.setLongitude(Double.parseDouble(element.getStringValue()));
+							continue;
+						case "Latitude":
+							mobilePosition.setLatitude(Double.parseDouble(element.getStringValue()));
+							continue;
+						case "Speed":
+							String speedVal = element.getStringValue();
+							if (NumericUtil.isDouble(speedVal)) {
+								mobilePosition.setSpeed(Double.parseDouble(speedVal));
+							} else {
+								mobilePosition.setSpeed(0.0);
+							}
+							continue;
+						case "Direction":
+							String directionVal = element.getStringValue();
+							if (NumericUtil.isDouble(directionVal)) {
+								mobilePosition.setDirection(Double.parseDouble(directionVal));
+							} else {
+								mobilePosition.setDirection(0.0);
+							}
+							continue;
+						case "Altitude":
+							String altitudeVal = element.getStringValue();
+							if (NumericUtil.isDouble(altitudeVal)) {
+								mobilePosition.setAltitude(Double.parseDouble(altitudeVal));
+							} else {
+								mobilePosition.setAltitude(0.0);
+							}
+							continue;
 
+					}
 				}
-			}
 
 //			logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}, 时间: {}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
 //					mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime);
-			mobilePosition.setReportSource("Mobile Position");
-
-			// 更新device channel 的经纬度
-			DeviceChannel deviceChannel = new DeviceChannel();
-			deviceChannel.setDeviceId(device.getDeviceId());
-			deviceChannel.setChannelId(channelId);
-			deviceChannel.setLongitude(mobilePosition.getLongitude());
-			deviceChannel.setLatitude(mobilePosition.getLatitude());
-			deviceChannel.setGpsTime(mobilePosition.getTime());
-			updateChannelMap.put(deviceId + channelId, deviceChannel);
-			addMobilePositionList.add(mobilePosition);
-			if(updateChannelMap.size() > 2000) {
-				executeSaveChannel();
-			}
-			if (userSetting.isSavePositionHistory()) {
-				if(addMobilePositionList.size() > 2000) {
-					executeSaveMobilePosition();
+				mobilePosition.setReportSource("Mobile Position");
+
+				// 更新device channel 的经纬度
+				DeviceChannel deviceChannel = new DeviceChannel();
+				deviceChannel.setDeviceId(device.getDeviceId());
+				deviceChannel.setLongitude(mobilePosition.getLongitude());
+				deviceChannel.setLatitude(mobilePosition.getLatitude());
+				deviceChannel.setGpsTime(mobilePosition.getTime());
+				updateChannelMap.put(deviceId + mobilePosition.getChannelId(), deviceChannel);
+				addMobilePositionList.add(mobilePosition);
+
+
+				// 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息
+				try {
+					eventPublisher.mobilePositionEventPublish(mobilePosition);
+				}catch (Exception e) {
+					logger.error("[向上级转发移动位置失败] ", e);
 				}
+				if (mobilePosition.getChannelId().equals(mobilePosition.getDeviceId()) || mobilePosition.getChannelId() == null) {
+					List<DeviceChannel> channels = deviceChannelService.queryChaneListByDeviceId(mobilePosition.getDeviceId());
+					channels.forEach(channel -> {
+						// 发送redis消息。 通知位置信息的变化
+						JSONObject jsonObject = new JSONObject();
+						jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime()));
+						jsonObject.put("serial", channel.getDeviceId());
+						jsonObject.put("code", channel.getChannelId());
+						jsonObject.put("longitude", mobilePosition.getLongitude());
+						jsonObject.put("latitude", mobilePosition.getLatitude());
+						jsonObject.put("altitude", mobilePosition.getAltitude());
+						jsonObject.put("direction", mobilePosition.getDirection());
+						jsonObject.put("speed", mobilePosition.getSpeed());
+						redisCatchStorage.sendMobilePositionMsg(jsonObject);
+					});
+				}else {
+					// 发送redis消息。 通知位置信息的变化
+					JSONObject jsonObject = new JSONObject();
+					jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime()));
+					jsonObject.put("serial", mobilePosition.getDeviceId());
+					jsonObject.put("code", mobilePosition.getChannelId());
+					jsonObject.put("longitude", mobilePosition.getLongitude());
+					jsonObject.put("latitude", mobilePosition.getLatitude());
+					jsonObject.put("altitude", mobilePosition.getAltitude());
+					jsonObject.put("direction", mobilePosition.getDirection());
+					jsonObject.put("speed", mobilePosition.getSpeed());
+					redisCatchStorage.sendMobilePositionMsg(jsonObject);
+				}
+			} catch (DocumentException e) {
+				logger.error("未处理的异常 ", e);
 			}
-
-//			deviceChannel = deviceChannelService.updateGps(deviceChannel, device);
-//
-//			mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
-//			mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
-//			mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
-//			mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
-
-//			deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
-
-			if (!dynamicTask.contains(talkKey)) {
-				dynamicTask.startDelay(talkKey, this::executeSave, 3000);
-			}
-
-		} catch (DocumentException e) {
-			logger.error("未处理的异常 ", e);
 		}
-
-
-	}
-
-	private void executeSave(){
-		executeSaveChannel();
-		executeSaveMobilePosition();
-		dynamicTask.stop(talkKey);
-	}
-
-	@Async("taskExecutor")
-	public void executeSaveChannel(){
-		dynamicTask.execute();
-		try {
-			logger.info("[移动位置订阅]更新通道位置: {}", updateChannelMap.size());
-			ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values());
-			deviceChannelService.batchUpdateChannelGPS(deviceChannels);
+		if(!updateChannelMap.isEmpty()) {
+			List<DeviceChannel>  channels = new ArrayList<>(updateChannelMap.values());
+			logger.info("[移动位置订阅]更新通道位置: {}", channels.size());
+			deviceChannelService.batchUpdateChannelGPS(channels);
 			updateChannelMap.clear();
-		}catch (Exception e) {
-
 		}
-	}
-	@Async("taskExecutor")
-	public void executeSaveMobilePosition(){
-		if (userSetting.isSavePositionHistory()) {
+		if (userSetting.isSavePositionHistory() && !addMobilePositionList.isEmpty()) {
 			try {
 				logger.info("[移动位置订阅] 添加通道轨迹点位: {}", addMobilePositionList.size());
 				deviceChannelService.batchAddMobilePosition(addMobilePositionList);
-				addMobilePositionList.clear();
 			}catch (Exception e) {
 				logger.info("[移动位置订阅] b添加通道轨迹点位保存失败: {}", addMobilePositionList.size());
 			}
+			addMobilePositionList.clear();
 		}
 	}
-
 }

+ 131 - 120
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java

@@ -37,6 +37,7 @@ import javax.sip.SipException;
 import javax.sip.header.FromHeader;
 import javax.sip.message.Response;
 import java.text.ParseException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
@@ -102,55 +103,63 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
 				responseAck((SIPRequest) evt.getRequest(), Response.BUSY_HERE, null, null);
 				logger.error("[notify] 待处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue());
 				return;
-			}else {
+			} else {
 				responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null);
 			}
 
-		}catch (SipException | InvalidArgumentException | ParseException e) {
+		} catch (SipException | InvalidArgumentException | ParseException e) {
 			logger.error("未处理的异常 ", e);
 		}
 		boolean runed = !taskQueue.isEmpty();
 		taskQueue.offer(new HandlerCatchData(evt, null, null));
-		if (!runed) {
-			taskExecutor.execute(()-> {
-//				logger.warn("开始处理");
-				while (!taskQueue.isEmpty()) {
-					try {
-						HandlerCatchData take = taskQueue.poll();
-						if (take == null) {
-							continue;
-						}
-						Element rootElement = getRootElement(take.getEvt());
-						if (rootElement == null) {
-							logger.error("处理NOTIFY消息时未获取到消息体,{}", take.getEvt().getRequest());
-							continue;
-						}
-						String cmd = XmlUtil.getText(rootElement, "CmdType");
-
-						if (CmdType.CATALOG.equals(cmd)) {
-							logger.info("接收到Catalog通知");
-							notifyRequestForCatalogProcessor.process(take.getEvt());
-						} else if (CmdType.ALARM.equals(cmd)) {
-							logger.info("接收到Alarm通知");
-							processNotifyAlarm(take.getEvt());
-						} else if (CmdType.MOBILE_POSITION.equals(cmd)) {
-//							logger.info("接收到MobilePosition通知");
-//							processNotifyMobilePosition(take.getEvt());
-//							taskExecutor.execute(() -> {
-								notifyRequestForMobilePositionProcessor.process(take.getEvt());
-//							});
-
-						} else {
-							logger.info("接收到消息:" + cmd);
-						}
-					} catch (DocumentException e) {
-						logger.error("处理NOTIFY消息时错误", e);
-					}
+	}
+	@Scheduled(fixedRate = 200)   //每200毫秒执行一次
+	public void executeTaskQueue(){
+		if (taskQueue.isEmpty()) {
+			return;
+		}
+		try {
+			List<RequestEvent> catalogEventList = new ArrayList<>();
+			List<RequestEvent> alarmEventList = new ArrayList<>();
+			List<RequestEvent> mobilePositionEventList = new ArrayList<>();
+			for (HandlerCatchData take : taskQueue) {
+				if (take == null) {
+					continue;
 				}
-			});
+				Element rootElement = getRootElement(take.getEvt());
+				if (rootElement == null) {
+					logger.error("处理NOTIFY消息时未获取到消息体,{}", take.getEvt().getRequest());
+					continue;
+				}
+				String cmd = XmlUtil.getText(rootElement, "CmdType");
+
+				if (CmdType.CATALOG.equals(cmd)) {
+					catalogEventList.add(take.getEvt());
+				} else if (CmdType.ALARM.equals(cmd)) {
+					alarmEventList.add(take.getEvt());
+				} else if (CmdType.MOBILE_POSITION.equals(cmd)) {
+					mobilePositionEventList.add(take.getEvt());
+				} else {
+					logger.info("接收到消息:" + cmd);
+				}
+			}
+			taskQueue.clear();
+			if (!alarmEventList.isEmpty()) {
+				processNotifyAlarm(alarmEventList);
+			}
+			if (!catalogEventList.isEmpty()) {
+				notifyRequestForCatalogProcessor.process(catalogEventList);
+			}
+			if (!mobilePositionEventList.isEmpty()) {
+				notifyRequestForMobilePositionProcessor.process(mobilePositionEventList);
+			}
+		} catch (DocumentException e) {
+			logger.error("处理NOTIFY消息时错误", e);
 		}
 	}
 
+
+
 	/**
 	 * 处理MobilePosition移动位置Notify
 	 *
@@ -253,95 +262,97 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
 
 	/***
 	 * 处理alarm设备报警Notify
-	 *
-	 * @param evt
 	 */
-	private void processNotifyAlarm(RequestEvent evt) {
+	private void processNotifyAlarm(List<RequestEvent> evtList) {
 		if (!sipConfig.isAlarm()) {
 			return;
 		}
-		try {
-			FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
-			String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
-
-			Element rootElement = getRootElement(evt);
-			if (rootElement == null) {
-				logger.error("处理alarm设备报警Notify时未获取到消息体{}", evt.getRequest());
-				return;
-			}
-			Element deviceIdElement = rootElement.element("DeviceID");
-			String channelId = deviceIdElement.getText().toString();
+		if (!evtList.isEmpty()) {
+			for (RequestEvent evt : evtList) {
+				try {
+					FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
+					String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
+
+					Element rootElement = getRootElement(evt);
+					if (rootElement == null) {
+						logger.error("处理alarm设备报警Notify时未获取到消息体{}", evt.getRequest());
+						return;
+					}
+					Element deviceIdElement = rootElement.element("DeviceID");
+					String channelId = deviceIdElement.getText().toString();
 
-			Device device = redisCatchStorage.getDevice(deviceId);
-			if (device == null) {
-				logger.warn("[ NotifyAlarm ] 未找到设备:{}", deviceId);
-				return;
-			}
-			rootElement = getRootElement(evt, device.getCharset());
-			if (rootElement == null) {
-				logger.warn("[ NotifyAlarm ] content cannot be null, {}", evt.getRequest());
-				return;
-			}
-			DeviceAlarm deviceAlarm = new DeviceAlarm();
-			deviceAlarm.setDeviceId(deviceId);
-			deviceAlarm.setAlarmPriority(XmlUtil.getText(rootElement, "AlarmPriority"));
-			deviceAlarm.setAlarmMethod(XmlUtil.getText(rootElement, "AlarmMethod"));
-			String alarmTime = XmlUtil.getText(rootElement, "AlarmTime");
-			if (alarmTime == null) {
-				logger.warn("[ NotifyAlarm ] AlarmTime cannot be null");
-				return;
-			}
-			deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime));
-			if (XmlUtil.getText(rootElement, "AlarmDescription") == null) {
-				deviceAlarm.setAlarmDescription("");
-			} else {
-				deviceAlarm.setAlarmDescription(XmlUtil.getText(rootElement, "AlarmDescription"));
-			}
-			if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Longitude"))) {
-				deviceAlarm.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude")));
-			} else {
-				deviceAlarm.setLongitude(0.00);
-			}
-			if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Latitude"))) {
-				deviceAlarm.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude")));
-			} else {
-				deviceAlarm.setLatitude(0.00);
-			}
-			logger.info("[收到Notify-Alarm]:{}/{}", device.getDeviceId(), deviceAlarm.getChannelId());
-			if ("4".equals(deviceAlarm.getAlarmMethod())) {
-				MobilePosition mobilePosition = new MobilePosition();
-				mobilePosition.setChannelId(channelId);
-				mobilePosition.setCreateTime(DateUtil.getNow());
-				mobilePosition.setDeviceId(deviceAlarm.getDeviceId());
-				mobilePosition.setTime(deviceAlarm.getAlarmTime());
-				mobilePosition.setLongitude(deviceAlarm.getLongitude());
-				mobilePosition.setLatitude(deviceAlarm.getLatitude());
-				mobilePosition.setReportSource("GPS Alarm");
-
-				// 更新device channel 的经纬度
-				DeviceChannel deviceChannel = new DeviceChannel();
-				deviceChannel.setDeviceId(device.getDeviceId());
-				deviceChannel.setChannelId(channelId);
-				deviceChannel.setLongitude(mobilePosition.getLongitude());
-				deviceChannel.setLatitude(mobilePosition.getLatitude());
-				deviceChannel.setGpsTime(mobilePosition.getTime());
-
-				deviceChannel = deviceChannelService.updateGps(deviceChannel, device);
-
-				mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
-				mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
-				mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
-				mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
-
-				deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
-			}
+					Device device = redisCatchStorage.getDevice(deviceId);
+					if (device == null) {
+						logger.warn("[ NotifyAlarm ] 未找到设备:{}", deviceId);
+						return;
+					}
+					rootElement = getRootElement(evt, device.getCharset());
+					if (rootElement == null) {
+						logger.warn("[ NotifyAlarm ] content cannot be null, {}", evt.getRequest());
+						return;
+					}
+					DeviceAlarm deviceAlarm = new DeviceAlarm();
+					deviceAlarm.setDeviceId(deviceId);
+					deviceAlarm.setAlarmPriority(XmlUtil.getText(rootElement, "AlarmPriority"));
+					deviceAlarm.setAlarmMethod(XmlUtil.getText(rootElement, "AlarmMethod"));
+					String alarmTime = XmlUtil.getText(rootElement, "AlarmTime");
+					if (alarmTime == null) {
+						logger.warn("[ NotifyAlarm ] AlarmTime cannot be null");
+						return;
+					}
+					deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime));
+					if (XmlUtil.getText(rootElement, "AlarmDescription") == null) {
+						deviceAlarm.setAlarmDescription("");
+					} else {
+						deviceAlarm.setAlarmDescription(XmlUtil.getText(rootElement, "AlarmDescription"));
+					}
+					if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Longitude"))) {
+						deviceAlarm.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude")));
+					} else {
+						deviceAlarm.setLongitude(0.00);
+					}
+					if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Latitude"))) {
+						deviceAlarm.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude")));
+					} else {
+						deviceAlarm.setLatitude(0.00);
+					}
+					logger.info("[收到Notify-Alarm]:{}/{}", device.getDeviceId(), deviceAlarm.getChannelId());
+					if ("4".equals(deviceAlarm.getAlarmMethod())) {
+						MobilePosition mobilePosition = new MobilePosition();
+						mobilePosition.setChannelId(channelId);
+						mobilePosition.setCreateTime(DateUtil.getNow());
+						mobilePosition.setDeviceId(deviceAlarm.getDeviceId());
+						mobilePosition.setTime(deviceAlarm.getAlarmTime());
+						mobilePosition.setLongitude(deviceAlarm.getLongitude());
+						mobilePosition.setLatitude(deviceAlarm.getLatitude());
+						mobilePosition.setReportSource("GPS Alarm");
+
+						// 更新device channel 的经纬度
+						DeviceChannel deviceChannel = new DeviceChannel();
+						deviceChannel.setDeviceId(device.getDeviceId());
+						deviceChannel.setChannelId(channelId);
+						deviceChannel.setLongitude(mobilePosition.getLongitude());
+						deviceChannel.setLatitude(mobilePosition.getLatitude());
+						deviceChannel.setGpsTime(mobilePosition.getTime());
+
+						deviceChannel = deviceChannelService.updateGps(deviceChannel, device);
+
+						mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
+						mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
+						mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
+						mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
+
+						deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
+					}
 
-			// 回复200 OK
-			if (redisCatchStorage.deviceIsOnline(deviceId)) {
-				publisher.deviceAlarmEventPublish(deviceAlarm);
+					// 回复200 OK
+					if (redisCatchStorage.deviceIsOnline(deviceId)) {
+						publisher.deviceAlarmEventPublish(deviceAlarm);
+					}
+				} catch (DocumentException e) {
+					logger.error("未处理的异常 ", e);
+				}
 			}
-		} catch (DocumentException e) {
-			logger.error("未处理的异常 ", e);
 		}
 	}
 

+ 5 - 0
src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java

@@ -102,4 +102,9 @@ public interface IDeviceChannelService {
 
     void batchAddMobilePosition(List<MobilePosition> addMobilePositionList);
 
+    void online(DeviceChannel channel);
+
+    void offline(DeviceChannel channel);
+
+    void delete(DeviceChannel channel);
 }

+ 51 - 1
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java

@@ -23,6 +23,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
 import org.springframework.util.ObjectUtils;
 
 import java.util.ArrayList;
@@ -247,11 +248,27 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
         return channelMapper.batchOnline(channels);
     }
 
+    @Override
+    public void online(DeviceChannel channel) {
+        channelMapper.online(channel.getDeviceId(), channel.getChannelId());
+    }
+
     @Override
     public int channelsOffline(List<DeviceChannel> channels) {
         return channelMapper.batchOffline(channels);
     }
 
+
+    @Override
+    public void offline(DeviceChannel channel) {
+        channelMapper.offline(channel.getDeviceId(), channel.getChannelId());
+    }
+
+    @Override
+    public void delete(DeviceChannel channel) {
+        channelMapper.del(channel.getDeviceId(), channel.getChannelId());
+    }
+
     @Override
     public DeviceChannel getOne(String deviceId, String channelId){
         return channelMapper.queryChannel(deviceId, channelId);
@@ -355,12 +372,45 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
     }
 
     @Override
+    @Transactional
     public void batchUpdateChannelGPS(List<DeviceChannel> channelList) {
-        channelMapper.batchUpdate(channelList);
+        for (DeviceChannel deviceChannel : channelList) {
+            deviceChannel.setUpdateTime(DateUtil.getNow());
+            if (deviceChannel.getGpsTime() == null) {
+                deviceChannel.setGpsTime(DateUtil.getNow());
+            }
+        }
+        int count = 1000;
+        if (channelList.size() > count) {
+            for (int i = 0; i < channelList.size(); i+=count) {
+                int toIndex = i+count;
+                if ( i + count > channelList.size()) {
+                    toIndex = channelList.size();
+                }
+                List<DeviceChannel> channels = channelList.subList(i, toIndex);
+                channelMapper.batchUpdatePosition(channels);
+            }
+        }else {
+            channelMapper.batchUpdatePosition(channelList);
+        }
     }
 
     @Override
+    @Transactional
     public void batchAddMobilePosition(List<MobilePosition> mobilePositions) {
+//        int count = 500;
+//        if (mobilePositions.size() > count) {
+//            for (int i = 0; i < mobilePositions.size(); i+=count) {
+//                int toIndex = i+count;
+//                if ( i + count > mobilePositions.size()) {
+//                    toIndex = mobilePositions.size();
+//                }
+//                List<MobilePosition> mobilePositionsSub = mobilePositions.subList(i, toIndex);
+//                deviceMobilePositionMapper.batchadd(mobilePositionsSub);
+//            }
+//        }else {
+//            deviceMobilePositionMapper.batchadd(mobilePositions);
+//        }
         deviceMobilePositionMapper.batchadd(mobilePositions);
     }
 }

+ 17 - 0
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java

@@ -401,6 +401,23 @@ public interface DeviceChannelMapper {
             " </script>"})
     int updatePosition(DeviceChannel deviceChannel);
 
+    @Update({"<script>" +
+            "<foreach collection='deviceChannelList' item='item' separator=';'>" +
+            " UPDATE" +
+            " wvp_device_channel" +
+            " SET gps_time=#{item.gpsTime}" +
+            "<if test='item.longitude != null'>, longitude=#{item.longitude}</if>" +
+            "<if test='item.latitude != null'>, latitude=#{item.latitude}</if>" +
+            "<if test='item.longitudeGcj02 != null'>, longitude_gcj02=#{item.longitudeGcj02}</if>" +
+            "<if test='item.latitudeGcj02 != null'>, latitude_gcj02=#{item.latitudeGcj02}</if>" +
+            "<if test='item.longitudeWgs84 != null'>, longitude_wgs84=#{item.longitudeWgs84}</if>" +
+            "<if test='item.latitudeWgs84 != null'>, latitude_wgs84=#{item.latitudeWgs84}</if>" +
+            "WHERE device_id=#{item.deviceId} " +
+            " <if test='item.channelId != null' > AND channel_id=#{item.channelId}</if>" +
+            "</foreach>" +
+            "</script>"})
+    int batchUpdatePosition(List<DeviceChannel> deviceChannelList);
+
     @Select("SELECT * FROM wvp_device_channel WHERE length(trim(stream_id)) > 0")
     List<DeviceChannel> getAllChannelInPlay();
 

+ 14 - 0
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java

@@ -46,6 +46,20 @@ public interface DeviceMobilePositionMapper {
             "#{item.createTime}) " +
             "</foreach> " +
             "</script>")
+    void batchadd2(List<MobilePosition> mobilePositions);
+
+    @Insert("<script> " +
+            "<foreach collection='mobilePositions' index='index' item='item' separator=','> " +
+            "insert into wvp_device_mobile_position " +
+            "(device_id,channel_id, device_name,time,longitude,latitude,altitude,speed,direction,report_source," +
+            "longitude_gcj02,latitude_gcj02,longitude_wgs84,latitude_wgs84,create_time)"+
+            "values " +
+            "(#{item.deviceId}, #{item.channelId}, #{item.deviceName}, #{item.time}, #{item.longitude}, " +
+            "#{item.latitude}, #{item.altitude}, #{item.speed},#{item.direction}," +
+            "#{item.reportSource}, #{item.longitudeGcj02}, #{item.latitudeGcj02}, #{item.longitudeWgs84}, #{item.latitudeWgs84}, " +
+            "#{item.createTime}); " +
+            "</foreach> " +
+            "</script>")
     void batchadd(List<MobilePosition> mobilePositions);
 
 }

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java

@@ -570,7 +570,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
     @Override
     public void sendMobilePositionMsg(JSONObject jsonObject) {
         String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_MOBILE_POSITION;
-        logger.info("[redis发送通知] 发送 移动位置 {}: {}", key, jsonObject.toString());
+//        logger.info("[redis发送通知] 发送 移动位置 {}: {}", key, jsonObject.toString());
         redisTemplate.convertAndSend(key, jsonObject);
     }