Преглед на файлове

支持多WVP的推流播放

648540858 преди 1 година
родител
ревизия
16708e385b
променени са 22 файла, в които са добавени 203 реда и са изтрити 129 реда
  1. 42 0
      src/main/java/com/genersoft/iot/vmp/gb28181/dao/CommonGBChannelMapper.java
  2. 2 0
      src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java
  3. 37 1
      src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java
  4. 1 1
      src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java
  5. 5 7
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
  6. 21 2
      src/main/java/com/genersoft/iot/vmp/media/bean/MediaInfo.java
  7. 9 46
      src/main/java/com/genersoft/iot/vmp/media/event/hook/Hook.java
  8. 3 1
      src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribe.java
  9. 2 8
      src/main/java/com/genersoft/iot/vmp/media/event/media/MediaArrivalEvent.java
  10. 1 1
      src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
  11. 6 2
      src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java
  12. 1 1
      src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java
  13. 2 2
      src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java
  14. 2 4
      src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
  15. 6 8
      src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
  16. 22 4
      src/main/java/com/genersoft/iot/vmp/streamPush/dao/StreamPushMapper.java
  17. 1 1
      src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushService.java
  18. 5 5
      src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushPlayServiceImpl.java
  19. 31 31
      src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java
  20. 1 1
      web_src/src/components/StreamPushEdit.vue
  21. 2 2
      web_src/src/components/StreamPushList.vue
  22. 1 1
      数据库/2.7.2-重构/初始化-mysql-2.7.2.sql

+ 42 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/dao/CommonGBChannelMapper.java

@@ -414,4 +414,46 @@ public interface CommonGBChannelMapper {
             " </script>"})
     int updateGroup(@Param("parentId") String parentId, @Param("businessGroup") String businessGroup,
                     List<CommonGBChannel> channelList);
+
+    @Update({"<script>" +
+            "<foreach collection='commonGBChannels' item='item' separator=';'>" +
+            " UPDATE" +
+            " wvp_device_channel" +
+            " SET update_time=#{item.updateTime}" +
+            ", gb_device_id=#{item.gbDeviceId}" +
+            ", gb_name=#{item.gbName}" +
+            ", gb_manufacturer=#{item.gbManufacturer}" +
+            ", gb_model=#{item.gbModel}" +
+            ", gb_owner=#{item.gbOwner}" +
+            ", gb_civil_code=#{item.gbCivilCode}" +
+            ", gb_block=#{item.gbBlock}" +
+            ", gb_address=#{item.gbAddress}" +
+            ", gb_parental=#{item.gbParental}" +
+            ", gb_safety_way=#{item.gbSafetyWay}" +
+            ", gb_register_way=#{item.gbRegisterWay}" +
+            ", gb_cert_num=#{item.gbCertNum}" +
+            ", gb_certifiable=#{item.gbCertifiable}" +
+            ", gb_err_code=#{item.gbErrCode}" +
+            ", gb_end_time=#{item.gbEndTime}" +
+            ", gb_ip_address=#{item.gbIpAddress}" +
+            ", gb_port=#{item.gbPort}" +
+            ", gb_password=#{item.gbPassword}" +
+            ", gb_status=#{item.gbStatus}" +
+            ", gb_longitude=#{item.gbLongitude}" +
+            ", gb_latitude=#{item.gbLatitude}" +
+            ", gb_ptz_type=#{item.gbPtzType}" +
+            ", gb_position_type=#{item.gbPositionType}" +
+            ", gb_room_type=#{item.gbRoomType}" +
+            ", gb_use_type=#{item.gbUseType}" +
+            ", gb_supply_light_type=#{item.gbSupplyLightType}" +
+            ", gb_direction_type=#{item.gbDirectionType}" +
+            ", gb_resolution=#{item.gbResolution}" +
+            ", gb_business_group_id=#{item.gbBusinessGroupId}" +
+            ", gb_download_speed=#{item.gbDownloadSpeed}" +
+            ", gb_svc_space_support_mod=#{item.gbSvcSpaceSupportMod}" +
+            ", gb_svc_time_support_mode=#{item.gbSvcTimeSupportMode}" +
+            " WHERE id=#{item.id}" +
+            "</foreach>" +
+            "</script>"})
+    int batchUpdate(List<CommonGBChannel> commonGBChannels);
 }

+ 2 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java

@@ -73,4 +73,6 @@ public interface IGbChannelService {
     void addChannelToGroupByGbDevice(String parentId, String businessGroup, List<Integer> deviceIds);
 
     void deleteChannelToGroupByGbDevice(List<Integer> deviceIds);
+
+    void batchUpdate(List<CommonGBChannel> commonGBChannels);
 }

+ 37 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java

