Browse Source

临时提交

648540858 1 year ago
parent
commit
f32eb14125

+ 3 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/bean/CommonGBChannel.java

@@ -7,6 +7,9 @@ import lombok.Data;
 @Schema(description = "国标通道")
 public class CommonGBChannel {
 
+    @Schema(description = "国标-数据库自增ID")
+    private int gbId;
+
     @Schema(description = "国标-编码")
     private String gbDeviceId;
 

+ 25 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java

@@ -0,0 +1,25 @@
+package com.genersoft.iot.vmp.gb28181.service;
+
+import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
+
+import java.util.List;
+
+public interface IGbChannelService {
+
+    CommonGBChannel queryByDeviceId(String gbDeviceId);
+
+    int add(CommonGBChannel commonGBChannel);
+
+    int delete(int gbId);
+
+    int update(CommonGBChannel commonGBChannel);
+
+    int offline(CommonGBChannel commonGBChannel);
+
+    int online(CommonGBChannel commonGBChannel);
+
+    void closeSend(CommonGBChannel commonGBChannel);
+
+    void batchAdd(List<CommonGBChannel> commonGBChannels);
+
+}

+ 10 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java

@@ -0,0 +1,10 @@
+package com.genersoft.iot.vmp.gb28181.service.impl;
+
+import com.genersoft.iot.vmp.gb28181.service.IGbChannelService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+@Slf4j
+@Service
+public class GbChannelServiceImpl implements IGbChannelService {
+}

+ 0 - 6
src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerStatusEventListener.java

@@ -2,7 +2,6 @@ package com.genersoft.iot.vmp.media.event.mediaServer;
 
 import com.genersoft.iot.vmp.service.IPlayService;
 import com.genersoft.iot.vmp.service.IStreamProxyService;
-import com.genersoft.iot.vmp.service.IStreamPushService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -23,9 +22,6 @@ public class MediaServerStatusEventListener {
 	
 	private final static Logger logger = LoggerFactory.getLogger(MediaServerStatusEventListener.class);
 
-	@Autowired
-	private IStreamPushService streamPushService;
-
 	@Autowired
 	private IStreamProxyService streamProxyService;
 
@@ -36,7 +32,6 @@ public class MediaServerStatusEventListener {
 	@EventListener
 	public void onApplicationEvent(MediaServerOnlineEvent event) {
 		logger.info("[媒体节点] 上线 ID:" + event.getMediaServerId());
-		streamPushService.zlmServerOnline(event.getMediaServerId());
 		streamProxyService.zlmServerOnline(event.getMediaServerId());
 		playService.zlmServerOnline(event.getMediaServerId());
 	}
@@ -48,7 +43,6 @@ public class MediaServerStatusEventListener {
 		logger.info("[媒体节点] 离线,ID:" + event.getMediaServerId());
 		// 处理ZLM离线
 		streamProxyService.zlmServerOffline(event.getMediaServerId());
-		streamPushService.zlmServerOffline(event.getMediaServerId());
 		playService.zlmServerOffline(event.getMediaServerId());
 	}
 }

+ 11 - 0
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPush.java

@@ -8,6 +8,7 @@ import io.swagger.v3.oas.annotations.media.Schema;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import org.jetbrains.annotations.NotNull;
+import org.springframework.util.ObjectUtils;
 
 
 @Data
@@ -102,6 +103,16 @@ public class StreamPush extends CommonGBChannel implements Comparable<StreamPush
         return streamPushItem;
     }
 
+    public CommonGBChannel getCommonGBChannel() {
+        if (ObjectUtils.isEmpty(this.getGbDeviceId())) {
+            return null;
+        }
+        if (ObjectUtils.isEmpty(this.getGbName())) {
+            this.setGbName( app+ "-" +stream);
+        }
+        return this;
+    }
+
 
 }
 

+ 7 - 1
src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java

