StreamProxyServiceImpl.java 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444
  1. package com.genersoft.iot.vmp.service.impl;
  2. import com.alibaba.fastjson2.JSONArray;
  3. import com.alibaba.fastjson2.JSONObject;
  4. import com.genersoft.iot.vmp.common.StreamInfo;
  5. import com.genersoft.iot.vmp.conf.UserSetting;
  6. import com.genersoft.iot.vmp.conf.exception.ControllerException;
  7. import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
  8. import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
  9. import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
  10. import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
  11. import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
  12. import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
  13. import com.genersoft.iot.vmp.service.IGbStreamService;
  14. import com.genersoft.iot.vmp.service.IMediaServerService;
  15. import com.genersoft.iot.vmp.service.IMediaService;
  16. import com.genersoft.iot.vmp.service.IStreamProxyService;
  17. import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
  18. import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
  19. import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
  20. import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper;
  21. import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
  22. import com.genersoft.iot.vmp.storager.dao.StreamProxyMapper;
  23. import com.genersoft.iot.vmp.utils.DateUtil;
  24. import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
  25. import com.genersoft.iot.vmp.vmanager.bean.ResourceBaceInfo;
  26. import com.github.pagehelper.PageInfo;
  27. import org.slf4j.Logger;
  28. import org.slf4j.LoggerFactory;
  29. import org.springframework.beans.factory.annotation.Autowired;
  30. import org.springframework.jdbc.datasource.DataSourceTransactionManager;
  31. import org.springframework.stereotype.Service;
  32. import org.springframework.transaction.TransactionDefinition;
  33. import org.springframework.transaction.TransactionStatus;
  34. import org.springframework.util.ObjectUtils;
  35. import java.util.HashMap;
  36. import java.util.List;
  37. import java.util.Map;
  38. /**
  39. * 视频代理业务
  40. */
  41. @Service
  42. public class StreamProxyServiceImpl implements IStreamProxyService {
  43. private final static Logger logger = LoggerFactory.getLogger(StreamProxyServiceImpl.class);
  44. @Autowired
  45. private IVideoManagerStorage videoManagerStorager;
  46. @Autowired
  47. private IMediaService mediaService;
  48. @Autowired
  49. private ZLMRESTfulUtils zlmresTfulUtils;
  50. @Autowired
  51. private StreamProxyMapper streamProxyMapper;
  52. @Autowired
  53. private IRedisCatchStorage redisCatchStorage;
  54. @Autowired
  55. private IVideoManagerStorage storager;
  56. @Autowired
  57. private UserSetting userSetting;
  58. @Autowired
  59. private GbStreamMapper gbStreamMapper;
  60. @Autowired
  61. private PlatformGbStreamMapper platformGbStreamMapper;
  62. @Autowired
  63. private EventPublisher eventPublisher;
  64. @Autowired
  65. private ParentPlatformMapper parentPlatformMapper;
  66. @Autowired
  67. private IGbStreamService gbStreamService;
  68. @Autowired
  69. private IMediaServerService mediaServerService;
  70. @Autowired
  71. DataSourceTransactionManager dataSourceTransactionManager;
  72. @Autowired
  73. TransactionDefinition transactionDefinition;
  74. @Override
  75. public StreamInfo save(StreamProxyItem param) {
  76. MediaServerItem mediaInfo;
  77. if (ObjectUtils.isEmpty(param.getMediaServerId()) || "auto".equals(param.getMediaServerId())){
  78. mediaInfo = mediaServerService.getMediaServerForMinimumLoad(null);
  79. }else {
  80. mediaInfo = mediaServerService.getOne(param.getMediaServerId());
  81. }
  82. if (mediaInfo == null) {
  83. logger.warn("保存代理未找到在线的ZLM...");
  84. throw new ControllerException(ErrorCode.ERROR100.getCode(), "保存代理未找到在线的ZLM");
  85. }
  86. String dstUrl = String.format("rtmp://%s:%s/%s/%s", "127.0.0.1", mediaInfo.getRtmpPort(), param.getApp(),
  87. param.getStream() );
  88. param.setDst_url(dstUrl);
  89. StringBuffer resultMsg = new StringBuffer();
  90. param.setMediaServerId(mediaInfo.getId());
  91. boolean saveResult;
  92. // 更新
  93. if (videoManagerStorager.queryStreamProxy(param.getApp(), param.getStream()) != null) {
  94. saveResult = updateStreamProxy(param);
  95. }else { // 新增
  96. saveResult = addStreamProxy(param);
  97. }
  98. if (!saveResult) {
  99. throw new ControllerException(ErrorCode.ERROR100.getCode(),"保存失败");
  100. }
  101. StreamInfo resultForStreamInfo = null;
  102. resultMsg.append("保存成功");
  103. if (param.isEnable()) {
  104. JSONObject jsonObject = addStreamProxyToZlm(param);
  105. if (jsonObject == null || jsonObject.getInteger("code") != 0) {
  106. resultMsg.append(", 但是启用失败,请检查流地址是否可用");
  107. param.setEnable(false);
  108. // 直接移除
  109. if (param.isEnable_remove_none_reader()) {
  110. del(param.getApp(), param.getStream());
  111. }else {
  112. updateStreamProxy(param);
  113. }
  114. }else {
  115. resultForStreamInfo = mediaService.getStreamInfoByAppAndStream(
  116. mediaInfo, param.getApp(), param.getStream(), null, null);
  117. }
  118. }
  119. return resultForStreamInfo;
  120. }
  121. /**
  122. * 新增代理流
  123. * @param streamProxyItem
  124. * @return
  125. */
  126. private boolean addStreamProxy(StreamProxyItem streamProxyItem) {
  127. TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
  128. boolean result = false;
  129. streamProxyItem.setStreamType("proxy");
  130. streamProxyItem.setStatus(true);
  131. String now = DateUtil.getNow();
  132. streamProxyItem.setCreateTime(now);
  133. try {
  134. if (streamProxyMapper.add(streamProxyItem) > 0) {
  135. if (!ObjectUtils.isEmpty(streamProxyItem.getGbId())) {
  136. if (gbStreamMapper.add(streamProxyItem) < 0) {
  137. //事务回滚
  138. dataSourceTransactionManager.rollback(transactionStatus);
  139. return false;
  140. }
  141. }
  142. }else {
  143. //事务回滚
  144. dataSourceTransactionManager.rollback(transactionStatus);
  145. return false;
  146. }
  147. result = true;
  148. dataSourceTransactionManager.commit(transactionStatus); //手动提交
  149. }catch (Exception e) {
  150. logger.error("向数据库添加流代理失败:", e);
  151. dataSourceTransactionManager.rollback(transactionStatus);
  152. }
  153. return result;
  154. }
  155. /**
  156. * 更新代理流
  157. * @param streamProxyItem
  158. * @return
  159. */
  160. @Override
  161. public boolean updateStreamProxy(StreamProxyItem streamProxyItem) {
  162. TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
  163. boolean result = false;
  164. streamProxyItem.setStreamType("proxy");
  165. try {
  166. if (streamProxyMapper.update(streamProxyItem) > 0) {
  167. if (!ObjectUtils.isEmpty(streamProxyItem.getGbId())) {
  168. if (gbStreamMapper.updateByAppAndStream(streamProxyItem) == 0) {
  169. //事务回滚
  170. dataSourceTransactionManager.rollback(transactionStatus);
  171. return false;
  172. }
  173. }
  174. } else {
  175. //事务回滚
  176. dataSourceTransactionManager.rollback(transactionStatus);
  177. return false;
  178. }
  179. dataSourceTransactionManager.commit(transactionStatus); //手动提交
  180. result = true;
  181. }catch (Exception e) {
  182. e.printStackTrace();
  183. dataSourceTransactionManager.rollback(transactionStatus);
  184. }
  185. return result;
  186. }
  187. @Override
  188. public JSONObject addStreamProxyToZlm(StreamProxyItem param) {
  189. JSONObject result = null;
  190. MediaServerItem mediaServerItem = null;
  191. if (param.getMediaServerId() == null) {
  192. logger.warn("添加代理时MediaServerId 为null");
  193. return null;
  194. }else {
  195. mediaServerItem = mediaServerService.getOne(param.getMediaServerId());
  196. }
  197. if (mediaServerItem == null) {
  198. return null;
  199. }
  200. if ("default".equals(param.getType())){
  201. result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl(),
  202. param.isEnable_audio(), param.isEnable_mp4(), param.getRtp_type());
  203. }else if ("ffmpeg".equals(param.getType())) {
  204. result = zlmresTfulUtils.addFFmpegSource(mediaServerItem, param.getSrc_url(), param.getDst_url(),
  205. param.getTimeout_ms() + "", param.isEnable_audio(), param.isEnable_mp4(),
  206. param.getFfmpeg_cmd_key());
  207. }
  208. return result;
  209. }
  210. @Override
  211. public JSONObject removeStreamProxyFromZlm(StreamProxyItem param) {
  212. if (param ==null) {
  213. return null;
  214. }
  215. MediaServerItem mediaServerItem = mediaServerService.getOne(param.getMediaServerId());
  216. JSONObject result = zlmresTfulUtils.closeStreams(mediaServerItem, param.getApp(), param.getStream());
  217. return result;
  218. }
  219. @Override
  220. public PageInfo<StreamProxyItem> getAll(Integer page, Integer count) {
  221. return videoManagerStorager.queryStreamProxyList(page, count);
  222. }
  223. @Override
  224. public void del(String app, String stream) {
  225. StreamProxyItem streamProxyItem = videoManagerStorager.queryStreamProxy(app, stream);
  226. if (streamProxyItem != null) {
  227. gbStreamService.sendCatalogMsg(streamProxyItem, CatalogEvent.DEL);
  228. videoManagerStorager.deleteStreamProxy(app, stream);
  229. JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyItem);
  230. if (jsonObject != null && jsonObject.getInteger("code") == 0) {
  231. // 如果关联了国标那么移除关联
  232. gbStreamMapper.del(app, stream);
  233. platformGbStreamMapper.delByAppAndStream(app, stream);
  234. // TODO 如果关联的推流, 那么状态设置为离线
  235. }
  236. redisCatchStorage.removeStream(streamProxyItem.getMediaServerId(), "PULL", app, stream);
  237. }
  238. }
  239. @Override
  240. public boolean start(String app, String stream) {
  241. boolean result = false;
  242. StreamProxyItem streamProxy = videoManagerStorager.queryStreamProxy(app, stream);
  243. if (streamProxy != null && !streamProxy.isEnable() ) {
  244. JSONObject jsonObject = addStreamProxyToZlm(streamProxy);
  245. if (jsonObject == null) {
  246. return false;
  247. }
  248. if (jsonObject.getInteger("code") == 0) {
  249. result = true;
  250. streamProxy.setEnable(true);
  251. updateStreamProxy(streamProxy);
  252. }else {
  253. logger.info("启用代理失败: {}/{}->{}({})", app, stream, jsonObject.getString("msg"),
  254. streamProxy.getSrc_url() == null? streamProxy.getUrl():streamProxy.getSrc_url());
  255. }
  256. }
  257. return result;
  258. }
  259. @Override
  260. public boolean stop(String app, String stream) {
  261. boolean result = false;
  262. StreamProxyItem streamProxyDto = videoManagerStorager.queryStreamProxy(app, stream);
  263. if (streamProxyDto != null && streamProxyDto.isEnable()) {
  264. JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyDto);
  265. if (jsonObject != null && jsonObject.getInteger("code") == 0) {
  266. streamProxyDto.setEnable(false);
  267. result = updateStreamProxy(streamProxyDto);
  268. }
  269. }
  270. return result;
  271. }
  272. @Override
  273. public JSONObject getFFmpegCMDs(MediaServerItem mediaServerItem) {
  274. JSONObject result = new JSONObject();
  275. JSONObject mediaServerConfigResuly = zlmresTfulUtils.getMediaServerConfig(mediaServerItem);
  276. if (mediaServerConfigResuly != null && mediaServerConfigResuly.getInteger("code") == 0
  277. && mediaServerConfigResuly.getJSONArray("data").size() > 0){
  278. JSONObject mediaServerConfig = mediaServerConfigResuly.getJSONArray("data").getJSONObject(0);
  279. for (String key : mediaServerConfig.keySet()) {
  280. if (key.startsWith("ffmpeg.cmd")){
  281. result.put(key, mediaServerConfig.getString(key));
  282. }
  283. }
  284. }
  285. return result;
  286. }
  287. @Override
  288. public StreamProxyItem getStreamProxyByAppAndStream(String app, String streamId) {
  289. return videoManagerStorager.getStreamProxyByAppAndStream(app, streamId);
  290. }
  291. @Override
  292. public void zlmServerOnline(String mediaServerId) {
  293. // 移除开启了无人观看自动移除的流
  294. List<StreamProxyItem> streamProxyItemList = streamProxyMapper.selecAutoRemoveItemByMediaServerId(mediaServerId);
  295. if (streamProxyItemList.size() > 0) {
  296. gbStreamMapper.batchDel(streamProxyItemList);
  297. }
  298. streamProxyMapper.deleteAutoRemoveItemByMediaServerId(mediaServerId);
  299. // 移除拉流代理生成的流信息
  300. // syncPullStream(mediaServerId);
  301. // 恢复流代理, 只查找这个这个流媒体
  302. List<StreamProxyItem> streamProxyListForEnable = storager.getStreamProxyListForEnableInMediaServer(
  303. mediaServerId, true);
  304. for (StreamProxyItem streamProxyDto : streamProxyListForEnable) {
  305. logger.info("恢复流代理," + streamProxyDto.getApp() + "/" + streamProxyDto.getStream());
  306. JSONObject jsonObject = addStreamProxyToZlm(streamProxyDto);
  307. if (jsonObject == null) {
  308. // 设置为离线
  309. logger.info("恢复流代理失败" + streamProxyDto.getApp() + "/" + streamProxyDto.getStream());
  310. updateStatus(false, streamProxyDto.getApp(), streamProxyDto.getStream());
  311. }else {
  312. updateStatus(true, streamProxyDto.getApp(), streamProxyDto.getStream());
  313. }
  314. }
  315. }
  316. @Override
  317. public void zlmServerOffline(String mediaServerId) {
  318. // 移除开启了无人观看自动移除的流
  319. List<StreamProxyItem> streamProxyItemList = streamProxyMapper.selecAutoRemoveItemByMediaServerId(mediaServerId);
  320. if (streamProxyItemList.size() > 0) {
  321. gbStreamMapper.batchDel(streamProxyItemList);
  322. }
  323. streamProxyMapper.deleteAutoRemoveItemByMediaServerId(mediaServerId);
  324. // 其他的流设置离线
  325. streamProxyMapper.updateStatusByMediaServerId(mediaServerId, false);
  326. String type = "PULL";
  327. // 发送redis消息
  328. List<OnStreamChangedHookParam> onStreamChangedHookParams = redisCatchStorage.getStreams(mediaServerId, type);
  329. if (onStreamChangedHookParams.size() > 0) {
  330. for (OnStreamChangedHookParam onStreamChangedHookParam : onStreamChangedHookParams) {
  331. JSONObject jsonObject = new JSONObject();
  332. jsonObject.put("serverId", userSetting.getServerId());
  333. jsonObject.put("app", onStreamChangedHookParam.getApp());
  334. jsonObject.put("stream", onStreamChangedHookParam.getStream());
  335. jsonObject.put("register", false);
  336. jsonObject.put("mediaServerId", mediaServerId);
  337. redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
  338. // 移除redis内流的信息
  339. redisCatchStorage.removeStream(mediaServerId, type, onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream());
  340. }
  341. }
  342. }
  343. @Override
  344. public void clean() {
  345. }
  346. @Override
  347. public int updateStatus(boolean status, String app, String stream) {
  348. return streamProxyMapper.updateStatus(app, stream, status);
  349. }
  350. private void syncPullStream(String mediaServerId){
  351. MediaServerItem mediaServer = mediaServerService.getOne(mediaServerId);
  352. if (mediaServer != null) {
  353. List<OnStreamChangedHookParam> allPullStream = redisCatchStorage.getStreams(mediaServerId, "PULL");
  354. if (allPullStream.size() > 0) {
  355. zlmresTfulUtils.getMediaList(mediaServer, jsonObject->{
  356. Map<String, StreamInfo> stringStreamInfoMap = new HashMap<>();
  357. if (jsonObject.getInteger("code") == 0) {
  358. JSONArray data = jsonObject.getJSONArray("data");
  359. if(data != null && data.size() > 0) {
  360. for (int i = 0; i < data.size(); i++) {
  361. JSONObject streamJSONObj = data.getJSONObject(i);
  362. if ("rtsp".equals(streamJSONObj.getString("schema"))) {
  363. StreamInfo streamInfo = new StreamInfo();
  364. String app = streamJSONObj.getString("app");
  365. String stream = streamJSONObj.getString("stream");
  366. streamInfo.setApp(app);
  367. streamInfo.setStream(stream);
  368. stringStreamInfoMap.put(app+stream, streamInfo);
  369. }
  370. }
  371. }
  372. }
  373. if (stringStreamInfoMap.size() == 0) {
  374. redisCatchStorage.removeStream(mediaServerId, "PULL");
  375. }else {
  376. for (String key : stringStreamInfoMap.keySet()) {
  377. StreamInfo streamInfo = stringStreamInfoMap.get(key);
  378. if (stringStreamInfoMap.get(streamInfo.getApp() + streamInfo.getStream()) == null) {
  379. redisCatchStorage.removeStream(mediaServerId, "PULL", streamInfo.getApp(),
  380. streamInfo.getStream());
  381. }
  382. }
  383. }
  384. });
  385. }
  386. }
  387. }
  388. @Override
  389. public ResourceBaceInfo getOverview() {
  390. return streamProxyMapper.getOverview();
  391. }
  392. }