@@ -238,6 +238,36 @@ public class GbChannelServiceImpl implements IGbChannelService {
         log.warn("[新增多个通道] 通道数量为{},成功保存:{}", commonGBChannels.size(), result);
     }
 
+    @Override
+    public void batchUpdate(List<CommonGBChannel> commonGBChannels) {
+        if (commonGBChannels.isEmpty()) {
+            log.warn("[更新多个通道] 通道数量为0,更新失败");
+            return;
+        }
+        // 批量保存
+        int limitCount = 1000;
+        int result = 0;
+        if (commonGBChannels.size() > limitCount) {
+            for (int i = 0; i < commonGBChannels.size(); i += limitCount) {
+                int toIndex = i + limitCount;
+                if (i + limitCount > commonGBChannels.size()) {
+                    toIndex = commonGBChannels.size();
+                }
+                result += commonGBChannelMapper.batchUpdate(commonGBChannels.subList(i, toIndex));
+            }
+        }else {
+            result += commonGBChannelMapper.batchUpdate(commonGBChannels);
+        }
+        log.warn("[更新多个通道] 通道数量为{},成功保存:{}", commonGBChannels.size(), result);
+        // 发送通过更新通知
+        try {
+            // 发送通知
+            eventPublisher.catalogEventPublish(null, commonGBChannels, CatalogEvent.UPDATE);
+        }catch (Exception e) {
+            log.warn("[更新多个通道] 发送失败,{}个", commonGBChannels.size(), e);
+        }
+    }
+
     @Override
     @Transactional
     public void updateStatus(List<CommonGBChannel> commonGBChannels) {
@@ -259,11 +289,17 @@ public class GbChannelServiceImpl implements IGbChannelService {
             result += commonGBChannelMapper.updateStatus(commonGBChannels);
         }
         log.warn("[更新多个通道状态] 通道数量为{},成功保存:{}", commonGBChannels.size(), result);
+        // 发送通过更新通知
+        try {
+            // 发送通知
+            eventPublisher.catalogEventPublish(null, commonGBChannels, CatalogEvent.UPDATE);
+        }catch (Exception e) {
+            log.warn("[更新多个通道] 发送失败,{}个", commonGBChannels.size(), e);
+        }
     }
 
     @Override
     public List<CommonGBChannel> queryByPlatformId(Integer platformId) {
-
         return commonGBChannelMapper.queryByPlatformId(platformId);
     }
 

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java

