Jelajahi Sumber

级联平台添加GPS订阅支持

lin 3 tahun lalu
induk
melakukan
ddb36e54bd
18 mengubah file dengan 530 tambahan dan 42 penghapusan
  1. 4 0
      src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
  2. 8 0
      src/main/java/com/genersoft/iot/vmp/gb28181/bean/CmdType.java
  3. 78 0
      src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java
  4. 52 0
      src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java
  5. 59 0
      src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java
  6. 10 0
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java
  7. 46 3
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java
  8. 40 1
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
  9. 27 1
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java
  10. 2 2
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
  11. 4 6
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
  12. 122 14
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java
  13. 1 1
      src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java
  14. 1 0
      src/main/java/com/genersoft/iot/vmp/service/impl/RedisGPSMsgListener.java
  15. 13 4
      src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
  16. 7 0
      src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java
  17. 43 0
      src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
  18. 13 10
      src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java

+ 4 - 0
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java

@@ -58,6 +58,10 @@ public class VideoManagerConstants {
 
 	public static final String SIP_CSEQ_PREFIX = "VMP_SIP_CSEQ_";
 
+	public static final String SIP_SN_PREFIX = "VMP_SIP_SN_";
+
+	public static final String SIP_SUBSCRIBE_PREFIX = "SIP_SUBSCRIBE_";
+
 	//************************** redis 消息*********************************
 	public static final String WVP_MSG_STREAM_CHANGE_PREFIX = "WVP_MSG_STREAM_CHANGE_";
 	public static final String WVP_MSG_GPS_PREFIX = "WVP_MSG_GPS_";

+ 8 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/bean/CmdType.java

@@ -0,0 +1,8 @@
+package com.genersoft.iot.vmp.gb28181.bean;
+
+public class CmdType {
+
+    public static final String CATALOG = "Catalog";
+    public static final String ALARM = "Alarm";
+    public static final String MOBILE_POSITION = "MobilePosition";
+}

+ 78 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java

@@ -0,0 +1,78 @@
+package com.genersoft.iot.vmp.gb28181.bean;
+
+import javax.sip.RequestEvent;
+import javax.sip.header.*;
+import javax.sip.message.Request;
+
+public class SubscribeInfo {
+
+    public SubscribeInfo() {
+    }
+
+    public SubscribeInfo(RequestEvent evt, String id) {
+        this.id = id;
+        Request request = evt.getRequest();
+        CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME);
+        this.callId = callIdHeader.getCallId();
+        FromHeader fromHeader = (FromHeader)request.getHeader(FromHeader.NAME);
+        this.fromTag = fromHeader.getTag();
+        ExpiresHeader expiresHeader = (ExpiresHeader)request.getHeader(ExpiresHeader.NAME);
+        this.expires = expiresHeader.getExpires();
+        this.event = (EventHeader)request.getHeader(EventHeader.NAME);
+    }
+
+    private String id;
+    private int expires;
+    private String callId;
+    private EventHeader event;
+    private String fromTag;
+    private String toTag;
+
+    public String getId() {
+        return id;
+    }
+
+    public int getExpires() {
+        return expires;
+    }
+
+    public String getCallId() {
+        return callId;
+    }
+
+    public EventHeader getEvent() {
+        return event;
+    }
+
+    public String getFromTag() {
+        return fromTag;
+    }
+
+    public void setToTag(String toTag) {
+        this.toTag = toTag;
+    }
+
+    public String getToTag() {
+        return toTag;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public void setExpires(int expires) {
+        this.expires = expires;
+    }
+
+    public void setCallId(String callId) {
+        this.callId = callId;
+    }
+
+    public void setEvent(EventHeader event) {
+        this.event = event;
+    }
+
+    public void setFromTag(String fromTag) {
+        this.fromTag = fromTag;
+    }
+}

+ 52 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java

@@ -0,0 +1,52 @@
+package com.genersoft.iot.vmp.gb28181.event.subscribe;
+
+import com.genersoft.iot.vmp.common.VideoManagerConstants;
+import com.genersoft.iot.vmp.conf.DynamicTask;
+import com.genersoft.iot.vmp.conf.RedisKeyExpirationEventMessageListener;
+import com.genersoft.iot.vmp.conf.UserSetup;
+import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
+import org.checkerframework.checker.units.qual.A;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.listener.RedisMessageListenerContainer;
+import org.springframework.stereotype.Component;
+
+/**    
+ * 平台订阅到期事件
+ */
+@Component
+public class SubscribeListenerForPlatform extends RedisKeyExpirationEventMessageListener {
+
+    private Logger logger = LoggerFactory.getLogger(SubscribeListenerForPlatform.class);
+
+	@Autowired
+	private UserSetup userSetup;
+
+    @Autowired
+    private DynamicTask dynamicTask;
+
+    public SubscribeListenerForPlatform(RedisMessageListenerContainer listenerContainer, UserSetup userSetup) {
+        super(listenerContainer, userSetup);
+    }
+
+
+    /**
+     * 监听失效的key
+     * @param message
+     * @param pattern
+     */
+    @Override
+    public void onMessage(Message message, byte[] pattern) {
+        //  获取失效的key
+        String expiredKey = message.toString();
+        logger.debug(expiredKey);
+        // 订阅到期
+        String PLATFORM_KEEPLIVEKEY_PREFIX = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_";
+        if (expiredKey.startsWith(PLATFORM_KEEPLIVEKEY_PREFIX)) {
+            // 取消定时任务
+            dynamicTask.stopCron(expiredKey);
+        }
+    }
+}

+ 59 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java

@@ -0,0 +1,59 @@
+package com.genersoft.iot.vmp.gb28181.task;
+
+import com.genersoft.iot.vmp.gb28181.bean.GbStream;
+import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
+import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
+import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
+import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
+import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
+import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
+
+import java.text.SimpleDateFormat;
+import java.util.List;
+
+public class GPSSubscribeTask implements Runnable{
+
+    private IRedisCatchStorage redisCatchStorage;
+    private IVideoManagerStorager storager;
+    private ISIPCommanderForPlatform sipCommanderForPlatform;
+    private String platformId;
+    private String sn;
+    private String key;
+
+    private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+    public GPSSubscribeTask(IRedisCatchStorage redisCatchStorage, ISIPCommanderForPlatform sipCommanderForPlatform, IVideoManagerStorager storager, String platformId, String sn, String key) {
+        this.redisCatchStorage = redisCatchStorage;
+        this.storager = storager;
+        this.platformId = platformId;
+        this.sn = sn;
+        this.key = key;
+        this.sipCommanderForPlatform = sipCommanderForPlatform;
+    }
+
+    @Override
+    public void run() {
+
+        SubscribeInfo subscribe = redisCatchStorage.getSubscribe(key);
+        if (subscribe != null) {
+            System.out.println("发送GPS消息");
+            ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId);
+            if (parentPlatform == null || parentPlatform.isStatus()) {
+                // TODO 暂时只处理视频流的回复,后续增加对国标设备的支持
+                List<GbStream> gbStreams = storager.queryGbStreamListInPlatform(platformId);
+                if (gbStreams.size() > 0) {
+                    for (GbStream gbStream : gbStreams) {
+                        String gbId = gbStream.getGbId();
+                        GPSMsgInfo gpsMsgInfo = redisCatchStorage.getGpsMsgInfo(gbId);
+                        if (gpsMsgInfo != null && gbStream.isStatus()) {
+                            // 发送GPS消息
+                            sipCommanderForPlatform.sendMobilePosition(parentPlatform, gpsMsgInfo, subscribe);
+                        }
+                    }
+                }
+            }
+        }
+
+
+    }
+}

