StreamPushServiceImpl.java 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556
  1. package com.genersoft.iot.vmp.service.impl;
  2. import com.alibaba.fastjson2.JSON;
  3. import com.alibaba.fastjson2.JSONArray;
  4. import com.alibaba.fastjson2.JSONObject;
  5. import com.alibaba.fastjson2.TypeReference;
  6. import com.baomidou.dynamic.datasource.annotation.DS;
  7. import com.genersoft.iot.vmp.conf.MediaConfig;
  8. import com.genersoft.iot.vmp.conf.UserSetting;
  9. import com.genersoft.iot.vmp.gb28181.bean.*;
  10. import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
  11. import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
  12. import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
  13. import com.genersoft.iot.vmp.media.zlm.dto.*;
  14. import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
  15. import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType;
  16. import com.genersoft.iot.vmp.service.IGbStreamService;
  17. import com.genersoft.iot.vmp.media.service.IMediaServerService;
  18. import com.genersoft.iot.vmp.service.IStreamPushService;
  19. import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis;
  20. import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
  21. import com.genersoft.iot.vmp.storager.dao.*;
  22. import com.genersoft.iot.vmp.utils.DateUtil;
  23. import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
  24. import com.github.pagehelper.PageHelper;
  25. import com.github.pagehelper.PageInfo;
  26. import org.slf4j.Logger;
  27. import org.slf4j.LoggerFactory;
  28. import org.springframework.beans.factory.annotation.Autowired;
  29. import org.springframework.jdbc.datasource.DataSourceTransactionManager;
  30. import org.springframework.stereotype.Service;
  31. import org.springframework.transaction.TransactionDefinition;
  32. import org.springframework.transaction.TransactionStatus;
  33. import org.springframework.util.ObjectUtils;
  34. import java.util.*;
  35. import java.util.stream.Collectors;
  36. @Service
  37. @DS("master")
  38. public class StreamPushServiceImpl implements IStreamPushService {
  39. private final static Logger logger = LoggerFactory.getLogger(StreamPushServiceImpl.class);
  40. @Autowired
  41. private GbStreamMapper gbStreamMapper;
  42. @Autowired
  43. private StreamPushMapper streamPushMapper;
  44. @Autowired
  45. private StreamProxyMapper streamProxyMapper;
  46. @Autowired
  47. private ParentPlatformMapper parentPlatformMapper;
  48. @Autowired
  49. private PlatformCatalogMapper platformCatalogMapper;
  50. @Autowired
  51. private PlatformGbStreamMapper platformGbStreamMapper;
  52. @Autowired
  53. private IGbStreamService gbStreamService;
  54. @Autowired
  55. private EventPublisher eventPublisher;
  56. @Autowired
  57. private ZLMRESTfulUtils zlmresTfulUtils;
  58. @Autowired
  59. private IRedisCatchStorage redisCatchStorage;
  60. @Autowired
  61. private UserSetting userSetting;
  62. @Autowired
  63. private IMediaServerService mediaServerService;
  64. @Autowired
  65. DataSourceTransactionManager dataSourceTransactionManager;
  66. @Autowired
  67. TransactionDefinition transactionDefinition;
  68. @Autowired
  69. private MediaConfig mediaConfig;
  70. @Override
  71. public List<StreamPushItem> handleJSON(String jsonData, MediaServer mediaServerItem) {
  72. if (jsonData == null) {
  73. return null;
  74. }
  75. Map<String, StreamPushItem> result = new HashMap<>();
  76. List<OnStreamChangedHookParam> onStreamChangedHookParams = JSON.parseObject(jsonData, new TypeReference<List<OnStreamChangedHookParam>>() {});
  77. for (OnStreamChangedHookParam item : onStreamChangedHookParams) {
  78. // 不保存国标推理以及拉流代理的流
  79. if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal()
  80. || item.getOriginType() == OriginType.RTMP_PUSH.ordinal()
  81. || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {
  82. String key = item.getApp() + "_" + item.getStream();
  83. StreamPushItem streamPushItem = result.get(key);
  84. if (streamPushItem == null) {
  85. streamPushItem = transform(item);
  86. result.put(key, streamPushItem);
  87. }
  88. }
  89. }
  90. return new ArrayList<>(result.values());
  91. }
  92. @Override
  93. public StreamPushItem transform(OnStreamChangedHookParam item) {
  94. StreamPushItem streamPushItem = new StreamPushItem();
  95. streamPushItem.setApp(item.getApp());
  96. streamPushItem.setMediaServerId(item.getMediaServerId());
  97. streamPushItem.setStream(item.getStream());
  98. streamPushItem.setAliveSecond(item.getAliveSecond());
  99. streamPushItem.setOriginSock(item.getOriginSock());
  100. streamPushItem.setTotalReaderCount(item.getTotalReaderCount() + "");
  101. streamPushItem.setOriginType(item.getOriginType());
  102. streamPushItem.setOriginTypeStr(item.getOriginTypeStr());
  103. streamPushItem.setOriginUrl(item.getOriginUrl());
  104. streamPushItem.setCreateTime(DateUtil.getNow());
  105. streamPushItem.setAliveSecond(item.getAliveSecond());
  106. streamPushItem.setStatus(true);
  107. streamPushItem.setStreamType("push");
  108. streamPushItem.setVhost(item.getVhost());
  109. streamPushItem.setServerId(item.getSeverId());
  110. return streamPushItem;
  111. }
  112. @Override
  113. public PageInfo<StreamPushItem> getPushList(Integer page, Integer count, String query, Boolean pushing, String mediaServerId) {
  114. PageHelper.startPage(page, count);
  115. List<StreamPushItem> all = streamPushMapper.selectAllForList(query, pushing, mediaServerId);
  116. return new PageInfo<>(all);
  117. }
  118. @Override
  119. public List<StreamPushItem> getPushList(String mediaServerId) {
  120. return streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId);
  121. }
  122. @Override
  123. public boolean saveToGB(GbStream stream) {
  124. stream.setStreamType("push");
  125. stream.setStatus(true);
  126. stream.setCreateTime(DateUtil.getNow());
  127. stream.setStreamType("push");
  128. stream.setMediaServerId(mediaConfig.getId());
  129. int add = gbStreamMapper.add(stream);
  130. return add > 0;
  131. }
  132. @Override
  133. public boolean removeFromGB(GbStream stream) {
  134. // 判断是否需要发送事件
  135. gbStreamService.sendCatalogMsg(stream, CatalogEvent.DEL);
  136. platformGbStreamMapper.delByAppAndStream(stream.getApp(), stream.getStream());
  137. int del = gbStreamMapper.del(stream.getApp(), stream.getStream());
  138. MediaServer mediaInfo = mediaServerService.getOne(stream.getMediaServerId());
  139. JSONObject mediaList = zlmresTfulUtils.getMediaList(mediaInfo, stream.getApp(), stream.getStream());
  140. if (mediaList != null) {
  141. if (mediaList.getInteger("code") == 0) {
  142. JSONArray data = mediaList.getJSONArray("data");
  143. if (data == null) {
  144. streamPushMapper.del(stream.getApp(), stream.getStream());
  145. }
  146. }
  147. }
  148. return del > 0;
  149. }
  150. @Override
  151. public StreamPushItem getPush(String app, String streamId) {
  152. return streamPushMapper.selectOne(app, streamId);
  153. }
  154. @Override
  155. public boolean stop(String app, String streamId) {
  156. logger.info("[推流 ] 停止流: {}/{}", app, streamId);
  157. StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId);
  158. if (streamPushItem != null) {
  159. gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL);
  160. }
  161. platformGbStreamMapper.delByAppAndStream(app, streamId);
  162. gbStreamMapper.del(app, streamId);
  163. int delStream = streamPushMapper.del(app, streamId);
  164. if (delStream > 0) {
  165. MediaServer mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId());
  166. zlmresTfulUtils.closeStreams(mediaServerItem,app, streamId);
  167. }
  168. return true;
  169. }
  170. @Override
  171. public void zlmServerOnline(String mediaServerId) {
  172. // 同步zlm推流信息
  173. MediaServer mediaServerItem = mediaServerService.getOne(mediaServerId);
  174. if (mediaServerItem == null) {
  175. return;
  176. }
  177. // 数据库记录
  178. List<StreamPushItem> pushList = getPushList(mediaServerId);
  179. Map<String, StreamPushItem> pushItemMap = new HashMap<>();
  180. // redis记录
  181. List<OnStreamChangedHookParam> onStreamChangedHookParams = redisCatchStorage.getStreams(mediaServerId, "PUSH");
  182. Map<String, OnStreamChangedHookParam> streamInfoPushItemMap = new HashMap<>();
  183. if (pushList.size() > 0) {
  184. for (StreamPushItem streamPushItem : pushList) {
  185. if (ObjectUtils.isEmpty(streamPushItem.getGbId())) {
  186. pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem);
  187. }
  188. }
  189. }
  190. if (onStreamChangedHookParams.size() > 0) {
  191. for (OnStreamChangedHookParam onStreamChangedHookParam : onStreamChangedHookParams) {
  192. streamInfoPushItemMap.put(onStreamChangedHookParam.getApp() + onStreamChangedHookParam.getStream(), onStreamChangedHookParam);
  193. }
  194. }
  195. // 获取所有推流鉴权信息,清理过期的
  196. List<StreamAuthorityInfo> allStreamAuthorityInfo = redisCatchStorage.getAllStreamAuthorityInfo();
  197. Map<String, StreamAuthorityInfo> streamAuthorityInfoInfoMap = new HashMap<>();
  198. for (StreamAuthorityInfo streamAuthorityInfo : allStreamAuthorityInfo) {
  199. streamAuthorityInfoInfoMap.put(streamAuthorityInfo.getApp() + streamAuthorityInfo.getStream(), streamAuthorityInfo);
  200. }
  201. zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{
  202. if (mediaList == null) {
  203. return;
  204. }
  205. String dataStr = mediaList.getString("data");
  206. Integer code = mediaList.getInteger("code");
  207. List<StreamPushItem> streamPushItems = null;
  208. if (code == 0 ) {
  209. if (dataStr != null) {
  210. streamPushItems = handleJSON(dataStr, mediaServerItem);
  211. }
  212. }
  213. if (streamPushItems != null) {
  214. for (StreamPushItem streamPushItem : streamPushItems) {
  215. pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
  216. streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
  217. streamAuthorityInfoInfoMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
  218. }
  219. }
  220. List<StreamPushItem> offlinePushItems = new ArrayList<>(pushItemMap.values());
  221. if (offlinePushItems.size() > 0) {
  222. String type = "PUSH";
  223. int runLimit = 300;
  224. if (offlinePushItems.size() > runLimit) {
  225. for (int i = 0; i < offlinePushItems.size(); i += runLimit) {
  226. int toIndex = i + runLimit;
  227. if (i + runLimit > offlinePushItems.size()) {
  228. toIndex = offlinePushItems.size();
  229. }
  230. List<StreamPushItem> streamPushItemsSub = offlinePushItems.subList(i, toIndex);
  231. streamPushMapper.delAll(streamPushItemsSub);
  232. }
  233. }else {
  234. streamPushMapper.delAll(offlinePushItems);
  235. }
  236. }
  237. Collection<OnStreamChangedHookParam> offlineOnStreamChangedHookParamList = streamInfoPushItemMap.values();
  238. if (offlineOnStreamChangedHookParamList.size() > 0) {
  239. String type = "PUSH";
  240. for (OnStreamChangedHookParam offlineOnStreamChangedHookParam : offlineOnStreamChangedHookParamList) {
  241. JSONObject jsonObject = new JSONObject();
  242. jsonObject.put("serverId", userSetting.getServerId());
  243. jsonObject.put("app", offlineOnStreamChangedHookParam.getApp());
  244. jsonObject.put("stream", offlineOnStreamChangedHookParam.getStream());
  245. jsonObject.put("register", false);
  246. jsonObject.put("mediaServerId", mediaServerId);
  247. redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
  248. // 移除redis内流的信息
  249. redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream());
  250. // 冗余数据,自己系统中自用
  251. redisCatchStorage.removePushListItem(offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream(), mediaServerItem.getId());
  252. }
  253. }
  254. Collection<StreamAuthorityInfo> streamAuthorityInfos = streamAuthorityInfoInfoMap.values();
  255. if (streamAuthorityInfos.size() > 0) {
  256. for (StreamAuthorityInfo streamAuthorityInfo : streamAuthorityInfos) {
  257. // 移除redis内流的信息
  258. redisCatchStorage.removeStreamAuthorityInfo(streamAuthorityInfo.getApp(), streamAuthorityInfo.getStream());
  259. }
  260. }
  261. }));
  262. }
  263. @Override
  264. public void zlmServerOffline(String mediaServerId) {
  265. List<StreamPushItem> streamPushItems = streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId);
  266. // 移除没有GBId的推流
  267. streamPushMapper.deleteWithoutGBId(mediaServerId);
  268. gbStreamMapper.deleteWithoutGBId("push", mediaServerId);
  269. // 其他的流设置未启用
  270. streamPushMapper.updateStatusByMediaServerId(mediaServerId, false);
  271. streamProxyMapper.updateStatusByMediaServerId(mediaServerId, false);
  272. // 发送流停止消息
  273. String type = "PUSH";
  274. // 发送redis消息
  275. List<OnStreamChangedHookParam> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type);
  276. if (streamInfoList.size() > 0) {
  277. for (OnStreamChangedHookParam onStreamChangedHookParam : streamInfoList) {
  278. // 移除redis内流的信息
  279. redisCatchStorage.removeStream(mediaServerId, type, onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream());
  280. JSONObject jsonObject = new JSONObject();
  281. jsonObject.put("serverId", userSetting.getServerId());
  282. jsonObject.put("app", onStreamChangedHookParam.getApp());
  283. jsonObject.put("stream", onStreamChangedHookParam.getStream());
  284. jsonObject.put("register", false);
  285. jsonObject.put("mediaServerId", mediaServerId);
  286. redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
  287. // 冗余数据,自己系统中自用
  288. redisCatchStorage.removePushListItem(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream(), mediaServerId);
  289. }
  290. }
  291. }
  292. @Override
  293. public void clean() {
  294. }
  295. @Override
  296. public boolean saveToRandomGB() {
  297. List<StreamPushItem> streamPushItems = streamPushMapper.selectAll();
  298. long gbId = 100001;
  299. for (StreamPushItem streamPushItem : streamPushItems) {
  300. streamPushItem.setStreamType("push");
  301. streamPushItem.setStatus(true);
  302. streamPushItem.setGbId("34020000004111" + gbId);
  303. streamPushItem.setCreateTime(DateUtil.getNow());
  304. gbId ++;
  305. }
  306. int limitCount = 30;
  307. if (streamPushItems.size() > limitCount) {
  308. for (int i = 0; i < streamPushItems.size(); i += limitCount) {
  309. int toIndex = i + limitCount;
  310. if (i + limitCount > streamPushItems.size()) {
  311. toIndex = streamPushItems.size();
  312. }
  313. gbStreamMapper.batchAdd(streamPushItems.subList(i, toIndex));
  314. }
  315. }else {
  316. gbStreamMapper.batchAdd(streamPushItems);
  317. }
  318. return true;
  319. }
  320. @Override
  321. public void batchAdd(List<StreamPushItem> streamPushItems) {
  322. streamPushMapper.addAll(streamPushItems);
  323. gbStreamMapper.batchAdd(streamPushItems);
  324. }
  325. @Override
  326. public void batchAddForUpload(List<StreamPushItem> streamPushItems, Map<String, List<String[]>> streamPushItemsForAll ) {
  327. // 存储数据到stream_push表
  328. streamPushMapper.addAll(streamPushItems);
  329. List<StreamPushItem> streamPushItemForGbStream = streamPushItems.stream()
  330. .filter(streamPushItem-> streamPushItem.getGbId() != null)
  331. .collect(Collectors.toList());
  332. // 存储数据到gb_stream表, id会返回到streamPushItemForGbStream里
  333. if (streamPushItemForGbStream.size() > 0) {
  334. gbStreamMapper.batchAdd(streamPushItemForGbStream);
  335. }
  336. // 去除没有ID也就是没有存储到数据库的数据
  337. List<StreamPushItem> streamPushItemsForPlatform = streamPushItemForGbStream.stream()
  338. .filter(streamPushItem-> streamPushItem.getGbStreamId() != null)
  339. .collect(Collectors.toList());
  340. if (streamPushItemsForPlatform.size() > 0) {
  341. // 获取所有平台,平台和目录信息一般不会特别大量。
  342. List<ParentPlatform> parentPlatformList = parentPlatformMapper.getParentPlatformList();
  343. Map<String, Map<String, PlatformCatalog>> platformInfoMap = new HashMap<>();
  344. if (parentPlatformList.size() == 0) {
  345. return;
  346. }
  347. for (ParentPlatform platform : parentPlatformList) {
  348. Map<String, PlatformCatalog> catalogMap = new HashMap<>();
  349. // 创建根节点
  350. PlatformCatalog platformCatalog = new PlatformCatalog();
  351. platformCatalog.setId(platform.getServerGBId());
  352. catalogMap.put(platform.getServerGBId(), platformCatalog);
  353. // 查询所有节点信息
  354. List<PlatformCatalog> platformCatalogs = platformCatalogMapper.selectByPlatForm(platform.getServerGBId());
  355. if (platformCatalogs.size() > 0) {
  356. for (PlatformCatalog catalog : platformCatalogs) {
  357. catalogMap.put(catalog.getId(), catalog);
  358. }
  359. }
  360. platformInfoMap.put(platform.getServerGBId(), catalogMap);
  361. }
  362. List<StreamPushItem> streamPushItemListFroPlatform = new ArrayList<>();
  363. Map<String, List<GbStream>> platformForEvent = new HashMap<>();
  364. // 遍历存储结果,查找app+Stream->platformId+catalogId的对应关系,然后执行批量写入
  365. for (StreamPushItem streamPushItem : streamPushItemsForPlatform) {
  366. List<String[]> platFormInfoList = streamPushItemsForAll.get(streamPushItem.getApp() + streamPushItem.getStream());
  367. if (platFormInfoList != null && platFormInfoList.size() > 0) {
  368. for (String[] platFormInfoArray : platFormInfoList) {
  369. StreamPushItem streamPushItemForPlatform = new StreamPushItem();
  370. streamPushItemForPlatform.setGbStreamId(streamPushItem.getGbStreamId());
  371. if (platFormInfoArray.length > 0) {
  372. // 数组 platFormInfoArray 0 为平台ID。 1为目录ID
  373. // 不存在这个平台,则忽略导入此关联关系
  374. if (platformInfoMap.get(platFormInfoArray[0]) == null
  375. || platformInfoMap.get(platFormInfoArray[0]).get(platFormInfoArray[1]) == null) {
  376. logger.info("导入数据时不存在平台或目录{}/{},已导入未分配", platFormInfoArray[0], platFormInfoArray[1] );
  377. continue;
  378. }
  379. streamPushItemForPlatform.setPlatformId(platFormInfoArray[0]);
  380. List<GbStream> gbStreamList = platformForEvent.get(platFormInfoArray[0]);
  381. if (gbStreamList == null) {
  382. gbStreamList = new ArrayList<>();
  383. platformForEvent.put(platFormInfoArray[0], gbStreamList);
  384. }
  385. // 为发送通知整理数据
  386. streamPushItemForPlatform.setName(streamPushItem.getName());
  387. streamPushItemForPlatform.setApp(streamPushItem.getApp());
  388. streamPushItemForPlatform.setStream(streamPushItem.getStream());
  389. streamPushItemForPlatform.setGbId(streamPushItem.getGbId());
  390. gbStreamList.add(streamPushItemForPlatform);
  391. }
  392. if (platFormInfoArray.length > 1) {
  393. streamPushItemForPlatform.setCatalogId(platFormInfoArray[1]);
  394. }
  395. streamPushItemListFroPlatform.add(streamPushItemForPlatform);
  396. }
  397. }
  398. }
  399. if (!streamPushItemListFroPlatform.isEmpty()) {
  400. platformGbStreamMapper.batchAdd(streamPushItemListFroPlatform);
  401. // 发送通知
  402. for (String platformId : platformForEvent.keySet()) {
  403. eventPublisher.catalogEventPublishForStream(
  404. platformId, platformForEvent.get(platformId), CatalogEvent.ADD);
  405. }
  406. }
  407. }
  408. }
  409. @Override
  410. public boolean batchStop(List<GbStream> gbStreams) {
  411. if (gbStreams == null || gbStreams.size() == 0) {
  412. return false;
  413. }
  414. gbStreamService.sendCatalogMsgs(gbStreams, CatalogEvent.DEL);
  415. platformGbStreamMapper.delByGbStreams(gbStreams);
  416. gbStreamMapper.batchDelForGbStream(gbStreams);
  417. int delStream = streamPushMapper.delAllForGbStream(gbStreams);
  418. if (delStream > 0) {
  419. for (GbStream gbStream : gbStreams) {
  420. MediaServer mediaServerItem = mediaServerService.getOne(gbStream.getMediaServerId());
  421. zlmresTfulUtils.closeStreams(mediaServerItem, gbStream.getApp(), gbStream.getStream());
  422. }
  423. }
  424. return true;
  425. }
  426. @Override
  427. public void allStreamOffline() {
  428. List<GbStream> onlinePushers = streamPushMapper.getOnlinePusherForGb();
  429. if (onlinePushers.size() == 0) {
  430. return;
  431. }
  432. streamPushMapper.setAllStreamOffline();
  433. // 发送通知
  434. eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.OFF);
  435. }
  436. @Override
  437. public void offline(List<StreamPushItemFromRedis> offlineStreams) {
  438. // 更新部分设备离线
  439. List<GbStream> onlinePushers = streamPushMapper.getOnlinePusherForGbInList(offlineStreams);
  440. streamPushMapper.offline(offlineStreams);
  441. // 发送通知
  442. eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.OFF);
  443. }
  444. @Override
  445. public void online(List<StreamPushItemFromRedis> onlineStreams) {
  446. // 更新部分设备上线streamPushService
  447. List<GbStream> onlinePushers = streamPushMapper.getOfflinePusherForGbInList(onlineStreams);
  448. streamPushMapper.online(onlineStreams);
  449. // 发送通知
  450. eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.ON);
  451. }
  452. @Override
  453. public boolean add(StreamPushItem stream) {
  454. stream.setUpdateTime(DateUtil.getNow());
  455. stream.setCreateTime(DateUtil.getNow());
  456. stream.setServerId(userSetting.getServerId());
  457. stream.setMediaServerId(mediaConfig.getId());
  458. stream.setSelf(true);
  459. stream.setPushIng(true);
  460. // 放在事务内执行
  461. boolean result = false;
  462. TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
  463. try {
  464. int addStreamResult = streamPushMapper.add(stream);
  465. if (!ObjectUtils.isEmpty(stream.getGbId())) {
  466. stream.setStreamType("push");
  467. gbStreamMapper.add(stream);
  468. }
  469. dataSourceTransactionManager.commit(transactionStatus);
  470. result = true;
  471. }catch (Exception e) {
  472. logger.error("批量移除流与平台的关系时错误", e);
  473. dataSourceTransactionManager.rollback(transactionStatus);
  474. }
  475. return result;
  476. }
  477. @Override
  478. public List<String> getAllAppAndStream() {
  479. return streamPushMapper.getAllAppAndStream();
  480. }
  481. @Override
  482. public ResourceBaseInfo getOverview() {
  483. int total = streamPushMapper.getAllCount();
  484. int online = streamPushMapper.getAllOnline(userSetting.isUsePushingAsStatus());
  485. return new ResourceBaseInfo(total, online);
  486. }
  487. @Override
  488. public Map<String, StreamPushItem> getAllAppAndStreamMap() {
  489. return streamPushMapper.getAllAppAndStreamMap();
  490. }
  491. }