StreamProxyServiceImpl.java 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302
  1. package com.genersoft.iot.vmp.service.impl;
  import com.alibaba.fastjson.JSONArray;
  import com.alibaba.fastjson.JSONObject;
  import com.genersoft.iot.vmp.common.StreamInfo;
  import com.genersoft.iot.vmp.conf.UserSetup;
  import com.genersoft.iot.vmp.gb28181.bean.GbStream;
  import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
  import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
  import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig;
  import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
  import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
  import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
  import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
  import com.genersoft.iot.vmp.service.IGbStreamService;
  import com.genersoft.iot.vmp.service.IMediaServerService;
  import com.genersoft.iot.vmp.service.IMediaService;
  import com.genersoft.iot.vmp.service.IStreamProxyService;
  import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
  import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
  import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
  import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper;
  import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
  import com.genersoft.iot.vmp.storager.dao.StreamProxyMapper;
  import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
  import com.github.pagehelper.PageInfo;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.stereotype.Service;
  import org.springframework.util.StringUtils;

  import java.util.*;
  31. /**
  32. * 视频代理业务
  33. */
  34. @Service
  35. public class StreamProxyServiceImpl implements IStreamProxyService {
  36. private final static Logger logger = LoggerFactory.getLogger(StreamProxyServiceImpl.class);
  37. @Autowired
  38. private IVideoManagerStorager videoManagerStorager;
  39. @Autowired
  40. private IMediaService mediaService;
  41. @Autowired
  42. private ZLMRESTfulUtils zlmresTfulUtils;;
  43. @Autowired
  44. private StreamProxyMapper streamProxyMapper;
  45. @Autowired
  46. private IRedisCatchStorage redisCatchStorage;
  47. @Autowired
  48. private UserSetup userSetup;
  49. @Autowired
  50. private GbStreamMapper gbStreamMapper;
  51. @Autowired
  52. private PlatformGbStreamMapper platformGbStreamMapper;
  53. @Autowired
  54. private ParentPlatformMapper parentPlatformMapper;
  55. @Autowired
  56. private IGbStreamService gbStreamService;
  57. @Autowired
  58. private IMediaServerService mediaServerService;
  59. @Override
  60. public WVPResult<StreamInfo> save(StreamProxyItem param) {
  61. MediaServerItem mediaInfo;
  62. WVPResult<StreamInfo> wvpResult = new WVPResult<>();
  63. wvpResult.setCode(0);
  64. if ("auto".equals(param.getMediaServerId())){
  65. mediaInfo = mediaServerService.getMediaServerForMinimumLoad();
  66. }else {
  67. mediaInfo = mediaServerService.getOne(param.getMediaServerId());
  68. }
  69. if (mediaInfo == null) {
  70. logger.warn("保存代理未找到在线的ZLM...");
  71. wvpResult.setMsg("保存失败");
  72. return wvpResult;
  73. }
  74. String dstUrl = String.format("rtmp://%s:%s/%s/%s", "127.0.0.1", mediaInfo.getRtmpPort(), param.getApp(),
  75. param.getStream() );
  76. param.setDst_url(dstUrl);
  77. StringBuffer result = new StringBuffer();
  78. boolean streamLive = false;
  79. param.setMediaServerId(mediaInfo.getId());
  80. boolean saveResult;
  81. // 更新
  82. if (videoManagerStorager.queryStreamProxy(param.getApp(), param.getStream()) != null) {
  83. saveResult = videoManagerStorager.updateStreamProxy(param);
  84. }else { // 新增
  85. saveResult = videoManagerStorager.addStreamProxy(param);
  86. }
  87. if (saveResult) {
  88. result.append("保存成功");
  89. if (param.isEnable()) {
  90. JSONObject jsonObject = addStreamProxyToZlm(param);
  91. if (jsonObject == null || jsonObject.getInteger("code") != 0) {
  92. streamLive = false;
  93. result.append(", 但是启用失败,请检查流地址是否可用");
  94. param.setEnable(false);
  95. // 直接移除
  96. if (param.isEnable_remove_none_reader()) {
  97. del(param.getApp(), param.getStream());
  98. }else {
  99. videoManagerStorager.updateStreamProxy(param);
  100. }
  101. }else {
  102. streamLive = true;
  103. StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
  104. mediaInfo, param.getApp(), param.getStream(), null);
  105. wvpResult.setData(streamInfo);
  106. }
  107. }
  108. }else {
  109. result.append("保存失败");
  110. }
  111. if ( !StringUtils.isEmpty(param.getPlatformGbId()) && streamLive) {
  112. List<GbStream> gbStreams = new ArrayList<>();
  113. gbStreams.add(param);
  114. if (gbStreamService.addPlatformInfo(gbStreams, param.getPlatformGbId(), param.getCatalogId())){
  115. result.append(", 关联国标平台[ " + param.getPlatformGbId() + " ]成功");
  116. }else {
  117. result.append(", 关联国标平台[ " + param.getPlatformGbId() + " ]失败");
  118. }
  119. }
  120. // 查找开启了全部直播流共享的上级平台
  121. List<ParentPlatform> parentPlatforms = parentPlatformMapper.selectAllAhareAllLiveStream();
  122. if (parentPlatforms.size() > 0) {
  123. for (ParentPlatform parentPlatform : parentPlatforms) {
  124. param.setPlatformId(parentPlatform.getServerGBId());
  125. param.setCatalogId(parentPlatform.getCatalogId());
  126. String stream = param.getStream();
  127. StreamProxyItem streamProxyItems = platformGbStreamMapper.selectOne(param.getApp(), stream, parentPlatform.getServerGBId());
  128. if (streamProxyItems == null) {
  129. platformGbStreamMapper.add(param);
  130. }
  131. }
  132. }
  133. wvpResult.setMsg(result.toString());
  134. return wvpResult;
  135. }
  136. @Override
  137. public JSONObject addStreamProxyToZlm(StreamProxyItem param) {
  138. JSONObject result = null;
  139. MediaServerItem mediaServerItem = null;
  140. if (param.getMediaServerId() == null) {
  141. logger.warn("添加代理时MediaServerId 为null");
  142. return null;
  143. }else {
  144. mediaServerItem = mediaServerService.getOne(param.getMediaServerId());
  145. }
  146. if (mediaServerItem == null) {
  147. return null;
  148. }
  149. if ("default".equals(param.getType())){
  150. result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl(),
  151. param.isEnable_hls(), param.isEnable_mp4(), param.getRtp_type());
  152. }else if ("ffmpeg".equals(param.getType())) {
  153. result = zlmresTfulUtils.addFFmpegSource(mediaServerItem, param.getSrc_url(), param.getDst_url(),
  154. param.getTimeout_ms() + "", param.isEnable_hls(), param.isEnable_mp4(),
  155. param.getFfmpeg_cmd_key());
  156. }
  157. return result;
  158. }
  159. @Override
  160. public JSONObject removeStreamProxyFromZlm(StreamProxyItem param) {
  161. if (param ==null) return null;
  162. MediaServerItem mediaServerItem = mediaServerService.getOne(param.getMediaServerId());
  163. JSONObject result = zlmresTfulUtils.closeStreams(mediaServerItem, param.getApp(), param.getStream());
  164. return result;
  165. }
  166. @Override
  167. public PageInfo<StreamProxyItem> getAll(Integer page, Integer count) {
  168. return videoManagerStorager.queryStreamProxyList(page, count);
  169. }
  170. @Override
  171. public void del(String app, String stream) {
  172. StreamProxyItem streamProxyItem = videoManagerStorager.queryStreamProxy(app, stream);
  173. if (streamProxyItem != null) {
  174. videoManagerStorager.deleteStreamProxy(app, stream);
  175. JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyItem);
  176. if (jsonObject != null && jsonObject.getInteger("code") == 0) {
  177. // 如果关联了国标那么移除关联
  178. gbStreamMapper.del(app, stream);
  179. platformGbStreamMapper.delByAppAndStream(app, stream);
  180. // TODO 如果关联的推流, 那么状态设置为离线
  181. }
  182. redisCatchStorage.removeStream(streamProxyItem.getMediaServerId(), "PULL", app, stream);
  183. }
  184. }
  185. @Override
  186. public boolean start(String app, String stream) {
  187. boolean result = false;
  188. StreamProxyItem streamProxy = videoManagerStorager.queryStreamProxy(app, stream);
  189. if (!streamProxy.isEnable() && streamProxy != null) {
  190. JSONObject jsonObject = addStreamProxyToZlm(streamProxy);
  191. if (jsonObject == null) return false;
  192. if (jsonObject.getInteger("code") == 0) {
  193. result = true;
  194. streamProxy.setEnable(true);
  195. videoManagerStorager.updateStreamProxy(streamProxy);
  196. }
  197. }
  198. return result;
  199. }
  200. @Override
  201. public boolean stop(String app, String stream) {
  202. boolean result = false;
  203. StreamProxyItem streamProxyDto = videoManagerStorager.queryStreamProxy(app, stream);
  204. if (streamProxyDto != null && streamProxyDto.isEnable()) {
  205. JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyDto);
  206. if (jsonObject.getInteger("code") == 0) {
  207. streamProxyDto.setEnable(false);
  208. result = videoManagerStorager.updateStreamProxy(streamProxyDto);
  209. }
  210. }
  211. return result;
  212. }
  213. @Override
  214. public JSONObject getFFmpegCMDs(MediaServerItem mediaServerItem) {
  215. JSONObject result = new JSONObject();
  216. JSONObject mediaServerConfigResuly = zlmresTfulUtils.getMediaServerConfig(mediaServerItem);
  217. if (mediaServerConfigResuly != null && mediaServerConfigResuly.getInteger("code") == 0
  218. && mediaServerConfigResuly.getJSONArray("data").size() > 0){
  219. JSONObject mediaServerConfig = mediaServerConfigResuly.getJSONArray("data").getJSONObject(0);
  220. for (String key : mediaServerConfig.keySet()) {
  221. if (key.startsWith("ffmpeg.cmd")){
  222. result.put(key, mediaServerConfig.getString(key));
  223. }
  224. }
  225. }
  226. return result;
  227. }
  228. @Override
  229. public StreamProxyItem getStreamProxyByAppAndStream(String app, String streamId) {
  230. return videoManagerStorager.getStreamProxyByAppAndStream(app, streamId);
  231. }
  232. @Override
  233. public void zlmServerOnline(String mediaServerId) {
  234. zlmServerOffline(mediaServerId);
  235. }
  236. @Override
  237. public void zlmServerOffline(String mediaServerId) {
  238. // 移除开启了无人观看自动移除的流
  239. List<StreamProxyItem> streamProxyItemList = streamProxyMapper.selecAutoRemoveItemByMediaServerId(mediaServerId);
  240. if (streamProxyItemList.size() > 0) {
  241. gbStreamMapper.batchDel(streamProxyItemList);
  242. }
  243. streamProxyMapper.deleteAutoRemoveItemByMediaServerId(mediaServerId);
  244. // 其他的流设置未启用
  245. streamProxyMapper.updateStatus(false, mediaServerId);
  246. String type = "PULL";
  247. // 发送redis消息
  248. List<StreamInfo> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type);
  249. if (streamInfoList.size() > 0) {
  250. for (StreamInfo streamInfo : streamInfoList) {
  251. JSONObject jsonObject = new JSONObject();
  252. jsonObject.put("serverId", userSetup.getServerId());
  253. jsonObject.put("app", streamInfo.getApp());
  254. jsonObject.put("stream", streamInfo.getStreamId());
  255. jsonObject.put("register", false);
  256. jsonObject.put("mediaServerId", mediaServerId);
  257. redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
  258. // 移除redis内流的信息
  259. redisCatchStorage.removeStream(mediaServerId, type, streamInfo.getApp(), streamInfo.getStreamId());
  260. }
  261. }
  262. }
  263. @Override
  264. public void clean() {
  265. }
  266. }