@@ -40,12 +40,14 @@ public interface IStreamPushService {
 
     StreamPush getPush(String app, String streamId);
 
+    boolean stop(StreamPush streamPush);
+
     /**
      * 停止一路推流
      * @param app 应用名
      * @param stream 流ID
      */
-    boolean stop(String app, String stream);
+    boolean stopByAppAndStream(String app, String stream);
 
     /**
      * 新的节点加入
@@ -122,4 +124,8 @@ public interface IStreamPushService {
     Map<String, StreamPush> getAllGBId();
 
     void updateStatus(StreamPush push);
+
+    void deleteByAppAndStream(String app, String stream);
+
+    void updatePushStatus(Integer streamPushId, boolean pushIng);
 }

+ 210 - 170
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java

@@ -5,19 +5,22 @@ import com.baomidou.dynamic.datasource.annotation.DS;
 import com.genersoft.iot.vmp.common.StreamInfo;
 import com.genersoft.iot.vmp.conf.MediaConfig;
 import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
 import com.genersoft.iot.vmp.gb28181.bean.GbStream;
 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
 import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog;
 import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
 import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
+import com.genersoft.iot.vmp.gb28181.service.IGbChannelService;
 import com.genersoft.iot.vmp.media.bean.MediaInfo;
+import com.genersoft.iot.vmp.media.bean.MediaServer;
 import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
 import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
+import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOfflineEvent;
+import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOnlineEvent;
 import com.genersoft.iot.vmp.media.service.IMediaServerService;
-import com.genersoft.iot.vmp.media.bean.MediaServer;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamPush;
-import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
 import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType;
 import com.genersoft.iot.vmp.service.IStreamPushService;
 import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis;
@@ -28,6 +31,7 @@ import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
 import com.genersoft.iot.vmp.vmanager.bean.StreamContent;
 import com.github.pagehelper.PageHelper;
 import com.github.pagehelper.PageInfo;
+import lombok.extern.slf4j.Slf4j;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -36,18 +40,17 @@ import org.springframework.jdbc.datasource.DataSourceTransactionManager;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.TransactionDefinition;
-import org.springframework.transaction.TransactionStatus;
+import org.springframework.transaction.annotation.Transactional;
 import org.springframework.util.ObjectUtils;
 
 import java.util.*;
 import java.util.stream.Collectors;
 
 @Service
+@Slf4j
 @DS("master")
 public class StreamPushServiceImpl implements IStreamPushService {
 
-    private final static Logger logger = LoggerFactory.getLogger(StreamPushServiceImpl.class);
-
     @Autowired
     private StreamPushMapper streamPushMapper;
 
@@ -84,6 +87,9 @@ public class StreamPushServiceImpl implements IStreamPushService {
     @Autowired
     private MediaConfig mediaConfig;
 
+    @Autowired
+    private IGbChannelService gbChannelService;
+
     /**
      * 流到来的处理
      */
@@ -107,17 +113,17 @@ public class StreamPushServiceImpl implements IStreamPushService {
             streamAuthorityInfo.setOriginType(mediaInfo.getOriginType());
         }
         redisCatchStorage.updateStreamAuthorityInfo(event.getApp(), event.getStream(), streamAuthorityInfo);
-        StreamPush transform = StreamPush.getInstance(event, userSetting.getServerId());
-        transform.setPushIng(true);
-        transform.setUpdateTime(DateUtil.getNow());
-        transform.setPushTime(DateUtil.getNow());
-        transform.setSelf(true);
-        StreamPush pushInDb = getPush(event.getApp(), event.getStream());
-        if (pushInDb == null) {
-            transform.setCreateTime(DateUtil.getNow());
-            add(transform);
+
+        StreamPush streamPushInDb = getPush(event.getApp(), event.getStream());
+        if (streamPushInDb == null) {
+            StreamPush streamPush = StreamPush.getInstance(event, userSetting.getServerId());
+            streamPush.setPushIng(true);
+            streamPush.setUpdateTime(DateUtil.getNow());
+            streamPush.setPushTime(DateUtil.getNow());
+            streamPush.setSelf(true);
+            add(streamPush);
         }else {
-            update(transform);
+            updatePushStatus(streamPushInDb.getId(), true);
         }
         // 冗余数据,自己系统中自用
         if (!"broadcast".equals(event.getApp()) && !"talk".equals(event.getApp())) {
@@ -143,6 +149,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
     @Async("taskExecutor")
     @EventListener
     public void onApplicationEvent(MediaDepartureEvent event) {
+
         // 兼容流注销时类型从redis记录获取
         MediaInfo mediaInfo = redisCatchStorage.getStreamInfo(
                 event.getApp(), event.getStream(), event.getMediaServer().getId());
@@ -178,37 +185,22 @@ public class StreamPushServiceImpl implements IStreamPushService {
         }
     }
 
-
-    private List<StreamPush> handleJSON(List<StreamInfo> streamInfoList) {
-        if (streamInfoList == null || streamInfoList.isEmpty()) {
-            return null;
-        }
-        Map<String, StreamPush> result = new HashMap<>();
-        for (StreamInfo streamInfo : streamInfoList) {
-            // 不保存国标推理以及拉流代理的流
-            if (streamInfo.getOriginType() == OriginType.RTSP_PUSH.ordinal()
-                    || streamInfo.getOriginType() == OriginType.RTMP_PUSH.ordinal()
-                    || streamInfo.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {
-                String key = streamInfo.getApp() + "_" + streamInfo.getStream();
-                StreamPush streamPushItem = result.get(key);
-                if (streamPushItem == null) {
-                    streamPushItem = streamPushItem.getInstance(streamInfo);
-                    result.put(key, streamPushItem);
-                }
-            }
-        }
-        return new ArrayList<>(result.values());
+    /**
+     * 流媒体节点上线
+     */
+    @Async("taskExecutor")
+    @EventListener
+    public void onApplicationEvent(MediaServerOnlineEvent event) {
+        zlmServerOnline(event.getMediaServerId());
     }
 
-    @Override
-    public StreamPush transform(OnStreamChangedHookParam item) {
-        StreamPush streamPushItem = new StreamPush();
-        streamPushItem.setApp(item.getApp());
-        streamPushItem.setMediaServerId(item.getMediaServerId());
-        streamPushItem.setStream(item.getStream());
-        streamPushItem.setCreateTime(DateUtil.getNow());
-        streamPushItem.setServerId(item.getSeverId());
-        return streamPushItem;
+    /**
+     * 流媒体节点离线
+     */
+    @Async("taskExecutor")
+    @EventListener
+    public void onApplicationEvent(MediaServerOfflineEvent event) {
+        zlmServerOffline(event.getMediaServerId());
     }
 
     @Override
@@ -225,29 +217,113 @@ public class StreamPushServiceImpl implements IStreamPushService {
 
 
     @Override
-    public StreamPush getPush(String app, String streamId) {
-        return streamPushMapper.selectOne(app, streamId);
+    public StreamPush getPush(String app, String stream) {
+        return streamPushMapper.selectByAppAndStream(app, stream);
     }
 
     @Override
-    public boolean stop(String app, String stream) {
-        logger.info("[推流] 停止推流: {}/{}", app, stream);
-        StreamPush streamPushItem = streamPushMapper.selectOne(app, stream);
-        if (streamPushItem != null) {
-            gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL);
+    @Transactional
+    public boolean add(StreamPush stream) {
+        log.info("[添加推流] app: {}, stream: {}, 国标编号: {}", stream.getApp(), stream.getStream(), stream.getGbDeviceId());
+        stream.setUpdateTime(DateUtil.getNow());
+        stream.setCreateTime(DateUtil.getNow());
+        int addResult = streamPushMapper.add(stream);
+        if (addResult <= 0) {
+            return false;
+        }
+        if (ObjectUtils.isEmpty(stream.getGbDeviceId())) {
+            return true;
         }
+        CommonGBChannel channel = gbChannelService.queryByDeviceId(stream.getGbDeviceId());
+        if (channel != null) {
+            log.info("[添加推流]失败,国标编号已存在: {} app: {}, stream: {}, ", stream.getGbDeviceId(), stream.getApp(), stream.getStream());
+        }
+        int addChannelResult = gbChannelService.add(stream.getCommonGBChannel());
+        return addChannelResult > 0;
+    }
 
-        platformGbStreamMapper.delByAppAndStream(app, stream);
-        gbStreamMapper.del(app, stream);
-        int delStream = streamPushMapper.del(app, stream);
-        if (delStream > 0) {
-            MediaServer mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId());
-            mediaServerService.closeStreams(mediaServerItem,app, stream);
+    @Override
+    @Transactional
+    public void deleteByAppAndStream(String app, String stream) {
+        log.info("[删除推流] app: {}, stream: {}, ", app, stream);
+        StreamPush streamPush = streamPushMapper.selectByAppAndStream(app, stream);
+        if (streamPush == null) {
+            log.info("[删除推流]失败, 不存在 app: {}, stream: {}, ", app, stream);
+            return;
+        }
+        if (streamPush.isPushIng()) {
+            stop(streamPush);
+        }
+        if (streamPush.getGbId() > 0) {
+            gbChannelService.delete(streamPush.getGbId());
+        }
+    }
+    @Override
+    @Transactional
+    public boolean update(StreamPush streamPush) {
+        log.info("[更新推流]:id: {}, app: {}, stream: {}, ", streamPush.getId(), streamPush.getApp(), streamPush.getStream());
+        assert streamPush.getId() != null;
+        streamPush.setUpdateTime(DateUtil.getNow());
+        streamPushMapper.update(streamPush);
+        if (streamPush.getGbId() > 0) {
+            gbChannelService.update(streamPush.getCommonGBChannel());
+        }
+        return true;
+    }
+
+
+    @Override
+    @Transactional
+    public boolean stop(StreamPush streamPush) {
+        log.info("[主动停止推流] id: {}, app: {}, stream: {}, ", streamPush.getId(), streamPush.getApp(), streamPush.getStream());
+        MediaServer mediaServer = null;
+        if (streamPush.getMediaServerId() == null) {
+            log.info("[主动停止推流]未找到使用MediaServer,开始自动检索 id: {}, app: {}, stream: {}, ", streamPush.getId(), streamPush.getApp(), streamPush.getStream());
+            mediaServer = mediaServerService.getMediaServerByAppAndStream(streamPush.getApp(), streamPush.getStream());
+            if (mediaServer != null) {
+                log.info("[主动停止推流] 检索到MediaServer为{}, id: {}, app: {}, stream: {}, ", mediaServer.getId(), streamPush.getId(), streamPush.getApp(), streamPush.getStream());
+            }else {
+                log.info("[主动停止推流]未找到使用MediaServer id: {}, app: {}, stream: {}, ", streamPush.getId(), streamPush.getApp(), streamPush.getStream());
+            }
+        }else {
+            mediaServer = mediaServerService.getOne(streamPush.getMediaServerId());
+            if (mediaServer == null) {
+                log.info("[主动停止推流]未找到使用的MediaServer: {},开始自动检索 id: {}, app: {}, stream: {}, ",streamPush.getMediaServerId(),  streamPush.getId(), streamPush.getApp(), streamPush.getStream());
+                mediaServer = mediaServerService.getMediaServerByAppAndStream(streamPush.getApp(), streamPush.getStream());
+                if (mediaServer != null) {
+                    log.info("[主动停止推流] 检索到MediaServer为{}, id: {}, app: {}, stream: {}, ", mediaServer.getId(), streamPush.getId(), streamPush.getApp(), streamPush.getStream());
+                }else {
+                    log.info("[主动停止推流]未找到使用MediaServer id: {}, app: {}, stream: {}, ", streamPush.getId(), streamPush.getApp(), streamPush.getStream());
+                }
+            }
+        }
+        if (mediaServer != null) {
+            mediaServerService.closeStreams(mediaServer, streamPush.getApp(), streamPush.getStream());
+        }
+        streamPush.setPushIng(false);
+        if (userSetting.isUsePushingAsStatus()) {
+            streamPush.setGbStatus(false);
+            gbChannelService.offline(streamPush.getCommonGBChannel());
+        }
+        gbChannelService.closeSend(streamPush.getCommonGBChannel());
+        streamPush.setUpdateTime(DateUtil.getNow());
+        streamPushMapper.update(streamPush);
+        return true;
+    }
+
+    @Override
+    @Transactional
+    public boolean stopByAppAndStream(String app, String stream) {
+        log.info("[主动停止推流] : app: {}, stream: {}, ", app, stream);
+        StreamPush streamPushItem = streamPushMapper.selectByAppAndStream(app, stream);
+        if (streamPushItem != null) {
+            stop(streamPushItem);
         }
         return true;
     }
 
     @Override
+    @Transactional
     public void zlmServerOnline(String mediaServerId) {
         // 同步zlm推流信息
         MediaServer mediaServerItem = mediaServerService.getOne(mediaServerId);
@@ -260,14 +336,14 @@ public class StreamPushServiceImpl implements IStreamPushService {
         // redis记录
         List<MediaInfo> mediaInfoList = redisCatchStorage.getStreams(mediaServerId, "PUSH");
         Map<String, MediaInfo> streamInfoPushItemMap = new HashMap<>();
-        if (pushList.size() > 0) {
+        if (!pushList.isEmpty()) {
             for (StreamPush streamPushItem : pushList) {
                 if (ObjectUtils.isEmpty(streamPushItem.getGbId())) {
                     pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem);
                 }
             }
         }
-        if (mediaInfoList.size() > 0) {
+        if (!mediaInfoList.isEmpty()) {
             for (MediaInfo mediaInfo : mediaInfoList) {
                 streamInfoPushItemMap.put(mediaInfo.getApp() + mediaInfo.getStream(), mediaInfo);
             }
@@ -290,26 +366,33 @@ public class StreamPushServiceImpl implements IStreamPushService {
                 streamAuthorityInfoInfoMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
             }
         }
-        List<StreamPush> offlinePushItems = new ArrayList<>(pushItemMap.values());
-        if (offlinePushItems.size() > 0) {
-            String type = "PUSH";
-            int runLimit = 300;
-            if (offlinePushItems.size() > runLimit) {
-                for (int i = 0; i < offlinePushItems.size(); i += runLimit) {
-                    int toIndex = i + runLimit;
-                    if (i + runLimit > offlinePushItems.size()) {
-                        toIndex = offlinePushItems.size();
-                    }
-                    List<StreamPush> streamPushItemsSub = offlinePushItems.subList(i, toIndex);
-                    streamPushMapper.delAll(streamPushItemsSub);
-                }
-            }else {
-                streamPushMapper.delAll(offlinePushItems);
+        List<StreamPush> changedStreamPushList = new ArrayList<>(pushItemMap.values());
+        if (!changedStreamPushList.isEmpty()) {
+            for (StreamPush streamPush : changedStreamPushList) {
+                stop(streamPush);
             }
-
         }
+
+
+//        if (!changedStreamPushList.isEmpty()) {
+//            String type = "PUSH";
+//            int runLimit = 300;
+//            if (changedStreamPushList.size() > runLimit) {
+//                for (int i = 0; i < changedStreamPushList.size(); i += runLimit) {
+//                    int toIndex = i + runLimit;
+//                    if (i + runLimit > changedStreamPushList.size()) {
+//                        toIndex = changedStreamPushList.size();
+//                    }
+//                    List<StreamPush> streamPushItemsSub = changedStreamPushList.subList(i, toIndex);
+//                    streamPushMapper.delAll(streamPushItemsSub);
+//                }
+//            }else {
+//                streamPushMapper.delAll(changedStreamPushList);
+//            }
+//
+//        }
         Collection<MediaInfo> mediaInfos = streamInfoPushItemMap.values();
-        if (mediaInfos.size() > 0) {
+        if (!mediaInfos.isEmpty()) {
             String type = "PUSH";
             for (MediaInfo mediaInfo : mediaInfos) {
                 JSONObject jsonObject = new JSONObject();
@@ -327,7 +410,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
         }
 
         Collection<StreamAuthorityInfo> streamAuthorityInfos = streamAuthorityInfoInfoMap.values();
-        if (streamAuthorityInfos.size() > 0) {
+        if (!streamAuthorityInfos.isEmpty()) {
             for (StreamAuthorityInfo streamAuthorityInfo : streamAuthorityInfos) {
                 // 移除redis内流的信息
                 redisCatchStorage.removeStreamAuthorityInfo(streamAuthorityInfo.getApp(), streamAuthorityInfo.getStream());
@@ -336,19 +419,24 @@ public class StreamPushServiceImpl implements IStreamPushService {
     }
 
     @Override
+    @Transactional
     public void zlmServerOffline(String mediaServerId) {
-        List<StreamPush> streamPushItems = streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId);
-        // 移除没有GBId的推流
-        streamPushMapper.deleteWithoutGBId(mediaServerId);
-        gbStreamMapper.deleteWithoutGBId("push", mediaServerId);
-        // 其他的流设置未启用
-        streamPushMapper.updateStatusByMediaServerId(mediaServerId, false);
-        streamProxyMapper.updateStatusByMediaServerId(mediaServerId, false);
+        List<StreamPush> streamPushItems = streamPushMapper.selectAllByMediaServerId(mediaServerId);
+        if (!streamPushItems.isEmpty()) {
+            for (StreamPush streamPushItem : streamPushItems) {
+                stop(streamPushItem);
+            }
+        }
+//        // 移除没有GBId的推流
+//        streamPushMapper.deleteWithoutGBId(mediaServerId);
+//        // 其他的流设置未启用
+//        streamPushMapper.updateStatusByMediaServerId(mediaServerId, false);
+//        streamProxyMapper.updateStatusByMediaServerId(mediaServerId, false);
         // 发送流停止消息
         String type = "PUSH";
         // 发送redis消息
         List<MediaInfo> mediaInfoList = redisCatchStorage.getStreams(mediaServerId, type);
-        if (mediaInfoList.size() > 0) {
+        if (!mediaInfoList.isEmpty()) {
             for (MediaInfo mediaInfo : mediaInfoList) {
                 // 移除redis内流的信息
                 redisCatchStorage.removeStream(mediaServerId, type, mediaInfo.getApp(), mediaInfo.getStream());
@@ -367,41 +455,16 @@ public class StreamPushServiceImpl implements IStreamPushService {
     }
 
     @Override
-    public void clean() {
-
-    }
-
-    @Override
-    public boolean saveToRandomGB() {
-        List<StreamPush> streamPushItems = streamPushMapper.selectAll();
-        long gbId = 100001;
-        for (StreamPush streamPushItem : streamPushItems) {
-            streamPushItem.setStreamType("push");
-            streamPushItem.setStatus(true);
-            streamPushItem.setGbId("34020000004111" + gbId);
-            streamPushItem.setCreateTime(DateUtil.getNow());
-            gbId ++;
-        }
-        int  limitCount = 30;
-
-        if (streamPushItems.size() > limitCount) {
-            for (int i = 0; i < streamPushItems.size(); i += limitCount) {
-                int toIndex = i + limitCount;
-                if (i + limitCount > streamPushItems.size()) {
-                    toIndex = streamPushItems.size();
-                }
-                gbStreamMapper.batchAdd(streamPushItems.subList(i, toIndex));
-            }
-        }else {
-            gbStreamMapper.batchAdd(streamPushItems);
-        }
-        return true;
-    }
-
-    @Override
+    @Transactional
     public void batchAdd(List<StreamPush> streamPushItems) {
         streamPushMapper.addAll(streamPushItems);
-        gbStreamMapper.batchAdd(streamPushItems);
+        List<CommonGBChannel> commonGBChannels = new ArrayList<>();
+        for (StreamPush streamPush : streamPushItems) {
+            if (ObjectUtils.isEmpty(streamPush.getGbDeviceId())) {
+                commonGBChannels.add(streamPush.getCommonGBChannel());
+            }
+        }
+        gbChannelService.batchAdd(commonGBChannels);
     }
 
 
@@ -459,7 +522,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
                             // 不存在这个平台,则忽略导入此关联关系
                             if (platformInfoMap.get(platFormInfoArray[0]) == null
                                     || platformInfoMap.get(platFormInfoArray[0]).get(platFormInfoArray[1]) == null) {
-                                logger.info("导入数据时不存在平台或目录{}/{},已导入未分配", platFormInfoArray[0], platFormInfoArray[1] );
+                                log.info("导入数据时不存在平台或目录{}/{},已导入未分配", platFormInfoArray[0], platFormInfoArray[1] );
                                 continue;
                             }
                             streamPushItemForPlatform.setPlatformId(platFormInfoArray[0]);
@@ -544,38 +607,6 @@ public class StreamPushServiceImpl implements IStreamPushService {
         eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.ON);
     }
 
-    @Override
-    public boolean add(StreamPush stream) {
-        stream.setUpdateTime(DateUtil.getNow());
-        stream.setCreateTime(DateUtil.getNow());
-        stream.setServerId(userSetting.getServerId());
-        stream.setMediaServerId(mediaConfig.getId());
-        stream.setSelf(true);
-        stream.setPushIng(true);
-
-        // 放在事务内执行
-        boolean result = false;
-        TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
-        try {
-            int addStreamResult = streamPushMapper.add(stream);
-            if (!ObjectUtils.isEmpty(stream.getGbId())) {
-                stream.setStreamType("push");
-                gbStreamMapper.add(stream);
-            }
-            dataSourceTransactionManager.commit(transactionStatus);
-            result = true;
-        }catch (Exception e) {
-            logger.error("批量移除流与平台的关系时错误", e);
-            dataSourceTransactionManager.rollback(transactionStatus);
-        }
-        return result;
-    }
-
-    @Override
-    public boolean update(StreamPush stream) {
-
-    }
-
     @Override
     public List<String> getAllAppAndStream() {
 
@@ -600,32 +631,41 @@ public class StreamPushServiceImpl implements IStreamPushService {
         return streamPushMapper.getAllGBId();
     }
 
-    @Override
-    public void updatePush(OnStreamChangedHookParam param) {
-        StreamPush transform = transform(param);
-        StreamPush pushInDb = getPush(param.getApp(), param.getStream());
-        transform.setPushIng(param.isRegist());
-        transform.setUpdateTime(DateUtil.getNow());
-        transform.setPushTime(DateUtil.getNow());
-        transform.setSelf(userSetting.getServerId().equals(param.getSeverId()));
-        if (pushInDb == null) {
-            transform.setCreateTime(DateUtil.getNow());
-            streamPushMapper.add(transform);
-        }else {
-            streamPushMapper.update(transform);
-            gbStreamMapper.updateMediaServer(param.getApp(), param.getStream(), param.getMediaServerId());
-        }
-    }
-
     @Override
     public void updateStatus(StreamPush push) {
 
     }
 
-    @Override
-    public void deleteByAppAndStream(String app, String stream) {
 
+
+    @Override
+    public void updatePushStatus(Integer streamPushId, boolean pushIng) {
+        streamPushInDb.setPushIng(true);
+        if (userSetting.isUsePushingAsStatus()) {
+            streamPushInDb.setGbStatus(true);
+        }
+        streamPushInDb.setPushTime(DateUtil.getNow());
     }
 
+    private List<StreamPush> handleJSON(List<StreamInfo> streamInfoList) {
+        if (streamInfoList == null || streamInfoList.isEmpty()) {
+            return null;
+        }
+        Map<String, StreamPush> result = new HashMap<>();
+        for (StreamInfo streamInfo : streamInfoList) {
+            // 不保存国标推理以及拉流代理的流
+            if (streamInfo.getOriginType() == OriginType.RTSP_PUSH.ordinal()
+                    || streamInfo.getOriginType() == OriginType.RTMP_PUSH.ordinal()
+                    || streamInfo.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {
+                String key = streamInfo.getApp() + "_" + streamInfo.getStream();
+                StreamPush streamPushItem = result.get(key);
+                if (streamPushItem == null) {
+                    streamPushItem = streamPushItem.getInstance(streamInfo);
+                    result.put(key, streamPushItem);
+                }
+            }
+        }
+        return new ArrayList<>(result.values());
+    }
 
 }

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisCloseStreamMsgListener.java

@@ -44,7 +44,7 @@ public class RedisCloseStreamMsgListener implements MessageListener {
                         JSONObject jsonObject = JSON.parseObject(msg.getBody());
                         String app = jsonObject.getString("app");
                         String stream = jsonObject.getString("stream");
-                        pushService.stop(app, stream);
+                        pushService.stopByAppAndStream(app, stream);
                     }catch (Exception e) {
                         log.warn("[REDIS的关闭推流通知] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
                         log.error("[REDIS的关闭推流通知] 异常内容: ", e);

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java

@@ -87,7 +87,7 @@ public interface StreamPushMapper {
     List<StreamPush> selectAll();
 
     @Select("SELECT st.*, gs.gb_id, gs.name, gs.longitude, gs.latitude FROM wvp_stream_push st LEFT join wvp_gb_stream gs on st.app = gs.app AND st.stream = gs.stream WHERE st.app=#{app} AND st.stream=#{stream}")
-    StreamPush selectOne(@Param("app") String app, @Param("stream") String stream);
+    StreamPush selectByAppAndStream(@Param("app") String app, @Param("stream") String stream);
 
     @Insert("<script>"  +
             "Insert INTO wvp_stream_push (app, stream, total_reader_count, origin_type, origin_type_str, " +

+ 1 - 2
src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java

@@ -9,7 +9,6 @@ import com.genersoft.iot.vmp.conf.exception.ControllerException;
 import com.genersoft.iot.vmp.conf.security.JwtUtils;
 import com.genersoft.iot.vmp.conf.security.SecurityUtils;
 import com.genersoft.iot.vmp.conf.security.dto.LoginUser;
-import com.genersoft.iot.vmp.gb28181.bean.GbStream;
 import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
 import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamPush;
@@ -95,7 +94,7 @@ public class StreamPushController {
     @Parameter(name = "app", description = "应用名", required = true)
     @Parameter(name = "stream", description = "流id", required = true)
     public void stop(String app, String stream){
-        if (!streamPushService.stop(app, stream)){
+        if (!streamPushService.stopByAppAndStream(app, stream)){
             throw new ControllerException(ErrorCode.ERROR100);
         }
     }