@@ -523,7 +523,7 @@ public class PlayServiceImpl implements IPlayService {
                     streamSession.remove(device.getDeviceId(), channel.getDeviceId(), ssrcInfo.getStream());
                     mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
                     // 取消订阅消息监听
-                    subscribe.removeSubscribe(Hook.getInstance(HookType.on_media_arrival, "rtp", ssrcInfo.getStream(), mediaServerItem.getId()));
+                    subscribe.removeSubscribe(Hook.getInstance(HookType.on_media_arrival, "rtp", ssrcInfo.getStream()));
                 }
             }else {
                 log.info("[点播超时] 收流超时 deviceId: {}, channelId: {},码流:{},端口:{}, SSRC: {}",

+ 5 - 7
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java

@@ -10,6 +10,7 @@ import com.genersoft.iot.vmp.conf.SipConfig;
 import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.conf.exception.ControllerException;
 import com.genersoft.iot.vmp.gb28181.bean.*;
+import com.genersoft.iot.vmp.gb28181.service.IPlayService;
 import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
 import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
@@ -24,11 +25,8 @@ import com.genersoft.iot.vmp.media.bean.MediaServer;
 import com.genersoft.iot.vmp.media.event.hook.Hook;
 import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
 import com.genersoft.iot.vmp.media.event.hook.HookType;
-import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
 import com.genersoft.iot.vmp.media.service.IMediaServerService;
 import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
-import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
-import com.genersoft.iot.vmp.gb28181.service.IPlayService;
 import com.genersoft.iot.vmp.service.bean.ErrorCallback;
 import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
@@ -598,10 +596,10 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                         sendRtpItem.setPlayType(InviteStreamType.PUSH);
                         if (streamPushItem != null) {
                             // 从redis查询是否正在接收这个推流
-                            MediaArrivalEvent mediaArrivalEvent = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream());
-                            if (mediaArrivalEvent != null) {
-                                sendRtpItem.setServerId(mediaArrivalEvent.getServerId());
-                                sendRtpItem.setMediaServerId(mediaArrivalEvent.getMediaServer().getId());
+                            MediaInfo mediaInfo = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream());
+                            if (mediaInfo != null) {
+                                sendRtpItem.setServerId(mediaInfo.getServerId());
+                                sendRtpItem.setMediaServerId(mediaInfo.getMediaServer().getId());
 
                                 redisCatchStorage.updateSendRTPSever(sendRtpItem);
                                 // 开始推流

+ 21 - 2
src/main/java/com/genersoft/iot/vmp/media/bean/MediaInfo.java

@@ -3,10 +3,12 @@ package com.genersoft.iot.vmp.media.bean;
 import com.alibaba.fastjson2.JSONArray;
 import com.alibaba.fastjson2.JSONObject;
 import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
+import com.genersoft.iot.vmp.utils.MediaServerUtils;
 import io.swagger.v3.oas.annotations.media.Schema;
 import lombok.Data;
 
 import java.util.List;
+import java.util.Map;
 
 /**
  * 视频信息
@@ -51,10 +53,15 @@ public class MediaInfo {
     private Long bytesSpeed;
     @Schema(description = "鉴权参数")
     private String callId;
+    @Schema(description = "额外参数")
+    private Map<String, String> paramMap;
+    @Schema(description = "服务ID")
+    private String serverId;
 
-    public static MediaInfo getInstance(JSONObject jsonObject, MediaServer mediaServer) {
+    public static MediaInfo getInstance(JSONObject jsonObject, MediaServer mediaServer, String serverId) {
         MediaInfo mediaInfo = new MediaInfo();
         mediaInfo.setMediaServer(mediaServer);
+        mediaInfo.setServerId(serverId);
         String app = jsonObject.getString("app");
         mediaInfo.setApp(app);
         String stream = jsonObject.getString("stream");
@@ -66,6 +73,7 @@ public class MediaInfo {
         Integer originType = jsonObject.getInteger("originType");
         String originUrl = jsonObject.getString("originUrl");
         Long aliveSecond = jsonObject.getLong("aliveSecond");
+        String params = jsonObject.getString("params");
         Long bytesSpeed = jsonObject.getLong("bytesSpeed");
         if (totalReaderCount != null) {
             mediaInfo.setReaderCount(totalReaderCount);
@@ -86,6 +94,12 @@ public class MediaInfo {
         if (bytesSpeed != null) {
             mediaInfo.setBytesSpeed(bytesSpeed);
         }
+        if (params != null) {
+            mediaInfo.setParamMap(MediaServerUtils.urlParamToMap(params));
+            if(mediaInfo.getCallId() == null) {
+                mediaInfo.setCallId(mediaInfo.getParamMap().get("callId"));
+            }
+        }
         JSONArray jsonArray = jsonObject.getJSONArray("tracks");
         if (jsonArray.isEmpty()) {
             return null;
@@ -137,7 +151,7 @@ public class MediaInfo {
         return mediaInfo;
     }
 
-    public static MediaInfo getInstance(OnStreamChangedHookParam param, MediaServer mediaServer) {
+    public static MediaInfo getInstance(OnStreamChangedHookParam param, MediaServer mediaServer, String serverId) {
 
         MediaInfo mediaInfo = new MediaInfo();
         mediaInfo.setApp(param.getApp());
@@ -150,6 +164,11 @@ public class MediaInfo {
         mediaInfo.setOriginUrl(param.getOriginUrl());
         mediaInfo.setAliveSecond(param.getAliveSecond());
         mediaInfo.setBytesSpeed(param.getBytesSpeed());
+        mediaInfo.setParamMap(param.getParamMap());
+        if(mediaInfo.getCallId() == null) {
+            mediaInfo.setCallId(param.getParamMap().get("callId"));
+        }
+        mediaInfo.setServerId(serverId);
         List<OnStreamChangedHookParam.MediaTrack> tracks = param.getTracks();
         if (tracks == null || tracks.isEmpty()) {
             return mediaInfo;

+ 9 - 46
src/main/java/com/genersoft/iot/vmp/media/event/hook/Hook.java

@@ -1,9 +1,12 @@
 package com.genersoft.iot.vmp.media.event.hook;
 
+import lombok.Data;
+
 /**
  * zlm hook事件的参数
  * @author lin
  */
+@Data
 public class Hook {
 
     private HookType hookType;
@@ -12,60 +15,21 @@ public class Hook {
 
     private String stream;
 
-    private String mediaServerId;
-
     private Long expireTime;
 
 
-    public static Hook getInstance(HookType hookType, String app, String stream, String mediaServerId) {
+    public static Hook getInstance(HookType hookType, String app, String stream) {
         Hook hookSubscribe = new Hook();
         hookSubscribe.setApp(app);
         hookSubscribe.setStream(stream);
         hookSubscribe.setHookType(hookType);
-        hookSubscribe.setMediaServerId(mediaServerId);
         hookSubscribe.setExpireTime(System.currentTimeMillis() + 5 * 60 * 1000);
         return hookSubscribe;
     }
 
-    public HookType getHookType() {
-        return hookType;
-    }
-
-    public void setHookType(HookType hookType) {
-        this.hookType = hookType;
-    }
-
-    public String getApp() {
-        return app;
-    }
-
-    public void setApp(String app) {
-        this.app = app;
-    }
-
-    public String getStream() {
-        return stream;
-    }
-
-    public void setStream(String stream) {
-        this.stream = stream;
-    }
-
-
-    public Long getExpireTime() {
-        return expireTime;
-    }
-
-    public void setExpireTime(Long expireTime) {
-        this.expireTime = expireTime;
-    }
-
-    public String getMediaServerId() {
-        return mediaServerId;
-    }
-
-    public void setMediaServerId(String mediaServerId) {
-        this.mediaServerId = mediaServerId;
+    public static Hook getInstance(HookType hookType, String app, String stream, String mediaServer) {
+        // TODO 后续修改所有方法
+        return Hook.getInstance(hookType, app, stream);
     }
 
     @Override
@@ -74,8 +38,7 @@ public class Hook {
             Hook param = (Hook) obj;
             return param.getHookType().equals(this.hookType)
                     && param.getApp().equals(this.app)
-                    && param.getStream().equals(this.stream)
-                    && param.getMediaServerId().equals(this.mediaServerId);
+                    && param.getStream().equals(this.stream);
         }else {
             return false;
         }
@@ -83,6 +46,6 @@ public class Hook {
 
     @Override
     public String toString() {
-        return this.getHookType() + this.getApp() + this.getStream() + this.getMediaServerId();
+        return this.getHookType() + this.getApp() + this.getStream();
     }
 }

+ 3 - 1
src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribe.java

@@ -70,11 +70,13 @@ public class HookSubscribe {
     private final Map<String, Hook> allHook = new ConcurrentHashMap<>();
 
     private void sendNotify(HookType hookType, MediaEvent event) {
-        Hook paramHook = Hook.getInstance(hookType, event.getApp(), event.getStream(), event.getMediaServer().getId());
+        Hook paramHook = Hook.getInstance(hookType, event.getApp(), event.getStream());
         Event hookSubscribeEvent = allSubscribes.get(paramHook.toString());
         if (hookSubscribeEvent != null) {
             HookData data = HookData.getInstance(event);
             hookSubscribeEvent.response(data);
+        }else {
+
         }
     }
 

+ 2 - 8
src/main/java/com/genersoft/iot/vmp/media/event/media/MediaArrivalEvent.java

@@ -4,7 +4,6 @@ import com.genersoft.iot.vmp.media.bean.MediaInfo;
 import com.genersoft.iot.vmp.media.bean.MediaServer;
 import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
 import com.genersoft.iot.vmp.vmanager.bean.StreamContent;
-import lombok.Data;
 import lombok.Getter;
 import lombok.Setter;
 
@@ -19,15 +18,14 @@ public class MediaArrivalEvent extends MediaEvent {
         super(source);
     }
 
-    public static MediaArrivalEvent getInstance(Object source, OnStreamChangedHookParam hookParam, MediaServer mediaServer){
+    public static MediaArrivalEvent getInstance(Object source, OnStreamChangedHookParam hookParam, MediaServer mediaServer, String serverId){
         MediaArrivalEvent mediaArrivalEvent = new MediaArrivalEvent(source);
-        mediaArrivalEvent.setMediaInfo(MediaInfo.getInstance(hookParam, mediaServer));
+        mediaArrivalEvent.setMediaInfo(MediaInfo.getInstance(hookParam, mediaServer, serverId));
         mediaArrivalEvent.setApp(hookParam.getApp());
         mediaArrivalEvent.setStream(hookParam.getStream());
         mediaArrivalEvent.setMediaServer(mediaServer);
         mediaArrivalEvent.setSchema(hookParam.getSchema());
         mediaArrivalEvent.setSchema(hookParam.getSchema());
-        mediaArrivalEvent.setHookParam(hookParam);
         mediaArrivalEvent.setParamMap(hookParam.getParamMap());
         return mediaArrivalEvent;
     }
@@ -40,10 +38,6 @@ public class MediaArrivalEvent extends MediaEvent {
     @Setter
     private String callId;
 
-    @Getter
-    @Setter
-    private OnStreamChangedHookParam hookParam;
-
     @Getter
     @Setter
     private StreamContent streamInfo;

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java

@@ -156,7 +156,7 @@ public class ZLMHttpHookListener {
                 }else {
                     param.setParamMap(new HashMap<>());
                 }
-                MediaArrivalEvent mediaArrivalEvent = MediaArrivalEvent.getInstance(this, param, mediaServer);
+                MediaArrivalEvent mediaArrivalEvent = MediaArrivalEvent.getInstance(this, param, mediaServer, userSetting.getServerId());
                 applicationEventPublisher.publishEvent(mediaArrivalEvent);
             } else {
                 log.info("[ZLM HOOK] 流注销, {}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream());

+ 6 - 2
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java

@@ -5,6 +5,7 @@ import com.alibaba.fastjson2.JSONArray;
 import com.alibaba.fastjson2.JSONObject;
 import com.genersoft.iot.vmp.common.CommonCallback;
 import com.genersoft.iot.vmp.common.StreamInfo;
+import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.conf.exception.ControllerException;
 import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
 import com.genersoft.iot.vmp.media.bean.MediaInfo;
@@ -35,6 +36,9 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
     @Autowired
     private ZLMServerFactory zlmServerFactory;
 
+    @Autowired
+    private UserSetting userSetting;
+
     @Override
     public int createRTPServer(MediaServer mediaServer, String streamId, long ssrc, Integer port, Boolean onlyAuto, Boolean disableAudio, Boolean reUsePort, Integer tcpMode) {
         return zlmServerFactory.createRTPServer(mediaServer, streamId, ssrc, port, onlyAuto, reUsePort, tcpMode);
@@ -181,7 +185,7 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
                     return null;
                 }
                 JSONObject mediaJSON = data.getJSONObject(0);
-                MediaInfo mediaInfo = MediaInfo.getInstance(mediaJSON, mediaServer);
+                MediaInfo mediaInfo = MediaInfo.getInstance(mediaJSON, mediaServer, userSetting.getServerId());
                 StreamInfo streamInfo = getStreamInfoByAppAndStream(mediaServer, app, stream, mediaInfo, callId, true);
                 if (streamInfo != null) {
                     streamInfoList.add(streamInfo);
@@ -234,7 +238,7 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
         if (jsonObject.getInteger("code") != 0) {
             return null;
         }
-        return MediaInfo.getInstance(jsonObject, mediaServer);
+        return MediaInfo.getInstance(jsonObject, mediaServer, userSetting.getServerId());
     }
 
     @Override

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java

@@ -166,7 +166,7 @@ public class RedisRpcController {
     /**
      * 监听流上线
      */
-    public RedisRpcResponse onPushStreamOnlineEvent(RedisRpcRequest request) {
+    public RedisRpcResponse onStreamOnlineEvent(RedisRpcRequest request) {
         StreamInfo streamInfo = JSONObject.parseObject(request.getParam().toString(), StreamInfo.class);
         log.info("[redis-rpc] 监听流上线: {}/{}", streamInfo.getApp(), streamInfo.getStream());
         // 查询本级是否有这个流

+ 2 - 2
src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java

@@ -158,11 +158,11 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
 
         log.info("[请求所有WVP监听流上线] {}/{}", app, stream);
         // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者
-        Hook hook = Hook.getInstance(HookType.on_media_arrival, app, stream, null);
+        Hook hook = Hook.getInstance(HookType.on_media_arrival, app, stream);
         StreamInfo streamInfoParam = new StreamInfo();
         streamInfoParam.setApp(app);
         streamInfoParam.setStream(stream);
-        RedisRpcRequest request = buildRequest("onPushStreamOnlineEvent", streamInfoParam);
+        RedisRpcRequest request = buildRequest("onStreamOnlineEvent", streamInfoParam);
         hookSubscribe.addSubscribe(hook, (hookData) -> {
             if (callback != null) {
                 callback.run(mediaServerService.getStreamInfoByAppAndStream(hookData.getMediaServer(),

+ 2 - 4
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java

@@ -5,9 +5,7 @@ import com.genersoft.iot.vmp.common.SystemAllInfo;
 import com.genersoft.iot.vmp.gb28181.bean.*;
 import com.genersoft.iot.vmp.media.bean.MediaInfo;
 import com.genersoft.iot.vmp.media.bean.MediaServer;
-import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
-import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
 import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
 import com.genersoft.iot.vmp.storager.dao.dto.PlatformRegisterInfo;
@@ -214,9 +212,9 @@ public interface IRedisCatchStorage {
 
     void sendPlatformStopPlayMsg(SendRtpItem sendRtpItem, ParentPlatform platform);
 
-    void addPushListItem(String app, String stream, MediaArrivalEvent param);
+    void addPushListItem(String app, String stream, MediaInfo param);
 
-    MediaArrivalEvent getPushListItem(String app, String stream);
+    MediaInfo getPushListItem(String app, String stream);
 
     void removePushListItem(String app, String stream, String mediaServerId);
 

+ 6 - 8
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java

@@ -10,9 +10,7 @@ import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper;
 import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper;
 import com.genersoft.iot.vmp.media.bean.MediaInfo;
 import com.genersoft.iot.vmp.media.bean.MediaServer;
-import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
-import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
 import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -690,22 +688,22 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
     }
 
     @Override
-    public void addPushListItem(String app, String stream, MediaArrivalEvent event) {
+    public void addPushListItem(String app, String stream, MediaInfo mediaInfo) {
         String key = VideoManagerConstants.PUSH_STREAM_LIST + app + "_" + stream;
-        redisTemplate.opsForValue().set(key, event);
+        redisTemplate.opsForValue().set(key, mediaInfo);
     }
 
     @Override
-    public MediaArrivalEvent getPushListItem(String app, String stream) {
+    public MediaInfo getPushListItem(String app, String stream) {
         String key = VideoManagerConstants.PUSH_STREAM_LIST + app + "_" + stream;
-        return (MediaArrivalEvent)redisTemplate.opsForValue().get(key);
+        return (MediaInfo)redisTemplate.opsForValue().get(key);
     }
 
     @Override
     public void removePushListItem(String app, String stream, String mediaServerId) {
         String key = VideoManagerConstants.PUSH_STREAM_LIST + app + "_" + stream;
-        OnStreamChangedHookParam param = (OnStreamChangedHookParam)redisTemplate.opsForValue().get(key);
-        if (param != null && param.getMediaServerId().equalsIgnoreCase(mediaServerId)) {
+        MediaInfo param = (MediaInfo)redisTemplate.opsForValue().get(key);
+        if (param != null && param.getMediaServer().getId().equalsIgnoreCase(mediaServerId)) {
             redisTemplate.delete(key);
         }
     }

+ 22 - 4
src/main/java/com/genersoft/iot/vmp/streamPush/dao/StreamPushMapper.java

@@ -13,8 +13,8 @@ import java.util.Set;
 @Repository
 public interface StreamPushMapper {
 
-    @Insert("INSERT INTO wvp_stream_push (app, stream, media_server_id, server_id, push_time,  update_time, create_time, pushing) VALUES" +
-            "(#{app}, #{stream}, #{mediaServerId} , #{serverId} , #{pushTime} ,#{updateTime}, #{createTime}, #{pushing})")
+    @Insert("INSERT INTO wvp_stream_push (app, stream, media_server_id, server_id, push_time,  update_time, create_time, pushing, start_offline_push) VALUES" +
+            "(#{app}, #{stream}, #{mediaServerId} , #{serverId} , #{pushTime} ,#{updateTime}, #{createTime}, #{pushing}, #{startOfflinePush})")
     @Options(useGeneratedKeys = true, keyProperty = "id", keyColumn = "id")
     int add(StreamPush streamPushItem);
 
@@ -28,6 +28,7 @@ public interface StreamPushMapper {
             "<if test=\"serverId != null\">, server_id=#{serverId}</if>" +
             "<if test=\"pushTime != null\">, push_time=#{pushTime}</if>" +
             "<if test=\"pushing != null\">, pushing=#{pushing}</if>" +
+            "<if test=\"startOfflinePush != null\">, start_offline_push=#{startOfflinePush}</if>" +
             "WHERE id = #{id}"+
             " </script>"})
     int update(StreamPush streamPushItem);
@@ -61,9 +62,9 @@ public interface StreamPushMapper {
 
     @Insert("<script>"  +
             "Insert INTO wvp_stream_push ( " +
-            " app, stream, media_server_id, server_id, push_time,  update_time, create_time, pushing) " +
+            " app, stream, media_server_id, server_id, push_time,  update_time, create_time, pushing, start_offline_push) " +
             " VALUES <foreach collection='streamPushItems' item='item' index='index' separator=','>" +
-            " ( #{item.app}, #{item.stream}, #{item.mediaServerId},#{item.serverId} ,#{item.pushTime}, #{item.updateTime}, #{item.createTime}, #{item.pushing} )" +
+            " ( #{item.app}, #{item.stream}, #{item.mediaServerId},#{item.serverId} ,#{item.pushTime}, #{item.updateTime}, #{item.createTime}, #{item.pushing}, #{item.startOfflinePush} )" +
             " </foreach>" +
             " </script>")
     @Options(useGeneratedKeys = true, keyProperty = "id", keyColumn = "id")
@@ -136,4 +137,21 @@ public interface StreamPushMapper {
             ")</script>")
     void batchDel(List<StreamPush> streamPushList);
 
+
+    @Update({"<script>" +
+            "<foreach collection='streamPushItemForUpdate' item='item' separator=';'>" +
+            " UPDATE" +
+            " wvp_stream_push" +
+            " SET update_time=#{item.updateTime}" +
+            ", app=#{item.app}" +
+            ", stream=#{item.stream}" +
+            ", media_server_id=#{item.mediaServerId}" +
+            ", server_id=#{item.serverId}" +
+            ", push_time=#{item.pushTime}" +
+            ", pushing=#{item.pushing}" +
+            ", start_offline_push=#{item.startOfflinePush}" +
+            " WHERE id=#{item.item.id}" +
+            "</foreach>" +
+            "</script>"})
+    int batchUpdate(List<StreamPush> streamPushItemForUpdate);
 }

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushService.java

@@ -90,7 +90,7 @@ public interface IStreamPushService {
 
     void deleteByAppAndStream(String app, String stream);
 
-    void updatePushStatus(Integer streamPushId, boolean pushIng);
+    void updatePushStatus(StreamPush streamPush, boolean pushIng);
 
     void batchUpdate(List<StreamPush> streamPushItemForUpdate);
 

+ 5 - 5
src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushPlayServiceImpl.java

@@ -4,7 +4,7 @@ import com.baomidou.dynamic.datasource.annotation.DS;
 import com.genersoft.iot.vmp.common.StreamInfo;
 import com.genersoft.iot.vmp.conf.DynamicTask;
 import com.genersoft.iot.vmp.conf.UserSetting;
-import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
+import com.genersoft.iot.vmp.media.bean.MediaInfo;
 import com.genersoft.iot.vmp.media.service.IMediaServerService;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
 import com.genersoft.iot.vmp.service.bean.ErrorCallback;
@@ -53,14 +53,14 @@ public class StreamPushPlayServiceImpl implements IStreamPushPlayService {
     public void start(Integer id, ErrorCallback<StreamInfo> callback, String platformDeviceId, String platformName ) {
         StreamPush streamPush = streamPushMapper.queryOne(id);
         Assert.notNull(streamPush, "推流信息未找到");
-        MediaArrivalEvent pushListItem = redisCatchStorage.getPushListItem(streamPush.getApp(), streamPush.getStream());
-        if (pushListItem != null) {
+        MediaInfo mediaInfo = redisCatchStorage.getPushListItem(streamPush.getApp(), streamPush.getStream());
+        if (mediaInfo != null) {
             String callId = null;
             StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(streamPush.getApp(), streamPush.getStream());
             if (streamAuthorityInfo != null) {
                 callId = streamAuthorityInfo.getCallId();
             }
-            callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), mediaServerService.getStreamInfoByAppAndStream(pushListItem.getMediaServer(),
+            callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), mediaServerService.getStreamInfoByAppAndStream(mediaInfo.getMediaServer(),
                     streamPush.getApp(), streamPush.getStream(), null, callId));
             return;
         }
@@ -83,7 +83,7 @@ public class StreamPushPlayServiceImpl implements IStreamPushPlayService {
         long key = redisRpcService.onStreamOnlineEvent(streamPush.getApp(), streamPush.getStream(), (streamInfo) -> {
             dynamicTask.stop(timeOutTaskKey);
             if (streamInfo == null) {
-                log.warn("[级联点播] 等待推流得到结果未空: {}/{}", streamPush.getApp(), streamPush.getStream());
+                log.warn("等待推流得到结果未空: {}/{}", streamPush.getApp(), streamPush.getStream());
                 callback.run(ErrorCode.ERROR100.getCode(), "fail", null);
             }else {
                 callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);

+ 31 - 31
src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java

@@ -24,7 +24,6 @@ import com.genersoft.iot.vmp.streamPush.service.IStreamPushService;
 import com.genersoft.iot.vmp.utils.DateUtil;
 import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
 import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
-import com.genersoft.iot.vmp.vmanager.bean.StreamContent;
 import com.github.pagehelper.PageHelper;
 import com.github.pagehelper.PageInfo;
 import lombok.extern.slf4j.Slf4j;
@@ -99,14 +98,11 @@ public class StreamPushServiceImpl implements IStreamPushService {
             streamPush.setPushTime(DateUtil.getNow());
             add(streamPush);
         }else {
-            updatePushStatus(streamPushInDb.getId(), true);
+            updatePushStatus(streamPushInDb, true);
         }
         // 冗余数据,自己系统中自用
         if (!"broadcast".equals(event.getApp()) && !"talk".equals(event.getApp())) {
-            StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(
-                    event.getMediaServer(), event.getApp(), event.getStream(), event.getMediaInfo(), event.getCallId());
-            event.getHookParam().setStreamInfo(new StreamContent(streamInfo));
-            redisCatchStorage.addPushListItem(event.getApp(), event.getStream(), event);
+            redisCatchStorage.addPushListItem(event.getApp(), event.getStream(), event.getMediaInfo());
         }
 
         // 发送流变化redis消息
@@ -148,18 +144,12 @@ public class StreamPushServiceImpl implements IStreamPushService {
                 redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
             }
         }
-        StreamPush push = getPush(event.getApp(), event.getStream());
-        if (push == null) {
+        StreamPush streamPush = getPush(event.getApp(), event.getStream());
+        if (streamPush == null) {
             return;
         }
-        push.setPushing(false);
-        if (push.getGbDeviceId() != null) {
-            if (userSetting.isUsePushingAsStatus()) {
-                push.setGbStatus("OFF");
-                updateStatus(push);
-//                streamPushMapper.updatePushStatus(event.getApp(), event.getStream(), false);
-//                eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF);
-            }
+        if (streamPush.getGbDeviceId() != null) {
+            updatePushStatus(streamPush, false);
         }else {
             deleteByAppAndStream(event.getApp(), event.getStream());
         }
@@ -524,27 +514,28 @@ public class StreamPushServiceImpl implements IStreamPushService {
 
     @Override
     public void updateStatus(StreamPush push) {
-        if (ObjectUtils.isEmpty(push.getGbDeviceId())) {
-            return;
-        }
-        if ("ON".equalsIgnoreCase(push.getGbStatus())) {
-            gbChannelService.online(push.buildCommonGBChannel());
-        }else {
-            gbChannelService.offline(push.buildCommonGBChannel());
-        }
+
     }
 
 
 
     @Override
-    public void updatePushStatus(Integer streamPushId, boolean pushIng) {
-        StreamPush streamPushInDb = streamPushMapper.queryOne(streamPushId);
-        streamPushInDb.setPushing(pushIng);
+    @Transactional
+    public void updatePushStatus(StreamPush streamPush, boolean pushIng) {
+        streamPush.setPushing(pushIng);
         if (userSetting.isUsePushingAsStatus()) {
-            streamPushInDb.setGbStatus(pushIng?"ON":"OFF");
+            streamPush.setGbStatus(pushIng?"ON":"OFF");
+        }
+        streamPush.setPushTime(DateUtil.getNow());
+        streamPushMapper.updatePushStatus(streamPush.getId(), pushIng);
+        if (ObjectUtils.isEmpty(streamPush.getGbDeviceId())) {
+            return;
+        }
+        if ("ON".equalsIgnoreCase(streamPush.getGbStatus())) {
+            gbChannelService.online(streamPush.buildCommonGBChannel());
+        }else {
+            gbChannelService.offline(streamPush.buildCommonGBChannel());
         }
-        streamPushInDb.setPushTime(DateUtil.getNow());
-        updateStatus(streamPushInDb);
     }
 
     private List<StreamPush> handleJSON(List<StreamInfo> streamInfoList) {
@@ -570,7 +561,16 @@ public class StreamPushServiceImpl implements IStreamPushService {
 
     @Override
     public void batchUpdate(List<StreamPush> streamPushItemForUpdate) {
-
+        int result = streamPushMapper.batchUpdate(streamPushItemForUpdate);
+        if (result > 0) {
+            List<CommonGBChannel> commonGBChannels = new ArrayList<>();
+            for (StreamPush streamPush : streamPushItemForUpdate) {
+                if (!ObjectUtils.isEmpty(streamPush.getGbDeviceId())) {
+                    commonGBChannels.add(streamPush.buildCommonGBChannel());
+                }
+            }
+            gbChannelService.batchUpdate(commonGBChannels);
+        }
     }
 
     @Override

+ 1 - 1
web_src/src/components/StreamPushEdit.vue

@@ -26,7 +26,7 @@
         <el-divider content-position="center">策略</el-divider>
         <el-form ref="streamPushForm" status-icon label-width="160px" v-loading="locading">
           <el-form-item style="text-align: left">
-            <el-checkbox v-model="streamPush.autoPushChannel">拉起离线推流</el-checkbox>
+            <el-checkbox v-model="streamPush.startOfflinePush">拉起离线推流</el-checkbox>
           </el-form-item>
 
         </el-form>

+ 2 - 2
web_src/src/components/StreamPushList.vue

@@ -52,8 +52,8 @@
         </el-table-column>
         <el-table-column label="推流状态"  min-width="100">
           <template slot-scope="scope">
-            <el-tag size="medium" v-if="scope.row.pushIng">推流中</el-tag>
-            <el-tag size="medium" type="info" v-if="!scope.row.pushIng">已停止</el-tag>
+            <el-tag size="medium" v-if="scope.row.pushing">推流中</el-tag>
+            <el-tag size="medium" type="info" v-if="!scope.row.pushing">已停止</el-tag>
           </template>
         </el-table-column>
         <el-table-column prop="gbDeviceId" label="国标编码" min-width="200" >

+ 1 - 1
数据库/2.7.2-重构/初始化-mysql-2.7.2.sql

@@ -327,7 +327,7 @@ create table wvp_stream_push
     update_time       character varying(50),
     pushing           bool default false,
     self              bool default false,
-    auto_push_channel bool default true,
+    start_offline_push bool default true,
     constraint uk_stream_push_app_stream unique (app, stream)
 );
 create table wvp_cloud_record