+ 10 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java

@@ -2,7 +2,9 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd;
 
 import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
+import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
 import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
+import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
 
 import javax.sip.header.WWWAuthenticateHeader;
 
@@ -61,4 +63,12 @@ public interface ISIPCommanderForPlatform {
      */
     boolean deviceStatusResponse(ParentPlatform parentPlatform, String sn, String fromTag);
 
+    /**
+     * 向上级回复移动位置订阅消息
+     * @param parentPlatform 平台信息
+     * @param gpsMsgInfo GPS信息
+     * @param subscribeInfo 订阅相关的信息
+     * @return
+     */
+    boolean sendMobilePosition(ParentPlatform parentPlatform, GPSMsgInfo gpsMsgInfo, SubscribeInfo subscribeInfo);
 }

+ 46 - 3
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java

@@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd;
 
 import com.genersoft.iot.vmp.conf.SipConfig;
 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
+import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import gov.nist.javax.sip.message.MessageFactoryImpl;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
@@ -32,6 +33,9 @@ public class SIPRequestHeaderPlarformProvider {
 	@Autowired
 	private SipFactory sipFactory;
 
+	@Autowired
+	private IRedisCatchStorage redisCatchStorage;
+
 
 	public Request createKeetpaliveMessageRequest(ParentPlatform parentPlatform, String content, String viaTag, String fromTag, String toTag, CallIdHeader callIdHeader) throws ParseException, InvalidArgumentException, PeerUnavailableException {
 		Request request = null;
@@ -57,7 +61,7 @@ public class SIPRequestHeaderPlarformProvider {
 		// Forwards
 		MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70);
 		// ceq
-		CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(1L, Request.MESSAGE);
+		CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.MESSAGE), Request.MESSAGE);
 
 		request = sipFactory.createMessageFactory().createRequest(requestURI, Request.MESSAGE, callIdHeader, cSeqHeader, fromHeader,
 				toHeader, viaHeaders, maxForwards);
