Bläddra i källkod

修复推流设备位置信息入库以及页面添加位置信息展示

648540858 1 år sedan
förälder
incheckning
a316a12187

+ 9 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/dao/CommonGBChannelMapper.java

@@ -440,4 +440,13 @@ public interface CommonGBChannelMapper {
     @SelectProvider(type = ChannelProvider.class, method = "queryListByStreamPushList")
     List<CommonGBChannel> queryListByStreamPushList(List<StreamPush> streamPushList);
 
+    @Update(value = {" <script>" +
+            " <foreach collection='channels' item='item' separator=';' >" +
+            " UPDATE wvp_device_channel " +
+            " SET gb_longitude=#{item.gbLongitude}, gb_latitude=#{item.gbLatitude} " +
+            " WHERE stream_push_id IS NOT NULL AND gb_device_id=#{item.gbDeviceId} "+
+             "</foreach>"+
+            " </script>"})
+    void updateGpsByDeviceIdForStreamPush(List<CommonGBChannel> channels);
+
 }

+ 1 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java

@@ -81,4 +81,5 @@ public interface IGbChannelService {
 
     List<CommonGBChannel> queryListByStreamPushList(List<StreamPush> streamPushList);
 
+    void updateGpsByDeviceIdForStreamPush(List<CommonGBChannel> channels);
 }

+ 5 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java

@@ -693,4 +693,9 @@ public class GbChannelServiceImpl implements IGbChannelService {
     public List<CommonGBChannel> queryListByStreamPushList(List<StreamPush> streamPushList) {
         return commonGBChannelMapper.queryListByStreamPushList(streamPushList);
     }
+
+    @Override
+    public void updateGpsByDeviceIdForStreamPush(List<CommonGBChannel> channels) {
+        commonGBChannelMapper.updateGpsByDeviceIdForStreamPush(channels);
+    }
 }

+ 36 - 27
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java

@@ -1,24 +1,24 @@
 package com.genersoft.iot.vmp.service.redisMsg;
 
 import com.alibaba.fastjson2.JSON;
-import com.genersoft.iot.vmp.service.IMobilePositionService;
 import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
+import com.genersoft.iot.vmp.streamPush.service.IStreamPushService;
 import lombok.extern.slf4j.Slf4j;
 import org.jetbrains.annotations.NotNull;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.data.redis.connection.Message;
 import org.springframework.data.redis.connection.MessageListener;
 import org.springframework.scheduling.annotation.Scheduled;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 /**
- * 接收来自redis的GPS更新通知
+ * 接收来自redis的GPS更新通知, 此处只针对推流设备
+ *
  * @author lin
  * 监听:  SUBSCRIBE VM_MSG_GPS
  * 发布   PUBLISH VM_MSG_GPS '{"messageId":"1727228507555","id":"24212345671381000047","lng":116.30307666666667,"lat":40.03295833333333,"time":"2024-09-25T09:41:47","direction":"56.0","speed":0.0,"altitude":60.0,"unitNo":"100000000","memberNo":"10000047"}'
@@ -31,34 +31,42 @@ public class RedisGpsMsgListener implements MessageListener {
     private IRedisCatchStorage redisCatchStorage;
 
     @Autowired
-    private IMobilePositionService mobilePositionService;
+    private IStreamPushService streamPushService;
 
     private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
 
-    @Qualifier("taskExecutor")
-    @Autowired
-    private ThreadPoolTaskExecutor taskExecutor;
-
 
     @Override
     public void onMessage(@NotNull Message message, byte[] bytes) {
-        boolean isEmpty = taskQueue.isEmpty();
         taskQueue.offer(message);
-        if (isEmpty) {
-            taskExecutor.execute(() -> {
-                while (!taskQueue.isEmpty()) {
-                    Message msg = taskQueue.poll();
-                    try {
-                        GPSMsgInfo gpsMsgInfo = JSON.parseObject(msg.getBody(), GPSMsgInfo.class);
-                        log.info("[REDIS的位置变化通知], {}", JSON.toJSONString(gpsMsgInfo));
-                        // 只是放入redis缓存起来
-                        redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo);
-                    }catch (Exception e) {
-                        log.warn("[REDIS的位置变化通知] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
-                        log.error("[REDIS的位置变化通知] 异常内容: ", e);
-                    }
-                }
-            });
+    }
+
+    @Scheduled(fixedDelay = 200)   //每400毫秒执行一次
+    public void executeTaskQueue() {
+        if (taskQueue.isEmpty()) {
+            return;
+        }
+        List<Message> messageDataList = new ArrayList<>();
+        int size = taskQueue.size();
+        for (int i = 0; i < size; i++) {
+            Message msg = taskQueue.poll();
+            if (msg != null) {
+                messageDataList.add(msg);
+            }
+        }
+        if (messageDataList.isEmpty()) {
+            return;
+        }
+        for (Message msg : messageDataList) {
+            try {
+                GPSMsgInfo gpsMsgInfo = JSON.parseObject(msg.getBody(), GPSMsgInfo.class);
+                log.info("[REDIS的位置变化通知], {}", JSON.toJSONString(gpsMsgInfo));
+                // 只是放入redis缓存起来
+                redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo);
+            } catch (Exception e) {
+                log.warn("[REDIS的位置变化通知] 发现未处理的异常, \r\n{}", JSON.toJSONString(msg));
+                log.error("[REDIS的位置变化通知] 异常内容: ", e);
+            }
         }
     }
 
@@ -66,10 +74,11 @@ public class RedisGpsMsgListener implements MessageListener {
      * 定时将经纬度更新到数据库
      */
     @Scheduled(fixedDelay = 2 * 1000)   //每2秒执行一次
