Browse Source

优化级联平台GPS订阅

lin 3 years ago
parent
commit
abb60593cb

+ 14 - 3
src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java

@@ -45,10 +45,21 @@ public class GPSSubscribeTask implements Runnable{
                     for (GbStream gbStream : gbStreams) {
                         String gbId = gbStream.getGbId();
                         GPSMsgInfo gpsMsgInfo = redisCatchStorage.getGpsMsgInfo(gbId);
-                        if (gpsMsgInfo != null && gbStream.isStatus()) {
-                            // 发送GPS消息
-                            sipCommanderForPlatform.sendMobilePosition(parentPlatform, gpsMsgInfo, subscribe);
+                        if (gbStream.isStatus()) {
+                            if (gpsMsgInfo != null) {
+                                // 发送GPS消息
+                                sipCommanderForPlatform.sendMobilePosition(parentPlatform, gpsMsgInfo, subscribe);
+                            }else {
+                                // 没有在redis找到新的消息就使用数据库的消息
+                                gpsMsgInfo = new GPSMsgInfo();
+                                gpsMsgInfo.setId(gbId);
+                                gpsMsgInfo.setLat(gbStream.getLongitude());
+                                gpsMsgInfo.setLng(gbStream.getLongitude());
+                                // 发送GPS消息
+                                sipCommanderForPlatform.sendMobilePosition(parentPlatform, gpsMsgInfo, subscribe);
+                            }
                         }
+
                     }
                 }
             }

+ 39 - 0
src/main/java/com/genersoft/iot/vmp/service/StreamGPSSubscribeTask.java

@@ -0,0 +1,39 @@
+package com.genersoft.iot.vmp.service;
+
+import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
+import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
+import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+
+
+/**
+ * 定时查找redis中的GPS推送消息,并保存到对应的流中
+ */
+@Component
+public class StreamGPSSubscribeTask {
+
+    @Autowired
+    private IRedisCatchStorage redisCatchStorage;
+
+    @Autowired
+    private IVideoManagerStorager storager;
+
+
+
+    @Scheduled(fixedRate = 30 * 1000)   //每30秒执行一次
+    public void execute(){
+        List<GPSMsgInfo> gpsMsgInfo = redisCatchStorage.getAllGpsMsgInfo();
+        if (gpsMsgInfo.size() > 0) {
+            storager.updateStreamGPS(gpsMsgInfo);
+            for (GPSMsgInfo msgInfo : gpsMsgInfo) {
+                msgInfo.setStored(true);
+                redisCatchStorage.updateGpsMsgInfo(msgInfo);
+            }
+        }
+
+    }
+}

+ 10 - 0
src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java

@@ -37,6 +37,8 @@ public class GPSMsgInfo {
      */
     private String altitude;
 
+    private boolean stored;
+
 
     public String getId() {
         return id;
@@ -93,4 +95,12 @@ public class GPSMsgInfo {
     public void setAltitude(String altitude) {
         this.altitude = altitude;
     }
+
+    public boolean isStored() {
+        return stored;
+    }
+
+    public void setStored(boolean stored) {
+        this.stored = stored;
+    }
 }

+ 0 - 1
src/main/java/com/genersoft/iot/vmp/service/impl/RedisGPSMsgListener.java

@@ -17,7 +17,6 @@ public class RedisGPSMsgListener implements MessageListener {
     @Override
     public void onMessage(Message message, byte[] bytes) {
         GPSMsgInfo gpsMsgInfo = JSON.parseObject(message.getBody(), GPSMsgInfo.class);
-        System.out.println(JSON.toJSON(gpsMsgInfo));
         redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo);
     }
 }

+ 1 - 0
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java

@@ -195,6 +195,7 @@ public interface IRedisCatchStorage {
     void updateGpsMsgInfo(GPSMsgInfo gpsMsgInfo);
 
     GPSMsgInfo getGpsMsgInfo(String gbId);
+    List<GPSMsgInfo> getAllGpsMsgInfo();
 
     Long getSN(String method);
 

+ 3 - 0
src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java

@@ -4,6 +4,7 @@ import com.genersoft.iot.vmp.gb28181.bean.*;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
+import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
 import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce;
 import com.github.pagehelper.PageInfo;
 
@@ -456,4 +457,6 @@ public interface IVideoManagerStorager {
 	List<PlatformCatalog> queryCatalogInPlatform(String serverGBId);
 
     int delRelation(PlatformCatalog platformCatalog);
+
+	int updateStreamGPS(List<GPSMsgInfo> gpsMsgInfo);
 }

+ 10 - 0
src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java

@@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.storager.dao;
 import com.genersoft.iot.vmp.gb28181.bean.GbStream;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
+import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
 import org.apache.ibatis.annotations.*;
 import org.springframework.stereotype.Repository;
 
@@ -94,4 +95,13 @@ public interface GbStreamMapper {
     void batchAdd(List<StreamPushItem> subList);
 
 
+    @Update({"<script>" +
+            "<foreach collection='gpsMsgInfos' item='item' separator=';'>" +
+            " UPDATE" +
+            " gb_stream" +
+            " SET longitude=${item.lng}, latitude=${item.lat} " +
+            "WHERE gbId=#{item.id}"+
+            "</foreach>" +
+            "</script>"})
+    int updateStreamGPS(List<GPSMsgInfo> gpsMsgInfos);
 }

+ 17 - 1
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java

@@ -453,7 +453,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
     @Override
     public void updateGpsMsgInfo(GPSMsgInfo gpsMsgInfo) {
         String key = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + userSetup.getServerId() + "_" + gpsMsgInfo.getId();
-        redis.set(key, gpsMsgInfo);
+        redis.set(key, gpsMsgInfo, 60); // 默认GPS消息保存1分钟
     }
 
     @Override
@@ -476,4 +476,20 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
     public void delSubscribe(String key) {
         redis.del(key);
     }
+
+    @Override
+    public List<GPSMsgInfo> getAllGpsMsgInfo() {
+        String scanKey = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + userSetup.getServerId() + "_*";
+        List<GPSMsgInfo> result = new ArrayList<>();
+        List<Object> keys = redis.scan(scanKey);
+        for (int i = 0; i < keys.size(); i++) {
+            String key = (String) keys.get(i);
+            GPSMsgInfo gpsMsgInfo = (GPSMsgInfo) redis.get(key);
+            if (!gpsMsgInfo.isStored()) { // 只取没有存过得
+                result.add((GPSMsgInfo)redis.get(key));
+            }
+        }
+
+        return result;
+    }
 }

+ 6 - 0
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java

@@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
 import com.genersoft.iot.vmp.service.IGbStreamService;
+import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
 import com.genersoft.iot.vmp.storager.dao.*;
@@ -898,4 +899,9 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
 		}
 		return 0;
 	}
+
+	@Override
+	public int updateStreamGPS(List<GPSMsgInfo> gpsMsgInfos) {
+		return gbStreamMapper.updateStreamGPS(gpsMsgInfos);
+	}
 }