@@ -122,7 +126,7 @@ public class SIPRequestHeaderPlarformProvider {
 										 String callId, WWWAuthenticateHeader www , CallIdHeader callIdHeader) throws ParseException, PeerUnavailableException, InvalidArgumentException {
 
 
-		Request registerRequest = createRegisterRequest(parentPlatform, 2L, fromTag, viaTag, callIdHeader);
+		Request registerRequest = createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(Request.REGISTER), fromTag, viaTag, callIdHeader);
 
 		String realm = www.getRealm();
 		String nonce = www.getNonce();
@@ -208,7 +212,7 @@ public class SIPRequestHeaderPlarformProvider {
 		// Forwards
 		MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70);
 		// ceq
-		CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(1L, Request.MESSAGE);
+		CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.MESSAGE), Request.MESSAGE);
 		MessageFactoryImpl messageFactory = (MessageFactoryImpl) sipFactory.createMessageFactory();
 		// 设置编码, 防止中文乱码
 		messageFactory.setDefaultContentEncodingCharset("gb2312");
@@ -223,4 +227,43 @@ public class SIPRequestHeaderPlarformProvider {
 		request.setContent(content, contentTypeHeader);
 		return request;
 	}
+
+    public Request createNotifyRequest(ParentPlatform parentPlatform, String content, String fromTag, String toTag, CallIdHeader callIdHeader) throws PeerUnavailableException, ParseException, InvalidArgumentException {
+		Request request = null;
+		// sipuri
+		SipURI requestURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerIP()+ ":" + parentPlatform.getServerPort());
+		// via
+		ArrayList<ViaHeader> viaHeaders = new ArrayList<ViaHeader>();
+		ViaHeader viaHeader = sipFactory.createHeaderFactory().createViaHeader(parentPlatform.getDeviceIp(), Integer.parseInt(parentPlatform.getDevicePort()),
+				parentPlatform.getTransport(), null);
+		viaHeader.setRPort();
+		viaHeaders.add(viaHeader);
+		// from
+		SipURI fromSipURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getDeviceGBId(),
+				parentPlatform.getDeviceIp() + ":" + parentPlatform.getDevicePort());
+		Address fromAddress = sipFactory.createAddressFactory().createAddress(fromSipURI);
+		FromHeader fromHeader = sipFactory.createHeaderFactory().createFromHeader(fromAddress, fromTag);
+		// to
+		SipURI toSipURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerGBDomain());
+		Address toAddress = sipFactory.createAddressFactory().createAddress(toSipURI);
+		ToHeader toHeader = sipFactory.createHeaderFactory().createToHeader(toAddress, toTag);
+
+		// Forwards
+		MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70);
+		// ceq
+		CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.NOTIFY), Request.NOTIFY);
+		MessageFactoryImpl messageFactory = (MessageFactoryImpl) sipFactory.createMessageFactory();
+		// 设置编码, 防止中文乱码
+		messageFactory.setDefaultContentEncodingCharset("gb2312");
+		request = messageFactory.createRequest(requestURI, Request.NOTIFY, callIdHeader, cSeqHeader, fromHeader,
+				toHeader, viaHeaders, maxForwards);
+		List<String> agentParam = new ArrayList<>();
+		agentParam.add("wvp-pro");
+		UserAgentHeader userAgentHeader = sipFactory.createHeaderFactory().createUserAgentHeader(agentParam);
+		request.addHeader(userAgentHeader);
+
+		ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("APPLICATION", "MANSCDP+xml");
+		request.setContent(content, contentTypeHeader);
+		return request;
+    }
 }

