| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636 |
- package com.genersoft.iot.vmp.service.impl;
- import com.alibaba.fastjson2.JSONObject;
- import com.baomidou.dynamic.datasource.annotation.DS;
- import com.genersoft.iot.vmp.common.StreamInfo;
- import com.genersoft.iot.vmp.conf.MediaConfig;
- import com.genersoft.iot.vmp.conf.UserSetting;
- import com.genersoft.iot.vmp.gb28181.bean.GbStream;
- import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
- import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog;
- import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
- import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
- import com.genersoft.iot.vmp.media.bean.MediaInfo;
- import com.genersoft.iot.vmp.media.event.MediaArrivalEvent;
- import com.genersoft.iot.vmp.media.event.MediaDepartureEvent;
- import com.genersoft.iot.vmp.media.service.IMediaServerService;
- import com.genersoft.iot.vmp.media.zlm.dto.MediaServer;
- import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
- import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
- import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
- import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType;
- import com.genersoft.iot.vmp.service.IGbStreamService;
- import com.genersoft.iot.vmp.service.IStreamPushService;
- import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis;
- import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
- import com.genersoft.iot.vmp.storager.dao.*;
- import com.genersoft.iot.vmp.utils.DateUtil;
- import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
- import com.github.pagehelper.PageHelper;
- import com.github.pagehelper.PageInfo;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.context.event.EventListener;
- import org.springframework.jdbc.datasource.DataSourceTransactionManager;
- import org.springframework.scheduling.annotation.Async;
- import org.springframework.stereotype.Service;
- import org.springframework.transaction.TransactionDefinition;
- import org.springframework.transaction.TransactionStatus;
- import org.springframework.util.ObjectUtils;
- import java.util.*;
- import java.util.stream.Collectors;
- @Service
- @DS("master")
- public class StreamPushServiceImpl implements IStreamPushService {
- private final static Logger logger = LoggerFactory.getLogger(StreamPushServiceImpl.class);
- @Autowired
- private GbStreamMapper gbStreamMapper;
- @Autowired
- private StreamPushMapper streamPushMapper;
- @Autowired
- private StreamProxyMapper streamProxyMapper;
- @Autowired
- private ParentPlatformMapper parentPlatformMapper;
- @Autowired
- private PlatformCatalogMapper platformCatalogMapper;
- @Autowired
- private PlatformGbStreamMapper platformGbStreamMapper;
- @Autowired
- private IGbStreamService gbStreamService;
- @Autowired
- private EventPublisher eventPublisher;
- @Autowired
- private IRedisCatchStorage redisCatchStorage;
- @Autowired
- private UserSetting userSetting;
- @Autowired
- private IMediaServerService mediaServerService;
- @Autowired
- DataSourceTransactionManager dataSourceTransactionManager;
- @Autowired
- TransactionDefinition transactionDefinition;
- @Autowired
- private MediaConfig mediaConfig;
- /**
- * 流到来的处理
- */
- @Async("taskExecutor")
- @EventListener
- public void onApplicationEvent(MediaArrivalEvent event) {
- MediaInfo mediaInfo = event.getMediaInfo();
- if (mediaInfo == null) {
- return;
- }
- if (mediaInfo.getOriginType() != OriginType.RTMP_PUSH.ordinal()
- && mediaInfo.getOriginType() != OriginType.RTSP_PUSH.ordinal()
- && mediaInfo.getOriginType() != OriginType.RTC_PUSH.ordinal()) {
- return;
- }
- StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(event.getApp(), event.getStream());
- if (streamAuthorityInfo == null) {
- streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(event);
- } else {
- streamAuthorityInfo.setOriginType(mediaInfo.getOriginType());
- }
- redisCatchStorage.updateStreamAuthorityInfo(event.getApp(), event.getStream(), streamAuthorityInfo);
- StreamPushItem transform = StreamPushItem.getInstance(event, userSetting.getServerId());
- transform.setPushIng(true);
- transform.setUpdateTime(DateUtil.getNow());
- transform.setPushTime(DateUtil.getNow());
- transform.setSelf(true);
- StreamPushItem pushInDb = getPush(event.getApp(), event.getStream());
- if (pushInDb == null) {
- transform.setCreateTime(DateUtil.getNow());
- streamPushMapper.add(transform);
- }else {
- streamPushMapper.update(transform);
- gbStreamMapper.updateMediaServer(event.getApp(), event.getStream(), event.getMediaServer().getId());
- }
- // TODO 相关的事件自行管理,不需要写入ZLMMediaListManager
- // ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(transform.getApp(), transform.getStream());
- // if ( channelOnlineEventLister != null) {
- // try {
- // channelOnlineEventLister.run(transform.getApp(), transform.getStream(), transform.getServerId());;
- // } catch (ParseException e) {
- // logger.error("addPush: ", e);
- // }
- // removedChannelOnlineEventLister(transform.getApp(), transform.getStream());
- // }
- // 冗余数据,自己系统中自用
- redisCatchStorage.addPushListItem(event.getApp(), event.getStream(), event);
- // 发送流变化redis消息
- JSONObject jsonObject = new JSONObject();
- jsonObject.put("serverId", userSetting.getServerId());
- jsonObject.put("app", event.getApp());
- jsonObject.put("stream", event.getStream());
- jsonObject.put("register", true);
- jsonObject.put("mediaServerId", event.getMediaServer().getId());
- redisCatchStorage.sendStreamChangeMsg(OriginType.values()[event.getMediaInfo().getOriginType()].getType(), jsonObject);
- }
- /**
- * 流离开的处理
- */
- @Async("taskExecutor")
- @EventListener
- public void onApplicationEvent(MediaDepartureEvent event) {
- // 兼容流注销时类型从redis记录获取
- OnStreamChangedHookParam onStreamChangedHookParam = redisCatchStorage.getStreamInfo(
- event.getApp(), event.getStream(), event.getMediaServer().getId());
- if (onStreamChangedHookParam != null) {
- String type = OriginType.values()[onStreamChangedHookParam.getOriginType()].getType();
- redisCatchStorage.removeStream(event.getMediaServer().getId(), type, event.getApp(), event.getStream());
- if ("PUSH".equalsIgnoreCase(type)) {
- // 冗余数据,自己系统中自用
- redisCatchStorage.removePushListItem(event.getApp(), event.getStream(), event.getMediaServer().getId());
- }
- if (type != null) {
- // 发送流变化redis消息
- JSONObject jsonObject = new JSONObject();
- jsonObject.put("serverId", userSetting.getServerId());
- jsonObject.put("app", event.getApp());
- jsonObject.put("stream", event.getStream());
- jsonObject.put("register", false);
- jsonObject.put("mediaServerId", event.getMediaServer().getId());
- redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
- }
- }
- GbStream gbStream = gbStreamMapper.selectOne(event.getApp(), event.getStream());
- if (gbStream != null) {
- if (userSetting.isUsePushingAsStatus()) {
- streamPushMapper.updatePushStatus(event.getApp(), event.getStream(), false);
- eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF);
- }
- }else {
- streamPushMapper.del(event.getApp(), event.getStream());
- }
- }
- private List<StreamPushItem> handleJSON(List<StreamInfo> streamInfoList) {
- if (streamInfoList == null || streamInfoList.isEmpty()) {
- return null;
- }
- Map<String, StreamPushItem> result = new HashMap<>();
- for (StreamInfo streamInfo : streamInfoList) {
- // 不保存国标推理以及拉流代理的流
- if (streamInfo.getOriginType() == OriginType.RTSP_PUSH.ordinal()
- || streamInfo.getOriginType() == OriginType.RTMP_PUSH.ordinal()
- || streamInfo.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {
- String key = streamInfo.getApp() + "_" + streamInfo.getStream();
- StreamPushItem streamPushItem = result.get(key);
- if (streamPushItem == null) {
- streamPushItem = streamPushItem.getInstance(streamInfo);
- result.put(key, streamPushItem);
- }
- }
- }
- return new ArrayList<>(result.values());
- }
- @Override
- public StreamPushItem transform(OnStreamChangedHookParam item) {
- StreamPushItem streamPushItem = new StreamPushItem();
- streamPushItem.setApp(item.getApp());
- streamPushItem.setMediaServerId(item.getMediaServerId());
- streamPushItem.setStream(item.getStream());
- streamPushItem.setAliveSecond(item.getAliveSecond());
- streamPushItem.setOriginSock(item.getOriginSock());
- streamPushItem.setTotalReaderCount(item.getTotalReaderCount() + "");
- streamPushItem.setOriginType(item.getOriginType());
- streamPushItem.setOriginTypeStr(item.getOriginTypeStr());
- streamPushItem.setOriginUrl(item.getOriginUrl());
- streamPushItem.setCreateTime(DateUtil.getNow());
- streamPushItem.setAliveSecond(item.getAliveSecond());
- streamPushItem.setStatus(true);
- streamPushItem.setStreamType("push");
- streamPushItem.setVhost(item.getVhost());
- streamPushItem.setServerId(item.getSeverId());
- return streamPushItem;
- }
- @Override
- public PageInfo<StreamPushItem> getPushList(Integer page, Integer count, String query, Boolean pushing, String mediaServerId) {
- PageHelper.startPage(page, count);
- List<StreamPushItem> all = streamPushMapper.selectAllForList(query, pushing, mediaServerId);
- return new PageInfo<>(all);
- }
- @Override
- public List<StreamPushItem> getPushList(String mediaServerId) {
- return streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId);
- }
- @Override
- public boolean saveToGB(GbStream stream) {
- stream.setStreamType("push");
- stream.setStatus(true);
- stream.setCreateTime(DateUtil.getNow());
- stream.setStreamType("push");
- stream.setMediaServerId(mediaConfig.getId());
- int add = gbStreamMapper.add(stream);
- return add > 0;
- }
- @Override
- public boolean removeFromGB(GbStream stream) {
- // 判断是否需要发送事件
- gbStreamService.sendCatalogMsg(stream, CatalogEvent.DEL);
- platformGbStreamMapper.delByAppAndStream(stream.getApp(), stream.getStream());
- int del = gbStreamMapper.del(stream.getApp(), stream.getStream());
- MediaServer mediaInfo = mediaServerService.getOne(stream.getMediaServerId());
- List<StreamInfo> mediaList = mediaServerService.getMediaList(mediaInfo, stream.getApp(), stream.getStream(), null);
- if (mediaList != null && mediaList.isEmpty()) {
- streamPushMapper.del(stream.getApp(), stream.getStream());
- }
- return del > 0;
- }
- @Override
- public StreamPushItem getPush(String app, String streamId) {
- return streamPushMapper.selectOne(app, streamId);
- }
- @Override
- public boolean stop(String app, String streamId) {
- logger.info("[推流 ] 停止流: {}/{}", app, streamId);
- StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId);
- if (streamPushItem != null) {
- gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL);
- }
- platformGbStreamMapper.delByAppAndStream(app, streamId);
- gbStreamMapper.del(app, streamId);
- int delStream = streamPushMapper.del(app, streamId);
- if (delStream > 0) {
- MediaServer mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId());
- mediaServerService.closeStreams(mediaServerItem,app, streamId);
- }
- return true;
- }
- @Override
- public void zlmServerOnline(String mediaServerId) {
- // 同步zlm推流信息
- MediaServer mediaServerItem = mediaServerService.getOne(mediaServerId);
- if (mediaServerItem == null) {
- return;
- }
- // 数据库记录
- List<StreamPushItem> pushList = getPushList(mediaServerId);
- Map<String, StreamPushItem> pushItemMap = new HashMap<>();
- // redis记录
- List<OnStreamChangedHookParam> onStreamChangedHookParams = redisCatchStorage.getStreams(mediaServerId, "PUSH");
- Map<String, OnStreamChangedHookParam> streamInfoPushItemMap = new HashMap<>();
- if (pushList.size() > 0) {
- for (StreamPushItem streamPushItem : pushList) {
- if (ObjectUtils.isEmpty(streamPushItem.getGbId())) {
- pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem);
- }
- }
- }
- if (onStreamChangedHookParams.size() > 0) {
- for (OnStreamChangedHookParam onStreamChangedHookParam : onStreamChangedHookParams) {
- streamInfoPushItemMap.put(onStreamChangedHookParam.getApp() + onStreamChangedHookParam.getStream(), onStreamChangedHookParam);
- }
- }
- // 获取所有推流鉴权信息,清理过期的
- List<StreamAuthorityInfo> allStreamAuthorityInfo = redisCatchStorage.getAllStreamAuthorityInfo();
- Map<String, StreamAuthorityInfo> streamAuthorityInfoInfoMap = new HashMap<>();
- for (StreamAuthorityInfo streamAuthorityInfo : allStreamAuthorityInfo) {
- streamAuthorityInfoInfoMap.put(streamAuthorityInfo.getApp() + streamAuthorityInfo.getStream(), streamAuthorityInfo);
- }
- List<StreamInfo> mediaList = mediaServerService.getMediaList(mediaServerItem, null, null, null);
- if (mediaList == null) {
- return;
- }
- List<StreamPushItem> streamPushItems = handleJSON(mediaList);
- if (streamPushItems != null) {
- for (StreamPushItem streamPushItem : streamPushItems) {
- pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
- streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
- streamAuthorityInfoInfoMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
- }
- }
- List<StreamPushItem> offlinePushItems = new ArrayList<>(pushItemMap.values());
- if (offlinePushItems.size() > 0) {
- String type = "PUSH";
- int runLimit = 300;
- if (offlinePushItems.size() > runLimit) {
- for (int i = 0; i < offlinePushItems.size(); i += runLimit) {
- int toIndex = i + runLimit;
- if (i + runLimit > offlinePushItems.size()) {
- toIndex = offlinePushItems.size();
- }
- List<StreamPushItem> streamPushItemsSub = offlinePushItems.subList(i, toIndex);
- streamPushMapper.delAll(streamPushItemsSub);
- }
- }else {
- streamPushMapper.delAll(offlinePushItems);
- }
- }
- Collection<OnStreamChangedHookParam> offlineOnStreamChangedHookParamList = streamInfoPushItemMap.values();
- if (offlineOnStreamChangedHookParamList.size() > 0) {
- String type = "PUSH";
- for (OnStreamChangedHookParam offlineOnStreamChangedHookParam : offlineOnStreamChangedHookParamList) {
- JSONObject jsonObject = new JSONObject();
- jsonObject.put("serverId", userSetting.getServerId());
- jsonObject.put("app", offlineOnStreamChangedHookParam.getApp());
- jsonObject.put("stream", offlineOnStreamChangedHookParam.getStream());
- jsonObject.put("register", false);
- jsonObject.put("mediaServerId", mediaServerId);
- redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
- // 移除redis内流的信息
- redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream());
- // 冗余数据,自己系统中自用
- redisCatchStorage.removePushListItem(offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream(), mediaServerItem.getId());
- }
- }
- Collection<StreamAuthorityInfo> streamAuthorityInfos = streamAuthorityInfoInfoMap.values();
- if (streamAuthorityInfos.size() > 0) {
- for (StreamAuthorityInfo streamAuthorityInfo : streamAuthorityInfos) {
- // 移除redis内流的信息
- redisCatchStorage.removeStreamAuthorityInfo(streamAuthorityInfo.getApp(), streamAuthorityInfo.getStream());
- }
- }
- }
- @Override
- public void zlmServerOffline(String mediaServerId) {
- List<StreamPushItem> streamPushItems = streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId);
- // 移除没有GBId的推流
- streamPushMapper.deleteWithoutGBId(mediaServerId);
- gbStreamMapper.deleteWithoutGBId("push", mediaServerId);
- // 其他的流设置未启用
- streamPushMapper.updateStatusByMediaServerId(mediaServerId, false);
- streamProxyMapper.updateStatusByMediaServerId(mediaServerId, false);
- // 发送流停止消息
- String type = "PUSH";
- // 发送redis消息
- List<OnStreamChangedHookParam> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type);
- if (streamInfoList.size() > 0) {
- for (OnStreamChangedHookParam onStreamChangedHookParam : streamInfoList) {
- // 移除redis内流的信息
- redisCatchStorage.removeStream(mediaServerId, type, onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream());
- JSONObject jsonObject = new JSONObject();
- jsonObject.put("serverId", userSetting.getServerId());
- jsonObject.put("app", onStreamChangedHookParam.getApp());
- jsonObject.put("stream", onStreamChangedHookParam.getStream());
- jsonObject.put("register", false);
- jsonObject.put("mediaServerId", mediaServerId);
- redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
- // 冗余数据,自己系统中自用
- redisCatchStorage.removePushListItem(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream(), mediaServerId);
- }
- }
- }
- @Override
- public void clean() {
- }
- @Override
- public boolean saveToRandomGB() {
- List<StreamPushItem> streamPushItems = streamPushMapper.selectAll();
- long gbId = 100001;
- for (StreamPushItem streamPushItem : streamPushItems) {
- streamPushItem.setStreamType("push");
- streamPushItem.setStatus(true);
- streamPushItem.setGbId("34020000004111" + gbId);
- streamPushItem.setCreateTime(DateUtil.getNow());
- gbId ++;
- }
- int limitCount = 30;
- if (streamPushItems.size() > limitCount) {
- for (int i = 0; i < streamPushItems.size(); i += limitCount) {
- int toIndex = i + limitCount;
- if (i + limitCount > streamPushItems.size()) {
- toIndex = streamPushItems.size();
- }
- gbStreamMapper.batchAdd(streamPushItems.subList(i, toIndex));
- }
- }else {
- gbStreamMapper.batchAdd(streamPushItems);
- }
- return true;
- }
- @Override
- public void batchAdd(List<StreamPushItem> streamPushItems) {
- streamPushMapper.addAll(streamPushItems);
- gbStreamMapper.batchAdd(streamPushItems);
- }
- @Override
- public void batchAddForUpload(List<StreamPushItem> streamPushItems, Map<String, List<String[]>> streamPushItemsForAll ) {
- // 存储数据到stream_push表
- streamPushMapper.addAll(streamPushItems);
- List<StreamPushItem> streamPushItemForGbStream = streamPushItems.stream()
- .filter(streamPushItem-> streamPushItem.getGbId() != null)
- .collect(Collectors.toList());
- // 存储数据到gb_stream表, id会返回到streamPushItemForGbStream里
- if (streamPushItemForGbStream.size() > 0) {
- gbStreamMapper.batchAdd(streamPushItemForGbStream);
- }
- // 去除没有ID也就是没有存储到数据库的数据
- List<StreamPushItem> streamPushItemsForPlatform = streamPushItemForGbStream.stream()
- .filter(streamPushItem-> streamPushItem.getGbStreamId() != null)
- .collect(Collectors.toList());
- if (streamPushItemsForPlatform.size() > 0) {
- // 获取所有平台,平台和目录信息一般不会特别大量。
- List<ParentPlatform> parentPlatformList = parentPlatformMapper.getParentPlatformList();
- Map<String, Map<String, PlatformCatalog>> platformInfoMap = new HashMap<>();
- if (parentPlatformList.size() == 0) {
- return;
- }
- for (ParentPlatform platform : parentPlatformList) {
- Map<String, PlatformCatalog> catalogMap = new HashMap<>();
- // 创建根节点
- PlatformCatalog platformCatalog = new PlatformCatalog();
- platformCatalog.setId(platform.getServerGBId());
- catalogMap.put(platform.getServerGBId(), platformCatalog);
- // 查询所有节点信息
- List<PlatformCatalog> platformCatalogs = platformCatalogMapper.selectByPlatForm(platform.getServerGBId());
- if (platformCatalogs.size() > 0) {
- for (PlatformCatalog catalog : platformCatalogs) {
- catalogMap.put(catalog.getId(), catalog);
- }
- }
- platformInfoMap.put(platform.getServerGBId(), catalogMap);
- }
- List<StreamPushItem> streamPushItemListFroPlatform = new ArrayList<>();
- Map<String, List<GbStream>> platformForEvent = new HashMap<>();
- // 遍历存储结果,查找app+Stream->platformId+catalogId的对应关系,然后执行批量写入
- for (StreamPushItem streamPushItem : streamPushItemsForPlatform) {
- List<String[]> platFormInfoList = streamPushItemsForAll.get(streamPushItem.getApp() + streamPushItem.getStream());
- if (platFormInfoList != null && platFormInfoList.size() > 0) {
- for (String[] platFormInfoArray : platFormInfoList) {
- StreamPushItem streamPushItemForPlatform = new StreamPushItem();
- streamPushItemForPlatform.setGbStreamId(streamPushItem.getGbStreamId());
- if (platFormInfoArray.length > 0) {
- // 数组 platFormInfoArray 0 为平台ID。 1为目录ID
- // 不存在这个平台,则忽略导入此关联关系
- if (platformInfoMap.get(platFormInfoArray[0]) == null
- || platformInfoMap.get(platFormInfoArray[0]).get(platFormInfoArray[1]) == null) {
- logger.info("导入数据时不存在平台或目录{}/{},已导入未分配", platFormInfoArray[0], platFormInfoArray[1] );
- continue;
- }
- streamPushItemForPlatform.setPlatformId(platFormInfoArray[0]);
- List<GbStream> gbStreamList = platformForEvent.get(platFormInfoArray[0]);
- if (gbStreamList == null) {
- gbStreamList = new ArrayList<>();
- platformForEvent.put(platFormInfoArray[0], gbStreamList);
- }
- // 为发送通知整理数据
- streamPushItemForPlatform.setName(streamPushItem.getName());
- streamPushItemForPlatform.setApp(streamPushItem.getApp());
- streamPushItemForPlatform.setStream(streamPushItem.getStream());
- streamPushItemForPlatform.setGbId(streamPushItem.getGbId());
- gbStreamList.add(streamPushItemForPlatform);
- }
- if (platFormInfoArray.length > 1) {
- streamPushItemForPlatform.setCatalogId(platFormInfoArray[1]);
- }
- streamPushItemListFroPlatform.add(streamPushItemForPlatform);
- }
- }
- }
- if (!streamPushItemListFroPlatform.isEmpty()) {
- platformGbStreamMapper.batchAdd(streamPushItemListFroPlatform);
- // 发送通知
- for (String platformId : platformForEvent.keySet()) {
- eventPublisher.catalogEventPublishForStream(
- platformId, platformForEvent.get(platformId), CatalogEvent.ADD);
- }
- }
- }
- }
- @Override
- public boolean batchStop(List<GbStream> gbStreams) {
- if (gbStreams == null || gbStreams.size() == 0) {
- return false;
- }
- gbStreamService.sendCatalogMsgs(gbStreams, CatalogEvent.DEL);
- platformGbStreamMapper.delByGbStreams(gbStreams);
- gbStreamMapper.batchDelForGbStream(gbStreams);
- int delStream = streamPushMapper.delAllForGbStream(gbStreams);
- if (delStream > 0) {
- for (GbStream gbStream : gbStreams) {
- MediaServer mediaServerItem = mediaServerService.getOne(gbStream.getMediaServerId());
- mediaServerService.closeStreams(mediaServerItem, gbStream.getApp(), gbStream.getStream());
- }
- }
- return true;
- }
- @Override
- public void allStreamOffline() {
- List<GbStream> onlinePushers = streamPushMapper.getOnlinePusherForGb();
- if (onlinePushers.size() == 0) {
- return;
- }
- streamPushMapper.setAllStreamOffline();
- // 发送通知
- eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.OFF);
- }
- @Override
- public void offline(List<StreamPushItemFromRedis> offlineStreams) {
- // 更新部分设备离线
- List<GbStream> onlinePushers = streamPushMapper.getOnlinePusherForGbInList(offlineStreams);
- streamPushMapper.offline(offlineStreams);
- // 发送通知
- eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.OFF);
- }
- @Override
- public void online(List<StreamPushItemFromRedis> onlineStreams) {
- // 更新部分设备上线streamPushService
- List<GbStream> onlinePushers = streamPushMapper.getOfflinePusherForGbInList(onlineStreams);
- streamPushMapper.online(onlineStreams);
- // 发送通知
- eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.ON);
- }
- @Override
- public boolean add(StreamPushItem stream) {
- stream.setUpdateTime(DateUtil.getNow());
- stream.setCreateTime(DateUtil.getNow());
- stream.setServerId(userSetting.getServerId());
- stream.setMediaServerId(mediaConfig.getId());
- stream.setSelf(true);
- stream.setPushIng(true);
- // 放在事务内执行
- boolean result = false;
- TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
- try {
- int addStreamResult = streamPushMapper.add(stream);
- if (!ObjectUtils.isEmpty(stream.getGbId())) {
- stream.setStreamType("push");
- gbStreamMapper.add(stream);
- }
- dataSourceTransactionManager.commit(transactionStatus);
- result = true;
- }catch (Exception e) {
- logger.error("批量移除流与平台的关系时错误", e);
- dataSourceTransactionManager.rollback(transactionStatus);
- }
- return result;
- }
- @Override
- public List<String> getAllAppAndStream() {
- return streamPushMapper.getAllAppAndStream();
- }
- @Override
- public ResourceBaseInfo getOverview() {
- int total = streamPushMapper.getAllCount();
- int online = streamPushMapper.getAllOnline(userSetting.isUsePushingAsStatus());
- return new ResourceBaseInfo(total, online);
- }
- @Override
- public Map<String, StreamPushItem> getAllAppAndStreamMap() {
- return streamPushMapper.getAllAppAndStreamMap();
- }
- }
|