-    public void execute(){
+    public void execute() {
+        // 需要查询到
         List<GPSMsgInfo> gpsMsgInfoList = redisCatchStorage.getAllGpsMsgInfo();
         if (!gpsMsgInfoList.isEmpty()) {
-            mobilePositionService.updateStreamGPS(gpsMsgInfoList);
+            streamPushService.updateGPSFromGPSMsgInfo(gpsMsgInfoList);
             for (GPSMsgInfo msgInfo : gpsMsgInfoList) {
                 msgInfo.setStored(true);
                 redisCatchStorage.updateGpsMsgInfo(msgInfo);

+ 3 - 0
src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushService.java

@@ -1,5 +1,6 @@
 package com.genersoft.iot.vmp.streamPush.service;
 
+import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
 import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis;
 import com.genersoft.iot.vmp.streamPush.bean.StreamPush;
 import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
@@ -97,4 +98,6 @@ public interface IStreamPushService {
     int delete(int id);
 
     void batchRemove(Set<Integer> ids);
+
+    void updateGPSFromGPSMsgInfo(List<GPSMsgInfo> gpsMsgInfoList);
 }

+ 14 - 0
src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java

@@ -17,6 +17,7 @@ import com.genersoft.iot.vmp.media.service.IMediaServerService;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
 import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType;
 import com.genersoft.iot.vmp.service.ISendRtpServerService;
+import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
 import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.streamPush.bean.StreamPush;
@@ -583,4 +584,17 @@ public class StreamPushServiceImpl implements IStreamPushService {
         streamPushMapper.batchDel(streamPushList);
         gbChannelService.delete(ids);
     }
+
+    @Override
+    public void updateGPSFromGPSMsgInfo(List<GPSMsgInfo> gpsMsgInfoList) {
+        List<CommonGBChannel> channels = new ArrayList<>();
+        for (GPSMsgInfo gpsMsgInfo : gpsMsgInfoList) {
+            CommonGBChannel channel = new CommonGBChannel();
+            channel.setGbDeviceId(gpsMsgInfo.getId());
+            channel.setGbLongitude(gpsMsgInfo.getLng());
+            channel.setGbLatitude(gpsMsgInfo.getLat());
+            channels.add(channel);
+        }
+        gbChannelService.updateGpsByDeviceIdForStreamPush(channels);
+    }
 }

+ 13 - 1
web_src/src/components/StreamPushList.vue

@@ -46,7 +46,7 @@
         </el-table-column>
         <el-table-column prop="gbName" label="名称" min-width="200">
         </el-table-column>
-        <el-table-column prop="app" label="应用名" min-width="200">
+        <el-table-column prop="app" label="应用名" min-width="100">
         </el-table-column>
         <el-table-column prop="stream" label="流ID" min-width="200">
         </el-table-column>
@@ -58,6 +58,12 @@
         </el-table-column>
         <el-table-column prop="gbDeviceId" label="国标编码" min-width="200" >
         </el-table-column>
+        <el-table-column label="位置信息" min-width="200">
+          <template slot-scope="scope">
+            <span size="medium" v-if="scope.row.gbLongitude && scope.row.gbLatitude">{{scope.row.gbLongitude}}<br/>{{scope.row.gbLatitude}}</span>
+            <span size="medium" v-if="!scope.row.gbLongitude || !scope.row.gbLatitude">无</span>
+          </template>
+        </el-table-column>
         <el-table-column prop="mediaServerId" label="流媒体" min-width="200" >
         </el-table-column>
         <el-table-column label="开始时间"  min-width="200">
@@ -181,6 +187,12 @@ export default {
           if (res.data.code === 0) {
             that.total = res.data.data.total;
             that.pushList = res.data.data.list;
+            that.pushList.forEach(e => {
+              that.$set(e, "location", "");
+              if (e.gbLongitude && e.gbLatitude) {
+                that.$set(e, "location", e.gbLongitude + "," + e.gbLatitude);
+              }
+            });
           }
 
       }).catch(function (error) {

+ 5 - 9
web_src/src/components/channelList.vue

@@ -72,7 +72,11 @@
           </el-table-column>
           <el-table-column prop="manufacturer" label="厂家" min-width="100">
           </el-table-column>
-          <el-table-column prop="location" label="位置信息" min-width="120">
+          <el-table-column label="位置信息" min-width="120">
+            <template slot-scope="scope">
+              <span size="medium" v-if="scope.row.longitude && scope.row.latitude">{{scope.row.longitude}}<br/>{{scope.row.latitude}}</span>
+              <span size="medium" v-if="!scope.row.longitude || !scope.row.latitude">无</span>
+            </template>
           </el-table-column>
           <el-table-column prop="ptzType" label="云台类型" min-width="100">
             <template v-slot:default="scope">
@@ -275,10 +279,6 @@ export default {
           that.deviceChannelList = res.data.data.list;
           that.deviceChannelList.forEach(e => {
             e.ptzType = e.ptzType + "";
-            that.$set(e, "location", "");
-            if (e.longitude && e.latitude) {
-              that.$set(e, "location", e.longitude + "," + e.latitude);
-            }
           });
           // 防止出现表格错位
           that.$nextTick(() => {
@@ -429,10 +429,6 @@ export default {
             this.deviceChannelList = res.data.data.list;
             this.deviceChannelList.forEach(e => {
               e.ptzType = e.ptzType + "";
-              this.$set(e, "location", "");
-              if (e.longitude && e.latitude) {
-                this.$set(e, "location", e.longitude + "," + e.latitude);
-              }
             });
             // 防止出现表格错位
             this.$nextTick(() => {