浏览代码

国标级联支持添加通道后主动推动到上级

648540858 2 年之前
父节点
当前提交
b9a33f03d6

+ 4 - 1
sql/2.6.9更新.sql

@@ -1,2 +1,5 @@
 alter table wvp_device_channel
-    change stream_id stream_id varying(255)
+    change stream_id stream_id varying(255)
+
+alter table wvp_platform
+    add auto_push_channel bool default false

+ 1 - 0
sql/初始化.sql

@@ -194,6 +194,7 @@ create table wvp_platform (
                               create_time character varying(50),
                               update_time character varying(50),
                               as_message_channel bool default false,
+                              auto_push_channel bool default false,
                               constraint uk_platform_unique_server_gb_id unique (server_gb_id)
 );
 

+ 11 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java

@@ -186,6 +186,9 @@ public class ParentPlatform {
     @Schema(description = "是否作为消息通道")
     private boolean asMessageChannel;
 
+    @Schema(description = "是否作为消息通道")
+    private boolean autoPushChannel;
+
     public Integer getId() {
         return id;
     }
@@ -425,4 +428,12 @@ public class ParentPlatform {
     public void setAsMessageChannel(boolean asMessageChannel) {
         this.asMessageChannel = asMessageChannel;
     }
+
+    public boolean isAutoPushChannel() {
+        return autoPushChannel;
+    }
+
+    public void setAutoPushChannel(boolean autoPushChannel) {
+        this.autoPushChannel = autoPushChannel;
+    }
 }

+ 14 - 10
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java

@@ -32,11 +32,13 @@ public class SubscribeHolder {
 
     public void putCatalogSubscribe(String platformId, SubscribeInfo subscribeInfo) {
         catalogMap.put(platformId, subscribeInfo);
-        // 添加订阅到期
-        String taskOverdueKey = taskOverduePrefix +  "catalog_" + platformId;
-        // 添加任务处理订阅过期
-        dynamicTask.startDelay(taskOverdueKey, () -> removeCatalogSubscribe(subscribeInfo.getId()),
-                subscribeInfo.getExpires() * 1000);
+        if (subscribeInfo.getExpires() > 0) {
+            // 添加订阅到期
+            String taskOverdueKey = taskOverduePrefix +  "catalog_" + platformId;
+            // 添加任务处理订阅过期
+            dynamicTask.startDelay(taskOverdueKey, () -> removeCatalogSubscribe(subscribeInfo.getId()),
+                    subscribeInfo.getExpires() * 1000);
+        }
     }
 
     public SubscribeInfo getCatalogSubscribe(String platformId) {
@@ -63,11 +65,13 @@ public class SubscribeHolder {
         dynamicTask.startCron(key, new MobilePositionSubscribeHandlerTask(platformId),
                 subscribeInfo.getGpsInterval() * 1000);
         String taskOverdueKey = taskOverduePrefix +  "MobilePosition_" + platformId;
-        // 添加任务处理订阅过期
-        dynamicTask.startDelay(taskOverdueKey, () -> {
-                    removeMobilePositionSubscribe(subscribeInfo.getId());
-                },
-                subscribeInfo.getExpires() * 1000);
+        if (subscribeInfo.getExpires() > 0) {
+            // 添加任务处理订阅过期
+            dynamicTask.startDelay(taskOverdueKey, () -> {
+                        removeMobilePositionSubscribe(subscribeInfo.getId());
+                    },
+                    subscribeInfo.getExpires() * 1000);
+        }
     }
 
     public SubscribeInfo getMobilePositionSubscribe(String platformId) {

+ 42 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java

@@ -18,6 +18,9 @@ public class SubscribeInfo {
 
     }
 
+    public SubscribeInfo() {
+    }
+
     private String id;
 
     private SIPRequest request;
@@ -33,6 +36,21 @@ public class SubscribeInfo {
     private String sn;
     private int gpsInterval;
 
+    /**
+     * 模拟的FromTag
+     */
+    private String simulatedFromTag;
+
+    /**
+     * 模拟的ToTag
+     */
+    private String simulatedToTag;
+
+    /**
+     * 模拟的CallID
+     */
+    private String simulatedCallId;
+
     public String getId() {
         return id;
     }
@@ -96,4 +114,28 @@ public class SubscribeInfo {
     public void setGpsInterval(int gpsInterval) {
         this.gpsInterval = gpsInterval;
     }
+
+    public String getSimulatedFromTag() {
+        return simulatedFromTag;
+    }
+
+    public void setSimulatedFromTag(String simulatedFromTag) {
+        this.simulatedFromTag = simulatedFromTag;
+    }
+
+    public String getSimulatedCallId() {
+        return simulatedCallId;
+    }
+
+    public void setSimulatedCallId(String simulatedCallId) {
+        this.simulatedCallId = simulatedCallId;
+    }
+
+    public String getSimulatedToTag() {
+        return simulatedToTag;
+    }
+
+    public void setSimulatedToTag(String simulatedToTag) {
+        this.simulatedToTag = simulatedToTag;
+    }
 }

+ 3 - 3
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java

@@ -227,11 +227,11 @@ public class SIPRequestHeaderPlarformProvider {
 		SipURI fromSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(parentPlatform.getDeviceGBId(),
 				parentPlatform.getDeviceIp() + ":" + parentPlatform.getDevicePort());
 		Address fromAddress = SipFactory.getInstance().createAddressFactory().createAddress(fromSipURI);
-		FromHeader fromHeader = SipFactory.getInstance().createHeaderFactory().createFromHeader(fromAddress, subscribeInfo.getResponse().getToTag());
+		FromHeader fromHeader = SipFactory.getInstance().createHeaderFactory().createFromHeader(fromAddress, subscribeInfo.getResponse() != null ? subscribeInfo.getResponse().getToTag(): subscribeInfo.getSimulatedToTag());
 		// to
 		SipURI toSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerGBDomain());
 		Address toAddress = SipFactory.getInstance().createAddressFactory().createAddress(toSipURI);
-		ToHeader toHeader = SipFactory.getInstance().createHeaderFactory().createToHeader(toAddress, subscribeInfo.getRequest().getFromTag());
+		ToHeader toHeader = SipFactory.getInstance().createHeaderFactory().createToHeader(toAddress, subscribeInfo.getRequest() != null ?subscribeInfo.getRequest().getFromTag(): subscribeInfo.getSimulatedFromTag());
 
 		// Forwards
 		MaxForwardsHeader maxForwards = SipFactory.getInstance().createHeaderFactory().createMaxForwardsHeader(70);
@@ -241,7 +241,7 @@ public class SIPRequestHeaderPlarformProvider {
 		// 设置编码, 防止中文乱码
 		messageFactory.setDefaultContentEncodingCharset("gb2312");
 
-		CallIdHeader callIdHeader = SipFactory.getInstance().createHeaderFactory().createCallIdHeader(subscribeInfo.getRequest().getCallIdHeader().getCallId());
+		CallIdHeader callIdHeader = SipFactory.getInstance().createHeaderFactory().createCallIdHeader(subscribeInfo.getRequest() != null ? subscribeInfo.getRequest().getCallIdHeader().getCallId(): subscribeInfo.getSimulatedCallId());
 
 		request = (SIPRequest) messageFactory.createRequest(requestURI, Request.NOTIFY, callIdHeader, cSeqHeader, fromHeader,
 				toHeader, viaHeaders, maxForwards);

+ 7 - 7
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java

@@ -148,13 +148,13 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
 
         CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(parentPlatform.getDeviceIp(),parentPlatform.getTransport());
 
-            Request request = headerProviderPlatformProvider.createMessageRequest(
-                    parentPlatform,
-                    keepaliveXml.toString(),
-                    SipUtils.getNewFromTag(),
-                    SipUtils.getNewViaTag(),
-                    callIdHeader);
-            sipSender.transmitRequest(parentPlatform.getDeviceIp(), request, errorEvent, okEvent);
+        Request request = headerProviderPlatformProvider.createMessageRequest(
+                parentPlatform,
+                keepaliveXml.toString(),
+                SipUtils.getNewFromTag(),
+                SipUtils.getNewViaTag(),
+                callIdHeader);
+        sipSender.transmitRequest(parentPlatform.getDeviceIp(), request, errorEvent, okEvent);
         return callIdHeader.getCallId();
     }
 

+ 8 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java

@@ -10,6 +10,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor
 import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
 import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
 import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
+import com.genersoft.iot.vmp.service.IPlatformService;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import gov.nist.javax.sip.message.SIPRequest;
 import gov.nist.javax.sip.message.SIPResponse;
@@ -50,6 +51,10 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
 	@Autowired
 	private SIPSender sipSender;
 
+
+	@Autowired
+	private IPlatformService platformService;
+
 	@Override
 	public void afterPropertiesSet() throws Exception {
 		// 添加消息处理的订阅
@@ -191,5 +196,8 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
 		} catch (SipException | InvalidArgumentException | ParseException e) {
 			logger.error("未处理的异常 ", e);
 		}
+		if (subscribeHolder.getCatalogSubscribe(platformId) == null && platform.isAutoPushChannel()) {
+			platformService.addSimulatedSubscribeInfo(platform);
+		}
 	}
 }

+ 2 - 0
src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java

@@ -55,4 +55,6 @@ public interface IPlatformService {
      * @param platformId 平台
      */
     void sendNotifyMobilePosition(String platformId);
+
+    void addSimulatedSubscribeInfo(ParentPlatform parentPlatform);
 }

+ 1 - 0
src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java

@@ -18,6 +18,7 @@ import org.springframework.jdbc.datasource.DataSourceTransactionManager;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.TransactionDefinition;
 import org.springframework.transaction.TransactionStatus;
+import org.springframework.transaction.annotation.Transactional;
 import org.springframework.util.ObjectUtils;
 
 import java.util.ArrayList;

+ 35 - 3
src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java

@@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.gb28181.bean.*;
 import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
 import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
+import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
 import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.service.IMediaServerService;
@@ -16,17 +17,22 @@ import com.genersoft.iot.vmp.storager.dao.*;
 import com.genersoft.iot.vmp.utils.DateUtil;
 import com.github.pagehelper.PageHelper;
 import com.github.pagehelper.PageInfo;
+import gov.nist.javax.sip.message.SIPRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import javax.sip.InvalidArgumentException;
+import javax.sip.PeerUnavailableException;
 import javax.sip.SipException;
+import javax.sip.SipFactory;
+import javax.sip.address.Address;
+import javax.sip.address.SipURI;
+import javax.sip.header.*;
+import javax.sip.message.Request;
 import java.text.ParseException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 /**
  * @author lin
@@ -182,6 +188,7 @@ public class PlatformServiceImpl implements IPlatformService {
             }
         }
 
+
         return false;
     }
 
@@ -256,6 +263,31 @@ public class PlatformServiceImpl implements IPlatformService {
                     },
                     (parentPlatform.getKeepTimeout())*1000);
         }
+        if (parentPlatform.isAutoPushChannel()) {
+            if (subscribeHolder.getCatalogSubscribe(parentPlatform.getServerGBId()) == null) {
+                addSimulatedSubscribeInfo(parentPlatform);
+            }
+        }else {
+            SubscribeInfo catalogSubscribe = subscribeHolder.getCatalogSubscribe(parentPlatform.getServerGBId());
+            if (catalogSubscribe != null && catalogSubscribe.getExpires() == -1) {
+                subscribeHolder.removeCatalogSubscribe(parentPlatform.getServerGBId());
+            }
+        }
+    }
+
+    @Override
+    public void addSimulatedSubscribeInfo(ParentPlatform parentPlatform) {
+        // 自动添加一条模拟的订阅信息
+        SubscribeInfo subscribeInfo = new SubscribeInfo();
+        subscribeInfo.setId(parentPlatform.getServerGBId());
+        subscribeInfo.setExpires(-1);
+        subscribeInfo.setEventType("Catalog");
+        int random = (int) Math.floor(Math.random() * 10000);
+        subscribeInfo.setEventId(random + "");
+        subscribeInfo.setSimulatedCallId(UUID.randomUUID().toString().replace("-", "") + "@" + parentPlatform.getServerIP());
+        subscribeInfo.setSimulatedFromTag(UUID.randomUUID().toString().replace("-", ""));
+        subscribeInfo.setSimulatedToTag(UUID.randomUUID().toString().replace("-", ""));
+        subscribeHolder.putCatalogSubscribe(parentPlatform.getServerGBId(), subscribeInfo);
     }
 
     private void registerTask(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo){

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java

@@ -440,7 +440,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
 
                 }
             }
-            if (streamPushItemListFroPlatform.size() > 0) {
+            if (!streamPushItemListFroPlatform.isEmpty()) {
                 platformGbStreamMapper.batchAdd(streamPushItemListFroPlatform);
                 // 发送通知
                 for (String platformId : platformForEvent.keySet()) {

+ 3 - 2
src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java

@@ -16,10 +16,10 @@ import java.util.List;
 public interface ParentPlatformMapper {
 
     @Insert("INSERT INTO wvp_platform (enable, name, server_gb_id, server_gb_domain, server_ip, server_port,device_gb_id,device_ip,"+
-            "device_port,username,password,expires,keep_timeout,transport,character_set,ptz,rtcp,as_message_channel,"+
+            "device_port,username,password,expires,keep_timeout,transport,character_set,ptz,rtcp,as_message_channel,auto_push_channel,"+
             "status,start_offline_push,catalog_id,administrative_division,catalog_group,create_time,update_time) " +
             "            VALUES (#{enable}, #{name}, #{serverGBId}, #{serverGBDomain}, #{serverIP}, #{serverPort}, #{deviceGBId}, #{deviceIp}, " +
-            "            #{devicePort}, #{username}, #{password}, #{expires}, #{keepTimeout}, #{transport}, #{characterSet}, #{ptz}, #{rtcp}, #{asMessageChannel}, " +
+            "            #{devicePort}, #{username}, #{password}, #{expires}, #{keepTimeout}, #{transport}, #{characterSet}, #{ptz}, #{rtcp}, #{asMessageChannel}, #{autoPushChannel}, " +
             "            #{status},  #{startOfflinePush}, #{catalogId}, #{administrativeDivision}, #{catalogGroup}, #{createTime}, #{updateTime})")
     int addParentPlatform(ParentPlatform parentPlatform);
 
@@ -42,6 +42,7 @@ public interface ParentPlatformMapper {
             "ptz=#{ptz}, " +
             "rtcp=#{rtcp}, " +
             "as_message_channel=#{asMessageChannel}, " +
+            "auto_push_channel=#{autoPushChannel}, " +
             "status=#{status}, " +
             "start_offline_push=#{startOfflinePush}, " +
             "catalog_group=#{catalogGroup}, " +

+ 6 - 2
web_src/src/components/dialog/platformEdit.vue

@@ -91,9 +91,10 @@
               <el-form-item label="其他选项">
                 <el-checkbox label="启用" v-model="platform.enable" @change="checkExpires"></el-checkbox>
 <!--                <el-checkbox label="云台控制" v-model="platform.ptz"></el-checkbox>-->
-                <el-checkbox label="拉起离线推流" v-model="platform.startOfflinePush"></el-checkbox>
+                <el-checkbox label="拉起推流" v-model="platform.startOfflinePush"></el-checkbox>
                 <el-checkbox label="RTCP保活" v-model="platform.rtcp" @change="rtcpCheckBoxChange"></el-checkbox>
-                <el-checkbox label="作为消息通道" v-model="platform.asMessageChannel" ></el-checkbox>
+                <el-checkbox label="消息通道" v-model="platform.asMessageChannel" ></el-checkbox>
+                <el-checkbox label="推送通道" v-model="platform.autoPushChannel" ></el-checkbox>
               </el-form-item>
               <el-form-item>
                 <el-button type="primary" @click="onSubmit">{{
@@ -141,6 +142,7 @@ export default {
         ptz: true,
         rtcp: false,
         asMessageChannel: false,
+        autoPushChannel: false,
         name: null,
         serverGBId: null,
         serverGBDomain: null,
@@ -208,6 +210,7 @@ export default {
         this.platform.ptz = platform.ptz;
         this.platform.rtcp = platform.rtcp;
         this.platform.asMessageChannel = platform.asMessageChannel;
+        this.platform.autoPushChannel = platform.autoPushChannel;
         this.platform.name = platform.name;
         this.platform.serverGBId = platform.serverGBId;
         this.platform.serverGBDomain = platform.serverGBDomain;
@@ -284,6 +287,7 @@ export default {
         ptz: true,
         rtcp: false,
         asMessageChannel: false,
+        autoPushChannel: false,
         name: null,
         serverGBId: null,
         administrativeDivision: null,