// StreamPushServiceImpl.java (22 KB)
  1. package com.genersoft.iot.vmp.service.impl;
  2. import com.alibaba.fastjson.JSON;
  3. import com.alibaba.fastjson.JSONArray;
  4. import com.alibaba.fastjson.JSONObject;
  5. import com.alibaba.fastjson.TypeReference;
  6. import com.genersoft.iot.vmp.common.StreamInfo;
  7. import com.genersoft.iot.vmp.conf.UserSetup;
  8. import com.genersoft.iot.vmp.gb28181.bean.*;
  9. import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
  10. import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
  11. import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
  12. import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
  13. import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig;
  14. import com.genersoft.iot.vmp.media.zlm.dto.*;
  15. import com.genersoft.iot.vmp.service.IGbStreamService;
  16. import com.genersoft.iot.vmp.service.IMediaServerService;
  17. import com.genersoft.iot.vmp.service.IStreamPushService;
  18. import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
  19. import com.genersoft.iot.vmp.storager.dao.*;
  20. import com.genersoft.iot.vmp.vmanager.bean.StreamPushExcelDto;
  21. import com.github.pagehelper.PageHelper;
  22. import com.github.pagehelper.PageInfo;
  23. import org.slf4j.Logger;
  24. import org.slf4j.LoggerFactory;
  25. import org.springframework.beans.factory.annotation.Autowired;
  26. import org.springframework.stereotype.Service;
  27. import org.springframework.util.StringUtils;
  28. import java.util.*;
  29. import java.util.stream.Collectors;
  30. @Service
  31. public class StreamPushServiceImpl implements IStreamPushService {
  32. private final static Logger logger = LoggerFactory.getLogger(StreamPushServiceImpl.class);
  33. @Autowired
  34. private GbStreamMapper gbStreamMapper;
  35. @Autowired
  36. private StreamPushMapper streamPushMapper;
  37. @Autowired
  38. private ParentPlatformMapper parentPlatformMapper;
  39. @Autowired
  40. private PlatformCatalogMapper platformCatalogMapper;
  41. @Autowired
  42. private PlatformGbStreamMapper platformGbStreamMapper;
  43. @Autowired
  44. private IGbStreamService gbStreamService;
  45. @Autowired
  46. private EventPublisher eventPublisher;
  47. @Autowired
  48. private ZLMRESTfulUtils zlmresTfulUtils;
  49. @Autowired
  50. private IRedisCatchStorage redisCatchStorage;
  51. @Autowired
  52. private UserSetup userSetup;
  53. @Autowired
  54. private IMediaServerService mediaServerService;
  55. @Override
  56. public List<StreamPushItem> handleJSON(String jsonData, MediaServerItem mediaServerItem) {
  57. if (jsonData == null) return null;
  58. Map<String, StreamPushItem> result = new HashMap<>();
  59. List<MediaItem> mediaItems = JSON.parseObject(jsonData, new TypeReference<List<MediaItem>>() {});
  60. for (MediaItem item : mediaItems) {
  61. // 不保存国标推理以及拉流代理的流
  62. if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal()
  63. || item.getOriginType() == OriginType.RTMP_PUSH.ordinal()
  64. || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {
  65. String key = item.getApp() + "_" + item.getStream();
  66. StreamPushItem streamPushItem = result.get(key);
  67. if (streamPushItem == null) {
  68. streamPushItem = transform(item);
  69. result.put(key, streamPushItem);
  70. }
  71. }
  72. }
  73. return new ArrayList<>(result.values());
  74. }
  75. @Override
  76. public StreamPushItem transform(MediaItem item) {
  77. StreamPushItem streamPushItem = new StreamPushItem();
  78. streamPushItem.setApp(item.getApp());
  79. streamPushItem.setMediaServerId(item.getMediaServerId());
  80. streamPushItem.setStream(item.getStream());
  81. streamPushItem.setAliveSecond(item.getAliveSecond());
  82. streamPushItem.setOriginSock(item.getOriginSock());
  83. streamPushItem.setTotalReaderCount(item.getTotalReaderCount());
  84. streamPushItem.setOriginType(item.getOriginType());
  85. streamPushItem.setOriginTypeStr(item.getOriginTypeStr());
  86. streamPushItem.setOriginUrl(item.getOriginUrl());
  87. streamPushItem.setCreateStamp(item.getCreateStamp() * 1000);
  88. streamPushItem.setAliveSecond(item.getAliveSecond());
  89. streamPushItem.setStatus(true);
  90. streamPushItem.setStreamType("push");
  91. streamPushItem.setVhost(item.getVhost());
  92. return streamPushItem;
  93. }
  94. @Override
  95. public PageInfo<StreamPushItem> getPushList(Integer page, Integer count, String query, Boolean pushing, String mediaServerId) {
  96. PageHelper.startPage(page, count);
  97. List<StreamPushItem> all = streamPushMapper.selectAllForList(query, pushing, mediaServerId);
  98. return new PageInfo<>(all);
  99. }
  100. @Override
  101. public List<StreamPushItem> getPushList(String mediaServerId) {
  102. return streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId);
  103. }
  104. @Override
  105. public boolean saveToGB(GbStream stream) {
  106. stream.setStreamType("push");
  107. stream.setStatus(true);
  108. stream.setCreateStamp(System.currentTimeMillis());
  109. int add = gbStreamMapper.add(stream);
  110. // 查找开启了全部直播流共享的上级平台
  111. List<ParentPlatform> parentPlatforms = parentPlatformMapper.selectAllAhareAllLiveStream();
  112. if (parentPlatforms.size() > 0) {
  113. for (ParentPlatform parentPlatform : parentPlatforms) {
  114. stream.setCatalogId(parentPlatform.getCatalogId());
  115. stream.setPlatformId(parentPlatform.getServerGBId());
  116. String streamId = stream.getStream();
  117. StreamProxyItem streamProxyItem = platformGbStreamMapper.selectOne(stream.getApp(), streamId, parentPlatform.getServerGBId());
  118. if (streamProxyItem == null) {
  119. platformGbStreamMapper.add(stream);
  120. eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.ADD);
  121. }else {
  122. if (!streamProxyItem.getGbId().equals(stream.getGbId())) {
  123. // 此流使用另一个国标Id已经与该平台关联,移除此记录
  124. platformGbStreamMapper.delByAppAndStreamAndPlatform(stream.getApp(), streamId, parentPlatform.getServerGBId());
  125. platformGbStreamMapper.add(stream);
  126. eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.ADD);
  127. }
  128. }
  129. }
  130. }
  131. return add > 0;
  132. }
  133. @Override
  134. public boolean removeFromGB(GbStream stream) {
  135. // 判断是否需要发送事件
  136. gbStreamService.sendCatalogMsg(stream, CatalogEvent.DEL);
  137. platformGbStreamMapper.delByAppAndStream(stream.getApp(), stream.getStream());
  138. int del = gbStreamMapper.del(stream.getApp(), stream.getStream());
  139. MediaServerItem mediaInfo = mediaServerService.getOne(stream.getMediaServerId());
  140. JSONObject mediaList = zlmresTfulUtils.getMediaList(mediaInfo, stream.getApp(), stream.getStream());
  141. if (mediaList != null) {
  142. if (mediaList.getInteger("code") == 0) {
  143. JSONArray data = mediaList.getJSONArray("data");
  144. if (data == null) {
  145. streamPushMapper.del(stream.getApp(), stream.getStream());
  146. }
  147. }
  148. }
  149. return del > 0;
  150. }
  151. @Override
  152. public StreamPushItem getPush(String app, String streamId) {
  153. return streamPushMapper.selectOne(app, streamId);
  154. }
  155. @Override
  156. public boolean stop(String app, String streamId) {
  157. StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId);
  158. gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL);
  159. platformGbStreamMapper.delByAppAndStream(app, streamId);
  160. gbStreamMapper.del(app, streamId);
  161. int delStream = streamPushMapper.del(app, streamId);
  162. if (delStream > 0) {
  163. MediaServerItem mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId());
  164. zlmresTfulUtils.closeStreams(mediaServerItem,app, streamId);
  165. }
  166. return true;
  167. }
  168. @Override
  169. public void zlmServerOnline(String mediaServerId) {
  170. // 同步zlm推流信息
  171. MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
  172. if (mediaServerItem == null) {
  173. return;
  174. }
  175. // 数据库记录
  176. List<StreamPushItem> pushList = getPushList(mediaServerId);
  177. Map<String, StreamPushItem> pushItemMap = new HashMap<>();
  178. // redis记录
  179. List<MediaItem> mediaItems = redisCatchStorage.getStreams(mediaServerId, "PUSH");
  180. Map<String, MediaItem> streamInfoPushItemMap = new HashMap<>();
  181. if (pushList.size() > 0) {
  182. for (StreamPushItem streamPushItem : pushList) {
  183. if (StringUtils.isEmpty(streamPushItem.getGbId())) {
  184. pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem);
  185. }
  186. }
  187. }
  188. if (mediaItems.size() > 0) {
  189. for (MediaItem mediaItem : mediaItems) {
  190. streamInfoPushItemMap.put(mediaItem.getApp() + mediaItem.getStream(), mediaItem);
  191. }
  192. }
  193. zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{
  194. if (mediaList == null) return;
  195. String dataStr = mediaList.getString("data");
  196. Integer code = mediaList.getInteger("code");
  197. List<StreamPushItem> streamPushItems = null;
  198. if (code == 0 ) {
  199. if (dataStr != null) {
  200. streamPushItems = handleJSON(dataStr, mediaServerItem);
  201. }
  202. }
  203. if (streamPushItems != null) {
  204. for (StreamPushItem streamPushItem : streamPushItems) {
  205. pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
  206. streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
  207. }
  208. }
  209. List<StreamPushItem> offlinePushItems = new ArrayList<>(pushItemMap.values());
  210. if (offlinePushItems.size() > 0) {
  211. String type = "PUSH";
  212. int runLimit = 300;
  213. if (offlinePushItems.size() > runLimit) {
  214. for (int i = 0; i < offlinePushItems.size(); i += runLimit) {
  215. int toIndex = i + runLimit;
  216. if (i + runLimit > offlinePushItems.size()) {
  217. toIndex = offlinePushItems.size();
  218. }
  219. List<StreamPushItem> streamPushItemsSub = offlinePushItems.subList(i, toIndex);
  220. streamPushMapper.delAll(streamPushItemsSub);
  221. }
  222. }else {
  223. streamPushMapper.delAll(offlinePushItems);
  224. }
  225. }
  226. Collection<MediaItem> offlineMediaItemList = streamInfoPushItemMap.values();
  227. if (offlineMediaItemList.size() > 0) {
  228. String type = "PUSH";
  229. for (MediaItem offlineMediaItem : offlineMediaItemList) {
  230. JSONObject jsonObject = new JSONObject();
  231. jsonObject.put("serverId", userSetup.getServerId());
  232. jsonObject.put("app", offlineMediaItem.getApp());
  233. jsonObject.put("stream", offlineMediaItem.getStream());
  234. jsonObject.put("register", false);
  235. jsonObject.put("mediaServerId", mediaServerId);
  236. redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
  237. // 移除redis内流的信息
  238. redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineMediaItem.getApp(), offlineMediaItem.getStream());
  239. }
  240. }
  241. }));
  242. }
  243. @Override
  244. public void zlmServerOffline(String mediaServerId) {
  245. List<StreamPushItem> streamPushItems = streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId);
  246. // 移除没有GBId的推流
  247. streamPushMapper.deleteWithoutGBId(mediaServerId);
  248. gbStreamMapper.deleteWithoutGBId("push", mediaServerId);
  249. // 其他的流设置未启用
  250. gbStreamMapper.updateStatusByMediaServerId(mediaServerId, false);
  251. // 发送流停止消息
  252. String type = "PUSH";
  253. // 发送redis消息
  254. List<MediaItem> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type);
  255. if (streamInfoList.size() > 0) {
  256. for (MediaItem mediaItem : streamInfoList) {
  257. // 移除redis内流的信息
  258. redisCatchStorage.removeStream(mediaServerId, type, mediaItem.getApp(), mediaItem.getStream());
  259. JSONObject jsonObject = new JSONObject();
  260. jsonObject.put("serverId", userSetup.getServerId());
  261. jsonObject.put("app", mediaItem.getApp());
  262. jsonObject.put("stream", mediaItem.getStream());
  263. jsonObject.put("register", false);
  264. jsonObject.put("mediaServerId", mediaServerId);
  265. redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
  266. }
  267. }
  268. }
  269. @Override
  270. public void clean() {
  271. }
  272. @Override
  273. public boolean saveToRandomGB() {
  274. List<StreamPushItem> streamPushItems = streamPushMapper.selectAll();
  275. long gbId = 100001;
  276. for (StreamPushItem streamPushItem : streamPushItems) {
  277. streamPushItem.setStreamType("push");
  278. streamPushItem.setStatus(true);
  279. streamPushItem.setGbId("34020000004111" + gbId);
  280. streamPushItem.setCreateStamp(System.currentTimeMillis());
  281. gbId ++;
  282. }
  283. int limitCount = 30;
  284. if (streamPushItems.size() > limitCount) {
  285. for (int i = 0; i < streamPushItems.size(); i += limitCount) {
  286. int toIndex = i + limitCount;
  287. if (i + limitCount > streamPushItems.size()) {
  288. toIndex = streamPushItems.size();
  289. }
  290. gbStreamMapper.batchAdd(streamPushItems.subList(i, toIndex));
  291. }
  292. }else {
  293. gbStreamMapper.batchAdd(streamPushItems);
  294. }
  295. return true;
  296. }
  297. @Override
  298. public void batchAdd(List<StreamPushItem> streamPushItems) {
  299. streamPushMapper.addAll(streamPushItems);
  300. gbStreamMapper.batchAdd(streamPushItems);
  301. // 查找开启了全部直播流共享的上级平台
  302. List<ParentPlatform> parentPlatforms = parentPlatformMapper.selectAllAhareAllLiveStream();
  303. if (parentPlatforms.size() > 0) {
  304. for (StreamPushItem stream : streamPushItems) {
  305. for (ParentPlatform parentPlatform : parentPlatforms) {
  306. stream.setCatalogId(parentPlatform.getCatalogId());
  307. stream.setPlatformId(parentPlatform.getServerGBId());
  308. String streamId = stream.getStream();
  309. StreamProxyItem streamProxyItem = platformGbStreamMapper.selectOne(stream.getApp(), streamId, parentPlatform.getServerGBId());
  310. if (streamProxyItem == null) {
  311. platformGbStreamMapper.add(stream);
  312. eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.ADD);
  313. }else {
  314. if (!streamProxyItem.getGbId().equals(stream.getGbId())) {
  315. // 此流使用另一个国标Id已经与该平台关联,移除此记录
  316. platformGbStreamMapper.delByAppAndStreamAndPlatform(stream.getApp(), streamId, parentPlatform.getServerGBId());
  317. platformGbStreamMapper.add(stream);
  318. eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.ADD);
  319. stream.setGbId(streamProxyItem.getGbId());
  320. eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.DEL);
  321. }
  322. }
  323. }
  324. }
  325. }
  326. }
  327. @Override
  328. public void batchAddForUpload(List<StreamPushItem> streamPushItems, Map<String, List<String[]>> streamPushItemsForAll ) {
  329. // 存储数据到stream_push表
  330. streamPushMapper.addAll(streamPushItems);
  331. List<StreamPushItem> streamPushItemForGbStream = streamPushItems.stream()
  332. .filter(streamPushItem-> streamPushItem.getId() != null)
  333. .collect(Collectors.toList());
  334. // 存储数据到gb_stream表, id会返回到streamPushItemForGbStream里
  335. if (streamPushItemForGbStream.size() > 0) {
  336. gbStreamMapper.batchAdd(streamPushItemForGbStream);
  337. }
  338. // 去除没有ID也就是没有存储到数据库的数据
  339. List<StreamPushItem> streamPushItemsForPlatform = streamPushItemForGbStream.stream()
  340. .filter(streamPushItem-> streamPushItem.getGbStreamId() != null)
  341. .collect(Collectors.toList());
  342. if (streamPushItemsForPlatform.size() > 0) {
  343. // 获取所有平台,平台和目录信息一般不会特别大量。
  344. List<ParentPlatform> parentPlatformList = parentPlatformMapper.getParentPlatformList();
  345. Map<String, Map<String, PlatformCatalog>> platformInfoMap = new HashMap<>();
  346. if (parentPlatformList.size() == 0) {
  347. return;
  348. }
  349. for (ParentPlatform platform : parentPlatformList) {
  350. Map<String, PlatformCatalog> catalogMap = new HashMap<>();
  351. // 创建根节点
  352. PlatformCatalog platformCatalog = new PlatformCatalog();
  353. platformCatalog.setId(platform.getServerGBId());
  354. catalogMap.put(platform.getServerGBId(), platformCatalog);
  355. // 查询所有节点信息
  356. List<PlatformCatalog> platformCatalogs = platformCatalogMapper.selectByPlatForm(platform.getServerGBId());
  357. if (platformCatalogs.size() > 0) {
  358. for (PlatformCatalog catalog : platformCatalogs) {
  359. catalogMap.put(catalog.getId(), catalog);
  360. }
  361. }
  362. platformInfoMap.put(platform.getServerGBId(), catalogMap);
  363. }
  364. List<StreamPushItem> streamPushItemListFroPlatform = new ArrayList<>();
  365. Map<String, List<GbStream>> platformForEvent = new HashMap<>();
  366. // 遍历存储结果,查找app+Stream->platformId+catalogId的对应关系,然后执行批量写入
  367. for (StreamPushItem streamPushItem : streamPushItemsForPlatform) {
  368. List<String[]> platFormInfoList = streamPushItemsForAll.get(streamPushItem.getApp() + streamPushItem.getStream());
  369. if (platFormInfoList != null && platFormInfoList.size() > 0) {
  370. for (String[] platFormInfoArray : platFormInfoList) {
  371. StreamPushItem streamPushItemForPlatform = new StreamPushItem();
  372. streamPushItemForPlatform.setGbStreamId(streamPushItem.getGbStreamId());
  373. if (platFormInfoArray.length > 0) {
  374. // 数组 platFormInfoArray 0 为平台ID。 1为目录ID
  375. // 不存在这个平台,则忽略导入此关联关系
  376. if (platformInfoMap.get(platFormInfoArray[0]) == null
  377. || platformInfoMap.get(platFormInfoArray[0]).get(platFormInfoArray[1]) == null) {
  378. logger.info("导入数据时不存在平台或目录{}/{},已导入未分配", platFormInfoArray[0], platFormInfoArray[1] );
  379. continue;
  380. }
  381. streamPushItemForPlatform.setPlatformId(platFormInfoArray[0]);
  382. if (platFormInfoArray[0].equals("34020000002110000001")) {
  383. System.out.println(111);
  384. }
  385. List<GbStream> gbStreamList = platformForEvent.get(platFormInfoArray[0]);
  386. if (gbStreamList == null) {
  387. gbStreamList = new ArrayList<>();
  388. platformForEvent.put(platFormInfoArray[0], gbStreamList);
  389. }
  390. // 为发送通知整理数据
  391. streamPushItemForPlatform.setName(streamPushItem.getName());
  392. streamPushItemForPlatform.setApp(streamPushItem.getApp());
  393. streamPushItemForPlatform.setStream(streamPushItem.getStream());
  394. streamPushItemForPlatform.setGbId(streamPushItem.getGbId());
  395. gbStreamList.add(streamPushItemForPlatform);
  396. }
  397. if (platFormInfoArray.length > 1) {
  398. streamPushItemForPlatform.setCatalogId(platFormInfoArray[1]);
  399. }
  400. streamPushItemListFroPlatform.add(streamPushItemForPlatform);
  401. }
  402. }
  403. }
  404. if (streamPushItemListFroPlatform.size() > 0) {
  405. platformGbStreamMapper.batchAdd(streamPushItemListFroPlatform);
  406. // 发送通知
  407. for (String platformId : platformForEvent.keySet()) {
  408. eventPublisher.catalogEventPublishForStream(
  409. platformId, platformForEvent.get(platformId), CatalogEvent.ADD);
  410. }
  411. }
  412. }
  413. }
  414. @Override
  415. public boolean batchStop(List<GbStream> gbStreams) {
  416. if (gbStreams == null || gbStreams.size() == 0) {
  417. return false;
  418. }
  419. gbStreamService.sendCatalogMsgs(gbStreams, CatalogEvent.DEL);
  420. platformGbStreamMapper.delByGbStreams(gbStreams);
  421. gbStreamMapper.batchDelForGbStream(gbStreams);
  422. int delStream = streamPushMapper.delAllForGbStream(gbStreams);
  423. if (delStream > 0) {
  424. for (GbStream gbStream : gbStreams) {
  425. MediaServerItem mediaServerItem = mediaServerService.getOne(gbStream.getMediaServerId());
  426. zlmresTfulUtils.closeStreams(mediaServerItem, gbStream.getApp(), gbStream.getStream());
  427. }
  428. }
  429. return true;
  430. }
  431. }