+ 40 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java

@@ -3,9 +3,11 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl;
 import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
+import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
 import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderPlarformProvider;
+import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -92,7 +94,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
                     callIdHeader = udpSipProvider.getNewCallId();
                 }
 
-                request = headerProviderPlarformProvider.createRegisterRequest(parentPlatform, 1L, "FromRegister" + tm, null, callIdHeader);
+                request = headerProviderPlarformProvider.createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(Request.REGISTER), "FromRegister" + tm, null, callIdHeader);
                 // 将 callid 写入缓存, 等注册成功可以更新状态
                 redisCatchStorage.updatePlatformRegisterInfo(callIdHeader.getCallId(), parentPlatform.getServerGBId());
 
@@ -325,4 +327,41 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
         }
         return true;
     }
+
+    @Override
+    public boolean sendMobilePosition(ParentPlatform parentPlatform, GPSMsgInfo gpsMsgInfo, SubscribeInfo subscribeInfo) {
+        if (parentPlatform == null) {
+            return false;
+        }
+
+        try {
+            StringBuffer deviceStatusXml = new StringBuffer(600);
+            deviceStatusXml.append("<?xml version=\"1.0\" encoding=\"GB2312\"?>\r\n");
+            deviceStatusXml.append("<Notify>\r\n");
+            deviceStatusXml.append("<CmdType>MobilePosition</CmdType>\r\n");
+            deviceStatusXml.append("<SN>" + (int)((Math.random()*9+1)*100000) + "</SN>\r\n");
+            deviceStatusXml.append("<DeviceID>" + gpsMsgInfo.getId() + "</DeviceID>\r\n");
+            deviceStatusXml.append("<Time>" + gpsMsgInfo.getTime() + "</Time>\r\n");
+            deviceStatusXml.append("<Longitude>" + gpsMsgInfo.getLng() + "</Longitude>\r\n");
+            deviceStatusXml.append("<Latitude>" + gpsMsgInfo.getLat() + "</Latitude>\r\n");
+            deviceStatusXml.append("<Speed>" + gpsMsgInfo.getSpeed() + "</Speed>\r\n");
+            deviceStatusXml.append("<Direction>" + gpsMsgInfo.getDirection() + "</Direction>\r\n");
+            deviceStatusXml.append("<Altitude>" + gpsMsgInfo.getAltitude() + "</Altitude>\r\n");
+            deviceStatusXml.append("</Notify>\r\n");
+
+            CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
+                    : udpSipProvider.getNewCallId();
+            callIdHeader.setCallId(subscribeInfo.getCallId());
+
+            String tm = Long.toString(System.currentTimeMillis());
+
+            Request request = headerProviderPlarformProvider.createNotifyRequest(parentPlatform, deviceStatusXml.toString(), subscribeInfo.getToTag(), subscribeInfo.getFromTag(), callIdHeader);
+            transmitRequest(parentPlatform, request);
+
+        } catch (SipException | ParseException | InvalidArgumentException e) {
+            e.printStackTrace();
+            return false;
+        }
+        return true;
+    }
 }

+ 27 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java

@@ -18,6 +18,7 @@ import javax.sip.address.Address;
 import javax.sip.address.AddressFactory;
 import javax.sip.address.SipURI;
 import javax.sip.header.ContentTypeHeader;
