Browse Source

优化国标设备点播的信息的Redis缓存策略

648540858 1 year ago
parent
commit
34cc8251e2

+ 2 - 0
src/main/java/com/genersoft/iot/vmp/common/InviteInfo.java

@@ -31,6 +31,8 @@ public class InviteInfo {
 
     private String mediaServerId;
 
+    private Long expirationTime;
+
 
     public static InviteInfo getInviteInfo(String deviceId, Integer channelId, String stream, SSRCInfo ssrcInfo, String mediaServerId,
                                            String receiveIp, Integer receivePort, String streamMode,

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/service/IInviteStreamService.java

@@ -39,7 +39,7 @@ public interface IInviteStreamService {
      */
     void removeInviteInfoByDeviceAndChannel(InviteSessionType inviteSessionType, Integer channelId);
 
-    List<InviteInfo> getAllInviteInfo(InviteSessionType type, Integer channelId, String stream);
+    List<InviteInfo> getAllInviteInfo();
 
     /**
      * 获取点播的状态信息

+ 78 - 81
src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java

@@ -10,11 +10,12 @@ import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper;
 import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
 import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
 import com.genersoft.iot.vmp.service.bean.ErrorCallback;
-import com.genersoft.iot.vmp.utils.redis.RedisUtil;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.event.EventListener;
+import org.springframework.data.redis.core.Cursor;
 import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.core.ScanOptions;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 
@@ -23,7 +24,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
 
 @Slf4j
 @Service
@@ -61,11 +61,12 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
             }
         }
     }
+
     @Override
     public void updateInviteInfo(InviteInfo inviteInfo) {
         if (InviteSessionStatus.ready == inviteInfo.getStatus()) {
             updateInviteInfo(inviteInfo, Long.valueOf(userSetting.getPlayTimeout()) * 2);
-        }else {
+        } else {
             updateInviteInfo(inviteInfo, null);
         }
     }
@@ -114,16 +115,15 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
             inviteInfoForUpdate = inviteInfoInRedis;
 
         }
-        String key = VideoManagerConstants.INVITE_PREFIX +
-                ":" + inviteInfoForUpdate.getType() +
+        String key = VideoManagerConstants.INVITE_PREFIX;
+        String objectKey = inviteInfoForUpdate.getType() +
                 ":" + inviteInfoForUpdate.getChannelId() +
-                ":" + inviteInfoForUpdate.getStream()+
+                ":" + inviteInfoForUpdate.getStream() +
                 ":" + inviteInfoForUpdate.getSsrcInfo().getSsrc();
         if (time != null && time > 0) {
-            redisTemplate.opsForValue().set(key, inviteInfoForUpdate, time, TimeUnit.SECONDS);
-        }else {
-            redisTemplate.opsForValue().set(key, inviteInfoForUpdate);
+            inviteInfoForUpdate.setExpirationTime(time);
         }
+        redisTemplate.opsForHash().put(key, objectKey, inviteInfoForUpdate);
     }
 
     @Override
@@ -134,8 +134,8 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
             return null;
         }
         removeInviteInfo(inviteInfoInDb);
-        String key = VideoManagerConstants.INVITE_PREFIX +
-                ":" + inviteInfo.getType() +
+        String key = VideoManagerConstants.INVITE_PREFIX;
+        String objectKey = inviteInfo.getType() +
                 ":" + inviteInfo.getChannelId() +
                 ":" + stream +
                 ":" + inviteInfo.getSsrcInfo().getSsrc();
@@ -144,46 +144,43 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
             inviteInfoInDb.getSsrcInfo().setStream(stream);
         }
         if (InviteSessionStatus.ready == inviteInfo.getStatus()) {
-            redisTemplate.opsForValue().set(key, inviteInfoInDb, userSetting.getPlayTimeout() * 2, TimeUnit.SECONDS);
-        }else {
-            redisTemplate.opsForValue().set(key, inviteInfoInDb);
+            inviteInfoInDb.setExpirationTime((long) (userSetting.getPlayTimeout() * 2));
         }
-
+        redisTemplate.opsForHash().put(key, objectKey, inviteInfoInDb);
         return inviteInfoInDb;
     }
 
     @Override
     public InviteInfo getInviteInfo(InviteSessionType type, Integer channelId, String stream) {
-        String key = VideoManagerConstants.INVITE_PREFIX +
-                ":" + (type != null ? type : "*") +
+        String key = VideoManagerConstants.INVITE_PREFIX;
+        String keyPattern = (type != null ? type : "*") +
                 ":" + (channelId != null ? channelId : "*") +
                 ":" + (stream != null ? stream : "*")
                 + ":*";
-        List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
-        if (scanResult.isEmpty()) {
-            return null;
-        }
-        if (scanResult.size() != 1) {
-            log.warn("[获取InviteInfo] 发现 key: {}存在多条", key);
-        }
+        ScanOptions options = ScanOptions.scanOptions().match(keyPattern).count(20).build();
+        try (Cursor<Map.Entry<Object, Object>> cursor = redisTemplate.opsForHash().scan(key, options)) {
+            if (cursor.hasNext()) {
+                InviteInfo inviteInfo = (InviteInfo) cursor.next().getValue();
+                cursor.close();
+                return inviteInfo;
 
-        return (InviteInfo) redisTemplate.opsForValue().get(scanResult.get(0));
+            }
+        } catch (Exception e) {
+            log.error("[Redis-InviteInfo] 查询异常: ", e);
+        }
+        return null;
     }
 
     @Override
-    public List<InviteInfo> getAllInviteInfo(InviteSessionType type, Integer channelId, String stream) {
-        String key = VideoManagerConstants.INVITE_PREFIX +
-                ":" + (type != null ? type : "*") +
-                ":" + (channelId != null ? channelId : "*") +
-                ":" + (stream != null ? stream : "*")
-                + ":*";
-        List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
-        if (scanResult.isEmpty()) {
-            return new ArrayList<>();
-        }
+    public List<InviteInfo> getAllInviteInfo() {
         List<InviteInfo> result = new ArrayList<>();
-        for (Object keyObj : scanResult) {
-            result.add((InviteInfo) redisTemplate.opsForValue().get(keyObj));
+        String key = VideoManagerConstants.INVITE_PREFIX;
+        List<Object> values = redisTemplate.opsForHash().values(key);
+        if(values.isEmpty()) {
+            return result;
+        }
+        for (Object value : values) {
+            result.add((InviteInfo)value);
         }
         return result;
     }
@@ -199,23 +196,22 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
     }
 
     @Override
-    public void removeInviteInfo(InviteSessionType type,  Integer channelId, String stream) {
-        String scanKey = VideoManagerConstants.INVITE_PREFIX +
-                ":" + (type != null ? type : "*") +
-                ":" + (channelId != null ? channelId : "*") +
-                ":" + (stream != null ? stream : "*") +
-                ":*";
-        List<Object> scanResult = RedisUtil.scan(redisTemplate, scanKey);
-        if (!scanResult.isEmpty()) {
-            for (Object keyObj : scanResult) {
-                String key = (String) keyObj;
-                InviteInfo inviteInfo = (InviteInfo) redisTemplate.opsForValue().get(key);
-                if (inviteInfo == null) {
-                    continue;
-                }
-                redisTemplate.delete(key);
-                inviteErrorCallbackMap.remove(buildKey(type,channelId, inviteInfo.getStream()));
-            }
+    public void removeInviteInfo(InviteSessionType type, Integer channelId, String stream) {
+        String key = VideoManagerConstants.INVITE_PREFIX;
+        if (type == null && channelId == null && stream == null) {
+            redisTemplate.opsForHash().delete(key);
+            return;
+        }
+        InviteInfo inviteInfo = getInviteInfo(type, channelId, stream);
+        if (inviteInfo != null) {
+            String objectKey = inviteInfo.getType() +
+                    ":" + inviteInfo.getChannelId() +
+                    ":" + stream +
+                    ":" + inviteInfo.getSsrcInfo().getSsrc();
+            redisTemplate.opsForHash().delete(key, objectKey);
+        }
+        if (redisTemplate.opsForHash().size(key) == 0) {
+            redisTemplate.opsForHash().delete(key);
         }
     }
 
@@ -230,14 +226,14 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
     }
 
     @Override
-    public void once(InviteSessionType type,  Integer channelId, String stream, ErrorCallback<StreamInfo> callback) {
+    public void once(InviteSessionType type, Integer channelId, String stream, ErrorCallback<StreamInfo> callback) {
         String key = buildKey(type, channelId, stream);
         List<ErrorCallback<StreamInfo>> callbacks = inviteErrorCallbackMap.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>());
         callbacks.add(callback);
 
     }
 
-    private String buildKey(InviteSessionType type,  Integer channelId, String stream) {
+    private String buildKey(InviteSessionType type, Integer channelId, String stream) {
         String key = type + ":" + channelId;
         // 如果ssrc未null那么可以实现一个通道只能一次操作,ssrc不为null则可以支持一个通道多次invite
         if (stream != null) {
@@ -249,7 +245,7 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
 
     @Override
     public void clearInviteInfo(String deviceId) {
-        List<InviteInfo> inviteInfoList = getAllInviteInfo(null, null, null);
+        List<InviteInfo> inviteInfoList = getAllInviteInfo();
         for (InviteInfo inviteInfo : inviteInfoList) {
             if (inviteInfo.getDeviceId().equals(deviceId)) {
                 removeInviteInfo(inviteInfo);
@@ -260,23 +256,21 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
     @Override
     public int getStreamInfoCount(String mediaServerId) {
         int count = 0;
-        String key = VideoManagerConstants.INVITE_PREFIX + ":*:*:*:*";
-        List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
-        if (scanResult.isEmpty()) {
-            return 0;
-        }else {
-            for (Object keyObj : scanResult) {
-                String keyStr = (String) keyObj;
-                InviteInfo inviteInfo = (InviteInfo) redisTemplate.opsForValue().get(keyStr);
-                if (inviteInfo != null
-                        && inviteInfo.getStreamInfo() != null
-                        && inviteInfo.getStreamInfo().getMediaServer() != null
-                        && inviteInfo.getStreamInfo().getMediaServer().getId().equals(mediaServerId)) {
-                    if (inviteInfo.getType().equals(InviteSessionType.DOWNLOAD) && inviteInfo.getStreamInfo().getProgress() == 1) {
-                        continue;
-                    }
-                    count++;
+        String key = VideoManagerConstants.INVITE_PREFIX;
+        List<Object> values = redisTemplate.opsForHash().values(key);
+        if (values.isEmpty()) {
+            return count;
+        }
+        for (Object value : values) {
+            InviteInfo inviteInfo = (InviteInfo)value;
+            if (inviteInfo != null
+                    && inviteInfo.getStreamInfo() != null
+                    && inviteInfo.getStreamInfo().getMediaServer() != null
+                    && inviteInfo.getStreamInfo().getMediaServer().getId().equals(mediaServerId)) {
+                if (inviteInfo.getType().equals(InviteSessionType.DOWNLOAD) && inviteInfo.getStreamInfo().getProgress() == 1) {
+                    continue;
                 }
+                count++;
             }
         }
         return count;
@@ -309,13 +303,16 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
 
     @Override
     public InviteInfo getInviteInfoBySSRC(String ssrc) {
-        String key = VideoManagerConstants.INVITE_PREFIX + ":*:*:*:" + ssrc;
-        List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
-        if (scanResult.size() != 1) {
+        List<InviteInfo> inviteInfoList = getAllInviteInfo();
+        if (inviteInfoList.isEmpty()) {
             return null;
         }
-
-        return (InviteInfo) redisTemplate.opsForValue().get(scanResult.get(0));
+        for (InviteInfo inviteInfo : inviteInfoList) {
+            if (inviteInfo.getSsrcInfo() != null && ssrc.equals(inviteInfo.getSsrcInfo().getSsrc())) {
+                return inviteInfo;
+            }
+        }
+        return null;
     }
 
     @Override
@@ -325,15 +322,15 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
             return null;
         }
         removeInviteInfo(inviteInfoInDb);
-        String key = VideoManagerConstants.INVITE_PREFIX +
-                ":" + inviteInfo.getType() +
+        String key = VideoManagerConstants.INVITE_PREFIX;
+        String objectKey = inviteInfo.getType() +
                 ":" + inviteInfo.getChannelId() +
                 ":" + inviteInfo.getStream() +
                 ":" + ssrc;
         if (inviteInfoInDb.getSsrcInfo() != null) {
             inviteInfoInDb.getSsrcInfo().setSsrc(ssrc);
         }
-        redisTemplate.opsForValue().set(key, inviteInfoInDb);
+        redisTemplate.opsForHash().put(key, objectKey, inviteInfoInDb);
         return inviteInfoInDb;
     }
 }

+ 3 - 2
src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java

@@ -1566,10 +1566,11 @@ public class PlayServiceImpl implements IPlayService {
 
     @Override
     public void stop(InviteSessionType type, Device device, DeviceChannel channel, String stream) {
-        InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(type, stream);
+        InviteInfo inviteInfo = inviteStreamService.getInviteInfo(type, channel.getId(), stream);
         if (inviteInfo == null) {
             throw new ControllerException(ErrorCode.ERROR100.getCode(), "点播未找到");
         }
+        inviteStreamService.removeInviteInfoByDeviceAndChannel(inviteInfo.getType(), channel.getId());
         if (InviteSessionStatus.ok == inviteInfo.getStatus()) {
             try {
                 log.info("[停止点播/回放/下载] {}/{}", device.getDeviceId(), channel.getDeviceId());
@@ -1579,7 +1580,7 @@ public class PlayServiceImpl implements IPlayService {
                 throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage());
             }
         }
-        inviteStreamService.removeInviteInfoByDeviceAndChannel(inviteInfo.getType(), channel.getId());
+
         if (inviteInfo.getType() == InviteSessionType.PLAY) {
             deviceChannelService.stopPlay(channel.getId());
         }

+ 38 - 4
src/main/java/com/genersoft/iot/vmp/vmanager/TestController.java

@@ -1,16 +1,21 @@
 package com.genersoft.iot.vmp.vmanager;
 
-import com.genersoft.iot.vmp.conf.security.JwtUtils;
+import com.genersoft.iot.vmp.common.InviteInfo;
+import com.genersoft.iot.vmp.common.InviteSessionType;
+import com.genersoft.iot.vmp.common.VideoManagerConstants;
 import com.genersoft.iot.vmp.media.event.hook.Hook;
 import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
-import io.swagger.v3.oas.annotations.Operation;
-import io.swagger.v3.oas.annotations.security.SecurityRequirement;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.Cursor;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.core.ScanOptions;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 @RestController
 @RequestMapping("/api/test")
@@ -19,13 +24,42 @@ public class TestController {
     @Autowired
     private HookSubscribe subscribe;
 
+    @Autowired
+    private RedisTemplate<Object, Object> redisTemplate;
 
     @GetMapping("/hook/list")
-    @Operation(summary = "查询角色", security = @SecurityRequirement(name = JwtUtils.HEADER))
     public List<Hook> all(){
         return subscribe.getAll();
     }
 
+
+    @GetMapping("/redis")
+    public List<InviteInfo> redis(){
+        InviteSessionType type = InviteSessionType.PLAY;
+        String channelId = null;
+        String stream = null;
+
+        String key = VideoManagerConstants.INVITE_PREFIX;
+        String keyPattern = (type != null ? type : "*") +
+                ":" + (channelId != null ? channelId : "*") +
+                ":" + (stream != null ? stream : "*")
+                + ":*";
+        ScanOptions options = ScanOptions.scanOptions().match(keyPattern).count(20).build();
+        Cursor<Map.Entry<Object, Object>> cursor = redisTemplate.opsForHash().scan(key, options);
+        List<InviteInfo> result = new ArrayList<>();
+        try {
+            while (cursor.hasNext()) {
+                System.out.println(cursor.next().getKey());
+                result.add((InviteInfo) cursor.next().getValue());
+            }
+        }catch (Exception e) {
+
+        }finally {
+            cursor.close();
+        }
+        return result;
+    }
+
 //    @Bean
 //    public ServletRegistrationBean<StatViewServlet> druidStatViewServlet() {
 //        ServletRegistrationBean<StatViewServlet> registrationBean = new ServletRegistrationBean<>(new StatViewServlet(),  "/druid/*");