StreamPushServiceImpl.java 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534
  1. package com.genersoft.iot.vmp.service.impl;
  2. import com.alibaba.fastjson2.JSONObject;
  3. import com.baomidou.dynamic.datasource.annotation.DS;
  4. import com.genersoft.iot.vmp.common.StreamInfo;
  5. import com.genersoft.iot.vmp.conf.MediaConfig;
  6. import com.genersoft.iot.vmp.conf.UserSetting;
  7. import com.genersoft.iot.vmp.gb28181.bean.GbStream;
  8. import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
  9. import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog;
  10. import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
  11. import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
  12. import com.genersoft.iot.vmp.media.service.IMediaServerService;
  13. import com.genersoft.iot.vmp.media.zlm.dto.MediaServer;
  14. import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
  15. import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
  16. import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
  17. import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType;
  18. import com.genersoft.iot.vmp.service.IGbStreamService;
  19. import com.genersoft.iot.vmp.service.IStreamPushService;
  20. import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis;
  21. import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
  22. import com.genersoft.iot.vmp.storager.dao.*;
  23. import com.genersoft.iot.vmp.utils.DateUtil;
  24. import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
  25. import com.github.pagehelper.PageHelper;
  26. import com.github.pagehelper.PageInfo;
  27. import org.slf4j.Logger;
  28. import org.slf4j.LoggerFactory;
  29. import org.springframework.beans.factory.annotation.Autowired;
  30. import org.springframework.jdbc.datasource.DataSourceTransactionManager;
  31. import org.springframework.stereotype.Service;
  32. import org.springframework.transaction.TransactionDefinition;
  33. import org.springframework.transaction.TransactionStatus;
  34. import org.springframework.util.ObjectUtils;
  35. import java.util.*;
  36. import java.util.stream.Collectors;
  37. @Service
  38. @DS("master")
  39. public class StreamPushServiceImpl implements IStreamPushService {
  40. private final static Logger logger = LoggerFactory.getLogger(StreamPushServiceImpl.class);
  41. @Autowired
  42. private GbStreamMapper gbStreamMapper;
  43. @Autowired
  44. private StreamPushMapper streamPushMapper;
  45. @Autowired
  46. private StreamProxyMapper streamProxyMapper;
  47. @Autowired
  48. private ParentPlatformMapper parentPlatformMapper;
  49. @Autowired
  50. private PlatformCatalogMapper platformCatalogMapper;
  51. @Autowired
  52. private PlatformGbStreamMapper platformGbStreamMapper;
  53. @Autowired
  54. private IGbStreamService gbStreamService;
  55. @Autowired
  56. private EventPublisher eventPublisher;
  57. @Autowired
  58. private IRedisCatchStorage redisCatchStorage;
  59. @Autowired
  60. private UserSetting userSetting;
  61. @Autowired
  62. private IMediaServerService mediaServerService;
  63. @Autowired
  64. DataSourceTransactionManager dataSourceTransactionManager;
  65. @Autowired
  66. TransactionDefinition transactionDefinition;
  67. @Autowired
  68. private MediaConfig mediaConfig;
  69. private List<StreamPushItem> handleJSON(List<StreamInfo> streamInfoList) {
  70. if (streamInfoList == null || streamInfoList.isEmpty()) {
  71. return null;
  72. }
  73. Map<String, StreamPushItem> result = new HashMap<>();
  74. for (StreamInfo streamInfo : streamInfoList) {
  75. // 不保存国标推理以及拉流代理的流
  76. if (streamInfo.getOriginType() == OriginType.RTSP_PUSH.ordinal()
  77. || streamInfo.getOriginType() == OriginType.RTMP_PUSH.ordinal()
  78. || streamInfo.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {
  79. String key = streamInfo.getApp() + "_" + streamInfo.getStream();
  80. StreamPushItem streamPushItem = result.get(key);
  81. if (streamPushItem == null) {
  82. streamPushItem = streamPushItem.instance(streamInfo);
  83. result.put(key, streamPushItem);
  84. }
  85. }
  86. }
  87. return new ArrayList<>(result.values());
  88. }
  89. @Override
  90. public StreamPushItem transform(OnStreamChangedHookParam item) {
  91. StreamPushItem streamPushItem = new StreamPushItem();
  92. streamPushItem.setApp(item.getApp());
  93. streamPushItem.setMediaServerId(item.getMediaServerId());
  94. streamPushItem.setStream(item.getStream());
  95. streamPushItem.setAliveSecond(item.getAliveSecond());
  96. streamPushItem.setOriginSock(item.getOriginSock());
  97. streamPushItem.setTotalReaderCount(item.getTotalReaderCount() + "");
  98. streamPushItem.setOriginType(item.getOriginType());
  99. streamPushItem.setOriginTypeStr(item.getOriginTypeStr());
  100. streamPushItem.setOriginUrl(item.getOriginUrl());
  101. streamPushItem.setCreateTime(DateUtil.getNow());
  102. streamPushItem.setAliveSecond(item.getAliveSecond());
  103. streamPushItem.setStatus(true);
  104. streamPushItem.setStreamType("push");
  105. streamPushItem.setVhost(item.getVhost());
  106. streamPushItem.setServerId(item.getSeverId());
  107. return streamPushItem;
  108. }
  109. @Override
  110. public PageInfo<StreamPushItem> getPushList(Integer page, Integer count, String query, Boolean pushing, String mediaServerId) {
  111. PageHelper.startPage(page, count);
  112. List<StreamPushItem> all = streamPushMapper.selectAllForList(query, pushing, mediaServerId);
  113. return new PageInfo<>(all);
  114. }
  115. @Override
  116. public List<StreamPushItem> getPushList(String mediaServerId) {
  117. return streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId);
  118. }
  119. @Override
  120. public boolean saveToGB(GbStream stream) {
  121. stream.setStreamType("push");
  122. stream.setStatus(true);
  123. stream.setCreateTime(DateUtil.getNow());
  124. stream.setStreamType("push");
  125. stream.setMediaServerId(mediaConfig.getId());
  126. int add = gbStreamMapper.add(stream);
  127. return add > 0;
  128. }
  129. @Override
  130. public boolean removeFromGB(GbStream stream) {
  131. // 判断是否需要发送事件
  132. gbStreamService.sendCatalogMsg(stream, CatalogEvent.DEL);
  133. platformGbStreamMapper.delByAppAndStream(stream.getApp(), stream.getStream());
  134. int del = gbStreamMapper.del(stream.getApp(), stream.getStream());
  135. MediaServer mediaInfo = mediaServerService.getOne(stream.getMediaServerId());
  136. List<StreamInfo> mediaList = mediaServerService.getMediaList(mediaInfo, stream.getApp(), stream.getStream(), null);
  137. if (mediaList != null && mediaList.isEmpty()) {
  138. streamPushMapper.del(stream.getApp(), stream.getStream());
  139. }
  140. return del > 0;
  141. }
  142. @Override
  143. public StreamPushItem getPush(String app, String streamId) {
  144. return streamPushMapper.selectOne(app, streamId);
  145. }
  146. @Override
  147. public boolean stop(String app, String streamId) {
  148. logger.info("[推流 ] 停止流: {}/{}", app, streamId);
  149. StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId);
  150. if (streamPushItem != null) {
  151. gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL);
  152. }
  153. platformGbStreamMapper.delByAppAndStream(app, streamId);
  154. gbStreamMapper.del(app, streamId);
  155. int delStream = streamPushMapper.del(app, streamId);
  156. if (delStream > 0) {
  157. MediaServer mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId());
  158. mediaServerService.closeStreams(mediaServerItem,app, streamId);
  159. }
  160. return true;
  161. }
  162. @Override
  163. public void zlmServerOnline(String mediaServerId) {
  164. // 同步zlm推流信息
  165. MediaServer mediaServerItem = mediaServerService.getOne(mediaServerId);
  166. if (mediaServerItem == null) {
  167. return;
  168. }
  169. // 数据库记录
  170. List<StreamPushItem> pushList = getPushList(mediaServerId);
  171. Map<String, StreamPushItem> pushItemMap = new HashMap<>();
  172. // redis记录
  173. List<OnStreamChangedHookParam> onStreamChangedHookParams = redisCatchStorage.getStreams(mediaServerId, "PUSH");
  174. Map<String, OnStreamChangedHookParam> streamInfoPushItemMap = new HashMap<>();
  175. if (pushList.size() > 0) {
  176. for (StreamPushItem streamPushItem : pushList) {
  177. if (ObjectUtils.isEmpty(streamPushItem.getGbId())) {
  178. pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem);
  179. }
  180. }
  181. }
  182. if (onStreamChangedHookParams.size() > 0) {
  183. for (OnStreamChangedHookParam onStreamChangedHookParam : onStreamChangedHookParams) {
  184. streamInfoPushItemMap.put(onStreamChangedHookParam.getApp() + onStreamChangedHookParam.getStream(), onStreamChangedHookParam);
  185. }
  186. }
  187. // 获取所有推流鉴权信息,清理过期的
  188. List<StreamAuthorityInfo> allStreamAuthorityInfo = redisCatchStorage.getAllStreamAuthorityInfo();
  189. Map<String, StreamAuthorityInfo> streamAuthorityInfoInfoMap = new HashMap<>();
  190. for (StreamAuthorityInfo streamAuthorityInfo : allStreamAuthorityInfo) {
  191. streamAuthorityInfoInfoMap.put(streamAuthorityInfo.getApp() + streamAuthorityInfo.getStream(), streamAuthorityInfo);
  192. }
  193. List<StreamInfo> mediaList = mediaServerService.getMediaList(mediaServerItem, null, null, null);
  194. if (mediaList == null) {
  195. return;
  196. }
  197. List<StreamPushItem> streamPushItems = handleJSON(mediaList);
  198. if (streamPushItems != null) {
  199. for (StreamPushItem streamPushItem : streamPushItems) {
  200. pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
  201. streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
  202. streamAuthorityInfoInfoMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
  203. }
  204. }
  205. List<StreamPushItem> offlinePushItems = new ArrayList<>(pushItemMap.values());
  206. if (offlinePushItems.size() > 0) {
  207. String type = "PUSH";
  208. int runLimit = 300;
  209. if (offlinePushItems.size() > runLimit) {
  210. for (int i = 0; i < offlinePushItems.size(); i += runLimit) {
  211. int toIndex = i + runLimit;
  212. if (i + runLimit > offlinePushItems.size()) {
  213. toIndex = offlinePushItems.size();
  214. }
  215. List<StreamPushItem> streamPushItemsSub = offlinePushItems.subList(i, toIndex);
  216. streamPushMapper.delAll(streamPushItemsSub);
  217. }
  218. }else {
  219. streamPushMapper.delAll(offlinePushItems);
  220. }
  221. }
  222. Collection<OnStreamChangedHookParam> offlineOnStreamChangedHookParamList = streamInfoPushItemMap.values();
  223. if (offlineOnStreamChangedHookParamList.size() > 0) {
  224. String type = "PUSH";
  225. for (OnStreamChangedHookParam offlineOnStreamChangedHookParam : offlineOnStreamChangedHookParamList) {
  226. JSONObject jsonObject = new JSONObject();
  227. jsonObject.put("serverId", userSetting.getServerId());
  228. jsonObject.put("app", offlineOnStreamChangedHookParam.getApp());
  229. jsonObject.put("stream", offlineOnStreamChangedHookParam.getStream());
  230. jsonObject.put("register", false);
  231. jsonObject.put("mediaServerId", mediaServerId);
  232. redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
  233. // 移除redis内流的信息
  234. redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream());
  235. // 冗余数据,自己系统中自用
  236. redisCatchStorage.removePushListItem(offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream(), mediaServerItem.getId());
  237. }
  238. }
  239. Collection<StreamAuthorityInfo> streamAuthorityInfos = streamAuthorityInfoInfoMap.values();
  240. if (streamAuthorityInfos.size() > 0) {
  241. for (StreamAuthorityInfo streamAuthorityInfo : streamAuthorityInfos) {
  242. // 移除redis内流的信息
  243. redisCatchStorage.removeStreamAuthorityInfo(streamAuthorityInfo.getApp(), streamAuthorityInfo.getStream());
  244. }
  245. }
  246. }
  247. @Override
  248. public void zlmServerOffline(String mediaServerId) {
  249. List<StreamPushItem> streamPushItems = streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId);
  250. // 移除没有GBId的推流
  251. streamPushMapper.deleteWithoutGBId(mediaServerId);
  252. gbStreamMapper.deleteWithoutGBId("push", mediaServerId);
  253. // 其他的流设置未启用
  254. streamPushMapper.updateStatusByMediaServerId(mediaServerId, false);
  255. streamProxyMapper.updateStatusByMediaServerId(mediaServerId, false);
  256. // 发送流停止消息
  257. String type = "PUSH";
  258. // 发送redis消息
  259. List<OnStreamChangedHookParam> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type);
  260. if (streamInfoList.size() > 0) {
  261. for (OnStreamChangedHookParam onStreamChangedHookParam : streamInfoList) {
  262. // 移除redis内流的信息
  263. redisCatchStorage.removeStream(mediaServerId, type, onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream());
  264. JSONObject jsonObject = new JSONObject();
  265. jsonObject.put("serverId", userSetting.getServerId());
  266. jsonObject.put("app", onStreamChangedHookParam.getApp());
  267. jsonObject.put("stream", onStreamChangedHookParam.getStream());
  268. jsonObject.put("register", false);
  269. jsonObject.put("mediaServerId", mediaServerId);
  270. redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
  271. // 冗余数据,自己系统中自用
  272. redisCatchStorage.removePushListItem(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream(), mediaServerId);
  273. }
  274. }
  275. }
  276. @Override
  277. public void clean() {
  278. }
  279. @Override
  280. public boolean saveToRandomGB() {
  281. List<StreamPushItem> streamPushItems = streamPushMapper.selectAll();
  282. long gbId = 100001;
  283. for (StreamPushItem streamPushItem : streamPushItems) {
  284. streamPushItem.setStreamType("push");
  285. streamPushItem.setStatus(true);
  286. streamPushItem.setGbId("34020000004111" + gbId);
  287. streamPushItem.setCreateTime(DateUtil.getNow());
  288. gbId ++;
  289. }
  290. int limitCount = 30;
  291. if (streamPushItems.size() > limitCount) {
  292. for (int i = 0; i < streamPushItems.size(); i += limitCount) {
  293. int toIndex = i + limitCount;
  294. if (i + limitCount > streamPushItems.size()) {
  295. toIndex = streamPushItems.size();
  296. }
  297. gbStreamMapper.batchAdd(streamPushItems.subList(i, toIndex));
  298. }
  299. }else {
  300. gbStreamMapper.batchAdd(streamPushItems);
  301. }
  302. return true;
  303. }
  304. @Override
  305. public void batchAdd(List<StreamPushItem> streamPushItems) {
  306. streamPushMapper.addAll(streamPushItems);
  307. gbStreamMapper.batchAdd(streamPushItems);
  308. }
  309. @Override
  310. public void batchAddForUpload(List<StreamPushItem> streamPushItems, Map<String, List<String[]>> streamPushItemsForAll ) {
  311. // 存储数据到stream_push表
  312. streamPushMapper.addAll(streamPushItems);
  313. List<StreamPushItem> streamPushItemForGbStream = streamPushItems.stream()
  314. .filter(streamPushItem-> streamPushItem.getGbId() != null)
  315. .collect(Collectors.toList());
  316. // 存储数据到gb_stream表, id会返回到streamPushItemForGbStream里
  317. if (streamPushItemForGbStream.size() > 0) {
  318. gbStreamMapper.batchAdd(streamPushItemForGbStream);
  319. }
  320. // 去除没有ID也就是没有存储到数据库的数据
  321. List<StreamPushItem> streamPushItemsForPlatform = streamPushItemForGbStream.stream()
  322. .filter(streamPushItem-> streamPushItem.getGbStreamId() != null)
  323. .collect(Collectors.toList());
  324. if (streamPushItemsForPlatform.size() > 0) {
  325. // 获取所有平台,平台和目录信息一般不会特别大量。
  326. List<ParentPlatform> parentPlatformList = parentPlatformMapper.getParentPlatformList();
  327. Map<String, Map<String, PlatformCatalog>> platformInfoMap = new HashMap<>();
  328. if (parentPlatformList.size() == 0) {
  329. return;
  330. }
  331. for (ParentPlatform platform : parentPlatformList) {
  332. Map<String, PlatformCatalog> catalogMap = new HashMap<>();
  333. // 创建根节点
  334. PlatformCatalog platformCatalog = new PlatformCatalog();
  335. platformCatalog.setId(platform.getServerGBId());
  336. catalogMap.put(platform.getServerGBId(), platformCatalog);
  337. // 查询所有节点信息
  338. List<PlatformCatalog> platformCatalogs = platformCatalogMapper.selectByPlatForm(platform.getServerGBId());
  339. if (platformCatalogs.size() > 0) {
  340. for (PlatformCatalog catalog : platformCatalogs) {
  341. catalogMap.put(catalog.getId(), catalog);
  342. }
  343. }
  344. platformInfoMap.put(platform.getServerGBId(), catalogMap);
  345. }
  346. List<StreamPushItem> streamPushItemListFroPlatform = new ArrayList<>();
  347. Map<String, List<GbStream>> platformForEvent = new HashMap<>();
  348. // 遍历存储结果,查找app+Stream->platformId+catalogId的对应关系,然后执行批量写入
  349. for (StreamPushItem streamPushItem : streamPushItemsForPlatform) {
  350. List<String[]> platFormInfoList = streamPushItemsForAll.get(streamPushItem.getApp() + streamPushItem.getStream());
  351. if (platFormInfoList != null && platFormInfoList.size() > 0) {
  352. for (String[] platFormInfoArray : platFormInfoList) {
  353. StreamPushItem streamPushItemForPlatform = new StreamPushItem();
  354. streamPushItemForPlatform.setGbStreamId(streamPushItem.getGbStreamId());
  355. if (platFormInfoArray.length > 0) {
  356. // 数组 platFormInfoArray 0 为平台ID。 1为目录ID
  357. // 不存在这个平台,则忽略导入此关联关系
  358. if (platformInfoMap.get(platFormInfoArray[0]) == null
  359. || platformInfoMap.get(platFormInfoArray[0]).get(platFormInfoArray[1]) == null) {
  360. logger.info("导入数据时不存在平台或目录{}/{},已导入未分配", platFormInfoArray[0], platFormInfoArray[1] );
  361. continue;
  362. }
  363. streamPushItemForPlatform.setPlatformId(platFormInfoArray[0]);
  364. List<GbStream> gbStreamList = platformForEvent.get(platFormInfoArray[0]);
  365. if (gbStreamList == null) {
  366. gbStreamList = new ArrayList<>();
  367. platformForEvent.put(platFormInfoArray[0], gbStreamList);
  368. }
  369. // 为发送通知整理数据
  370. streamPushItemForPlatform.setName(streamPushItem.getName());
  371. streamPushItemForPlatform.setApp(streamPushItem.getApp());
  372. streamPushItemForPlatform.setStream(streamPushItem.getStream());
  373. streamPushItemForPlatform.setGbId(streamPushItem.getGbId());
  374. gbStreamList.add(streamPushItemForPlatform);
  375. }
  376. if (platFormInfoArray.length > 1) {
  377. streamPushItemForPlatform.setCatalogId(platFormInfoArray[1]);
  378. }
  379. streamPushItemListFroPlatform.add(streamPushItemForPlatform);
  380. }
  381. }
  382. }
  383. if (!streamPushItemListFroPlatform.isEmpty()) {
  384. platformGbStreamMapper.batchAdd(streamPushItemListFroPlatform);
  385. // 发送通知
  386. for (String platformId : platformForEvent.keySet()) {
  387. eventPublisher.catalogEventPublishForStream(
  388. platformId, platformForEvent.get(platformId), CatalogEvent.ADD);
  389. }
  390. }
  391. }
  392. }
  393. @Override
  394. public boolean batchStop(List<GbStream> gbStreams) {
  395. if (gbStreams == null || gbStreams.size() == 0) {
  396. return false;
  397. }
  398. gbStreamService.sendCatalogMsgs(gbStreams, CatalogEvent.DEL);
  399. platformGbStreamMapper.delByGbStreams(gbStreams);
  400. gbStreamMapper.batchDelForGbStream(gbStreams);
  401. int delStream = streamPushMapper.delAllForGbStream(gbStreams);
  402. if (delStream > 0) {
  403. for (GbStream gbStream : gbStreams) {
  404. MediaServer mediaServerItem = mediaServerService.getOne(gbStream.getMediaServerId());
  405. mediaServerService.closeStreams(mediaServerItem, gbStream.getApp(), gbStream.getStream());
  406. }
  407. }
  408. return true;
  409. }
  410. @Override
  411. public void allStreamOffline() {
  412. List<GbStream> onlinePushers = streamPushMapper.getOnlinePusherForGb();
  413. if (onlinePushers.size() == 0) {
  414. return;
  415. }
  416. streamPushMapper.setAllStreamOffline();
  417. // 发送通知
  418. eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.OFF);
  419. }
  420. @Override
  421. public void offline(List<StreamPushItemFromRedis> offlineStreams) {
  422. // 更新部分设备离线
  423. List<GbStream> onlinePushers = streamPushMapper.getOnlinePusherForGbInList(offlineStreams);
  424. streamPushMapper.offline(offlineStreams);
  425. // 发送通知
  426. eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.OFF);
  427. }
  428. @Override
  429. public void online(List<StreamPushItemFromRedis> onlineStreams) {
  430. // 更新部分设备上线streamPushService
  431. List<GbStream> onlinePushers = streamPushMapper.getOfflinePusherForGbInList(onlineStreams);
  432. streamPushMapper.online(onlineStreams);
  433. // 发送通知
  434. eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.ON);
  435. }
  436. @Override
  437. public boolean add(StreamPushItem stream) {
  438. stream.setUpdateTime(DateUtil.getNow());
  439. stream.setCreateTime(DateUtil.getNow());
  440. stream.setServerId(userSetting.getServerId());
  441. stream.setMediaServerId(mediaConfig.getId());
  442. stream.setSelf(true);
  443. stream.setPushIng(true);
  444. // 放在事务内执行
  445. boolean result = false;
  446. TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
  447. try {
  448. int addStreamResult = streamPushMapper.add(stream);
  449. if (!ObjectUtils.isEmpty(stream.getGbId())) {
  450. stream.setStreamType("push");
  451. gbStreamMapper.add(stream);
  452. }
  453. dataSourceTransactionManager.commit(transactionStatus);
  454. result = true;
  455. }catch (Exception e) {
  456. logger.error("批量移除流与平台的关系时错误", e);
  457. dataSourceTransactionManager.rollback(transactionStatus);
  458. }
  459. return result;
  460. }
  461. @Override
  462. public List<String> getAllAppAndStream() {
  463. return streamPushMapper.getAllAppAndStream();
  464. }
  465. @Override
  466. public ResourceBaseInfo getOverview() {
  467. int total = streamPushMapper.getAllCount();
  468. int online = streamPushMapper.getAllOnline(userSetting.isUsePushingAsStatus());
  469. return new ResourceBaseInfo(total, online);
  470. }
  471. @Override
  472. public Map<String, StreamPushItem> getAllAppAndStreamMap() {
  473. return streamPushMapper.getAllAppAndStreamMap();
  474. }
  475. }