+import javax.sip.header.ExpiresHeader;
 import javax.sip.header.HeaderFactory;
 import javax.sip.header.ViaHeader;
 import javax.sip.message.MessageFactory;
@@ -153,7 +154,7 @@ public abstract class SIPRequestProcessorParent {
 	 * @throws InvalidArgumentException
 	 * @throws ParseException
 	 */
-	public void responseAck(RequestEvent evt, String sdp) throws SipException, InvalidArgumentException, ParseException {
+	public void responseSdpAck(RequestEvent evt, String sdp) throws SipException, InvalidArgumentException, ParseException {
 		Response response = getMessageFactory().createResponse(Response.OK, evt.getRequest());
 		SipFactory sipFactory = SipFactory.getInstance();
 		ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("APPLICATION", "SDP");
@@ -168,6 +169,31 @@ public abstract class SIPRequestProcessorParent {
 		getServerTransaction(evt).sendResponse(response);
 	}
 
+	/**
+	 * 回复带xml的200
+	 * @param evt
+	 * @param xml
+	 * @throws SipException
+	 * @throws InvalidArgumentException
+	 * @throws ParseException
+	 */
+	public Response responseXmlAck(RequestEvent evt, String xml) throws SipException, InvalidArgumentException, ParseException {
+		Response response = getMessageFactory().createResponse(Response.OK, evt.getRequest());
+		SipFactory sipFactory = SipFactory.getInstance();
+		ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("APPLICATION", "MANSCDP+xml");
+		response.setContent(xml, contentTypeHeader);
+
+		SipURI sipURI = (SipURI)evt.getRequest().getRequestURI();
+
+		Address concatAddress = sipFactory.createAddressFactory().createAddress(
+				sipFactory.createAddressFactory().createSipURI(sipURI.getUser(),  sipURI.getHost()+":"+sipURI.getPort()
+				));
+		response.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress));
+		response.addHeader(evt.getRequest().getHeader(ExpiresHeader.NAME));
+		getServerTransaction(evt).sendResponse(response);
+		return response;
+	}
+
 	public Element getRootElement(RequestEvent evt) throws DocumentException {
 		return getRootElement(evt, "gb2312");
 	}

+ 2 - 2
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java

@@ -253,7 +253,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
 						content.append("f=\r\n");
 
 						try {
-							responseAck(evt, content.toString());
+							responseSdpAck(evt, content.toString());
 						} catch (SipException e) {
 							e.printStackTrace();
 						} catch (InvalidArgumentException e) {
@@ -310,7 +310,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
 					content.append("f=\r\n");
 
 					try {
-						responseAck(evt, content.toString());
+						responseSdpAck(evt, content.toString());
 					} catch (SipException e) {
 						e.printStackTrace();
 					} catch (InvalidArgumentException e) {

+ 4 - 6
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java

@@ -62,9 +62,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
 	@Autowired
 	private DeviceOffLineDetector offLineDetector;
 
-	private static final String NOTIFY_CATALOG = "Catalog";
-	private static final String NOTIFY_ALARM = "Alarm";
-	private static final String NOTIFY_MOBILE_POSITION = "MobilePosition";
+
 	private String method = "NOTIFY";
 
 	@Autowired
@@ -82,13 +80,13 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
 			Element rootElement = getRootElement(evt);
 			String cmd = XmlUtil.getText(rootElement, "CmdType");
 
-			if (NOTIFY_CATALOG.equals(cmd)) {
+			if (CmdType.CATALOG.equals(cmd)) {
 				logger.info("接收到Catalog通知");
 				processNotifyCatalogList(evt);
-			} else if (NOTIFY_ALARM.equals(cmd)) {
+			} else if (CmdType.ALARM.equals(cmd)) {
 				logger.info("接收到Alarm通知");
 				processNotifyAlarm(evt);
-			} else if (NOTIFY_MOBILE_POSITION.equals(cmd)) {
+			} else if (CmdType.MOBILE_POSITION.equals(cmd)) {
 				logger.info("接收到MobilePosition通知");
 				processNotifyMobilePosition(evt);
 			} else {

+ 122 - 14
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java

@@ -1,8 +1,21 @@
 package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
 
+import com.genersoft.iot.vmp.common.VideoManagerConstants;
+import com.genersoft.iot.vmp.conf.DynamicTask;
+import com.genersoft.iot.vmp.conf.UserSetup;
+import com.genersoft.iot.vmp.gb28181.bean.CmdType;
+import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
+import com.genersoft.iot.vmp.gb28181.task.GPSSubscribeTask;
 import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
+import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
 import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
 import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
+import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
+import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
+import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
+import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
+import org.dom4j.DocumentException;
+import org.dom4j.Element;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.InitializingBean;
@@ -13,7 +26,10 @@ import javax.sip.InvalidArgumentException;
 import javax.sip.RequestEvent;
 import javax.sip.ServerTransaction;
 import javax.sip.SipException;
+import javax.sip.header.CallIdHeader;
 import javax.sip.header.ExpiresHeader;
+import javax.sip.header.Header;
+import javax.sip.header.ToHeader;
 import javax.sip.message.Request;
 import javax.sip.message.Response;
 import java.text.ParseException;
@@ -30,6 +46,21 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
 	@Autowired
 	private SIPProcessorObserver sipProcessorObserver;
 
+	@Autowired
+	private IRedisCatchStorage redisCatchStorage;
+
+	@Autowired
+	private ISIPCommanderForPlatform sipCommanderForPlatform;
+
+	@Autowired
+	private IVideoManagerStorager storager;
+
+	@Autowired
+	private DynamicTask dynamicTask;
+
+	@Autowired
+	private UserSetup userSetup;
+
 	@Override
 	public void afterPropertiesSet() throws Exception {
 		// 添加消息处理的订阅
@@ -46,30 +77,107 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
 		Request request = evt.getRequest();
 
 		try {
-			Response response = null;
-			response = getMessageFactory().createResponse(200, request);
-			if (response != null) {
-				ExpiresHeader expireHeader = getHeaderFactory().createExpiresHeader(30);
-				response.setExpires(expireHeader);
-			}
-			logger.info("response : " + response.toString());
-			ServerTransaction transaction = getServerTransaction(evt);
-			if (transaction != null) {
-				transaction.sendResponse(response);
-				transaction.getDialog().delete();
-				transaction.terminate();
+			Element rootElement = getRootElement(evt);
+			String cmd = XmlUtil.getText(rootElement, "CmdType");
+			if (CmdType.MOBILE_POSITION.equals(cmd)) {
+				logger.info("接收到MobilePosition订阅");
+				processNotifyMobilePosition(evt, rootElement);
+//			} else if (CmdType.ALARM.equals(cmd)) {
+//				logger.info("接收到Alarm订阅");
+//				processNotifyAlarm(evt, rootElement);
+//			} else if (CmdType.CATALOG.equals(cmd)) {
+//				logger.info("接收到Catalog订阅");
+//				processNotifyCatalogList(evt, rootElement);
 			} else {
-				logger.info("processRequest serverTransactionId is null.");
+				logger.info("接收到消息:" + cmd);
+//				responseAck(evt, Response.OK);
+
+				Response response = null;
+				response = getMessageFactory().createResponse(200, request);
+				if (response != null) {
+					ExpiresHeader expireHeader = getHeaderFactory().createExpiresHeader(30);
+					response.setExpires(expireHeader);
+				}
+				logger.info("response : " + response.toString());
+				ServerTransaction transaction = getServerTransaction(evt);
+				if (transaction != null) {
+					transaction.sendResponse(response);
+					transaction.getDialog().delete();
+					transaction.terminate();
+				} else {
+					logger.info("processRequest serverTransactionId is null.");
+				}
 			}
 
+
+
 		} catch (ParseException e) {
 			e.printStackTrace();
 		} catch (SipException e) {
 			e.printStackTrace();
 		} catch (InvalidArgumentException e) {
 			e.printStackTrace();
+		} catch (DocumentException e) {
+			e.printStackTrace();
+		}
+
+	}
+
+	/**
+	 * 处理移动位置订阅消息
+	 */
+	private void processNotifyMobilePosition(RequestEvent evt, Element rootElement) {
+		String platformId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
+		String deviceID = XmlUtil.getText(rootElement, "DeviceID");
+		SubscribeInfo subscribeInfo = new SubscribeInfo(evt, platformId);
+		String sn = XmlUtil.getText(rootElement, "SN");
+		String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() +  "_MobilePosition_" + platformId;
+
+		StringBuilder resultXml = new StringBuilder(200);
+		resultXml.append("<?xml version=\"1.0\" ?>\r\n")
+				.append("<Response>\r\n")
+				.append("<CmdType>MobilePosition</CmdType>\r\n")
+				.append("<SN>" + sn + "</SN>\r\n")
+				.append("<DeviceID>" + deviceID + "</DeviceID>\r\n")
+				.append("<Result>OK</Result>\r\n")
+				.append("</Response>\r\n");
+
+		if (subscribeInfo.getExpires() > 0) {
+			if (redisCatchStorage.getSubscribe(key) != null) {
+				dynamicTask.stopCron(key);
+			}
+			String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔
+			dynamicTask.startCron(key, new GPSSubscribeTask(redisCatchStorage, sipCommanderForPlatform, storager,  platformId, sn, key), Integer.parseInt(interval));
+
+			redisCatchStorage.updateSubscribe(key, subscribeInfo);
+		}else if (subscribeInfo.getExpires() == 0) {
+			dynamicTask.stopCron(key);
+			redisCatchStorage.delSubscribe(key);
+		}
+
+
+
+		try {
+			Response response = responseXmlAck(evt, resultXml.toString());
+			ToHeader toHeader = (ToHeader)response.getHeader(ToHeader.NAME);
+			subscribeInfo.setToTag(toHeader.getTag());
+			redisCatchStorage.updateSubscribe(key, subscribeInfo);
+
+		} catch (SipException e) {
+			e.printStackTrace();
+		} catch (InvalidArgumentException e) {
+			e.printStackTrace();
+		} catch (ParseException e) {
+			e.printStackTrace();
 		}
-		
+	}
+
+	private void processNotifyAlarm(RequestEvent evt, Element rootElement) {
+
+	}
+
+	private void processNotifyCatalogList(RequestEvent evt, Element rootElement) {
+
 	}
 
 }

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java

@@ -23,7 +23,7 @@ public class GPSMsgInfo {
     private double speed;
 
     /**
-     * 产生通知时间,
+     * 产生通知时间, 时间格式: 2020-01-14T14:32:12
      */
     private String time;
 

+ 1 - 0
src/main/java/com/genersoft/iot/vmp/service/impl/RedisGPSMsgListener.java

@@ -17,6 +17,7 @@ public class RedisGPSMsgListener implements MessageListener {
     @Override
     public void onMessage(Message message, byte[] bytes) {
         GPSMsgInfo gpsMsgInfo = JSON.parseObject(message.getBody(), GPSMsgInfo.class);
+        System.out.println(JSON.toJSON(gpsMsgInfo));
         redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo);
     }
 }

+ 13 - 4
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java

@@ -2,10 +2,7 @@ package com.genersoft.iot.vmp.storager;
 
 import com.alibaba.fastjson.JSONObject;
 import com.genersoft.iot.vmp.common.StreamInfo;
-import com.genersoft.iot.vmp.gb28181.bean.Device;
-import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
-import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
-import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+import com.genersoft.iot.vmp.gb28181.bean.*;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
 import com.genersoft.iot.vmp.service.bean.ThirdPartyGB;
@@ -196,4 +193,16 @@ public interface IRedisCatchStorage {
     void resetAllCSEQ();
 
     void updateGpsMsgInfo(GPSMsgInfo gpsMsgInfo);
+
+    GPSMsgInfo getGpsMsgInfo(String gbId);
+
+    Long getSN(String method);
+
+    void resetAllSN();
+
+    void updateSubscribe(String key, SubscribeInfo subscribeInfo);
+
+    SubscribeInfo getSubscribe(String key);
+
+    void delSubscribe(String key);
 }

+ 7 - 0
src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java

@@ -54,6 +54,11 @@ public interface GbStreamMapper {
             "WHERE pgs.platformId = '${platformId}'")
     List<GbStream> queryGbStreamListInPlatform(String platformId);
 
+
+    @Select("SELECT gs.*, pgs.platformId as platformId, pgs.catalogId as catalogId FROM gb_stream gs  LEFT JOIN platform_gb_stream pgs " +
+            "ON  gs.app = pgs.app and gs.stream = pgs.stream WHERE pgs.app is NULL and pgs.stream is NULL")
+    List<GbStream> queryStreamNotInPlatform();
+
     @Update("UPDATE gb_stream " +
             "SET status=${status} " +
             "WHERE app=#{app} AND stream=#{stream}")
@@ -87,4 +92,6 @@ public interface GbStreamMapper {
             "</foreach> " +
             "</script>")
     void batchAdd(List<StreamPushItem> subList);
+
+
 }

+ 43 - 0
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java

@@ -49,6 +49,18 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
         return result;
     }
 
+    @Override
+    public Long getSN(String method) {
+        String key = VideoManagerConstants.SIP_SN_PREFIX  + userSetup.getServerId() + "_" +  method;
+
+        long result =  redis.incr(key, 1L);
+        if (result > Integer.MAX_VALUE) {
+            redis.set(key, 1);
+            result = 1;
+        }
+        return result;
+    }
+
     @Override
     public void resetAllCSEQ() {
         String scanKey = VideoManagerConstants.SIP_CSEQ_PREFIX  + userSetup.getServerId() + "_*";
@@ -59,6 +71,16 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
         }
     }
 
+    @Override
+    public void resetAllSN() {
+        String scanKey = VideoManagerConstants.SIP_SN_PREFIX  + userSetup.getServerId() + "_*";
+        List<Object> keys = redis.scan(scanKey);
+        for (int i = 0; i < keys.size(); i++) {
+            String key = (String) keys.get(i);
+            redis.set(key, 1);
+        }
+    }
+
     /**
      * 开始播放时将流存入redis
      *
@@ -433,4 +455,25 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
         String key = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + userSetup.getServerId() + "_" + gpsMsgInfo.getId();
         redis.set(key, gpsMsgInfo);
     }
+
+    @Override
+    public GPSMsgInfo getGpsMsgInfo(String gbId) {
+        String key = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + userSetup.getServerId() + "_" + gbId;
+        return (GPSMsgInfo)redis.get(key);
+    }
+
+    @Override
+    public void updateSubscribe(String key, SubscribeInfo subscribeInfo) {
+        redis.set(key, subscribeInfo, subscribeInfo.getExpires());
+    }
+
+    @Override
+    public SubscribeInfo getSubscribe(String key) {
+        return (SubscribeInfo)redis.get(key);
+    }
+
+    @Override
+    public void delSubscribe(String key) {
+        redis.del(key);
+    }
 }

+ 13 - 10
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java

@@ -486,18 +486,21 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
 		// 更新缓存
 		parentPlatformCatch.setParentPlatform(parentPlatform);
 		redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
-		// 共享所有视频流,需要将现有视频流添加到此平台
-		List<GbStream> gbStreams = gbStreamMapper.selectAll();
-		if (gbStreams.size() > 0) {
-			for (GbStream gbStream : gbStreams) {
-				gbStream.setCatalogId(parentPlatform.getCatalogId());
-			}
-			if (parentPlatform.isShareAllLiveStream()) {
-				gbStreamService.addPlatformInfo(gbStreams, parentPlatform.getServerGBId(), parentPlatform.getCatalogId());
-			}else {
-				gbStreamService.delPlatformInfo(gbStreams);
+		if (parentPlatform.isEnable()) {
+			// 共享所有视频流,需要将现有视频流添加到此平台
+			List<GbStream> gbStreams = gbStreamMapper.queryStreamNotInPlatform();
+			if (gbStreams.size() > 0) {
+				for (GbStream gbStream : gbStreams) {
+					gbStream.setCatalogId(parentPlatform.getCatalogId());
+				}
+				if (parentPlatform.isShareAllLiveStream()) {
+					gbStreamService.addPlatformInfo(gbStreams, parentPlatform.getServerGBId(), parentPlatform.getCatalogId());
+				}else {
+					gbStreamService.delPlatformInfo(gbStreams);
+				}
 			}
 		}
+
 		return result > 0;
 	}