PlayServiceImpl.java 21 KB


  1. package com.genersoft.iot.vmp.service.impl;
  2. import com.alibaba.fastjson.JSON;
  3. import com.alibaba.fastjson.JSONArray;
  4. import com.alibaba.fastjson.JSONObject;
  5. import com.genersoft.iot.vmp.common.StreamInfo;
  6. import com.genersoft.iot.vmp.conf.UserSetup;
  7. import com.genersoft.iot.vmp.gb28181.bean.*;
  8. import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
  9. import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
  10. import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
  11. import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
  12. import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
  13. import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
  14. import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
  15. import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
  16. import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
  17. import com.genersoft.iot.vmp.service.IMediaServerService;
  18. import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback;
  19. import com.genersoft.iot.vmp.service.bean.PlayBackCallback;
  20. import com.genersoft.iot.vmp.service.bean.PlayBackResult;
  21. import com.genersoft.iot.vmp.service.bean.SSRCInfo;
  22. import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
  23. import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
  24. import com.genersoft.iot.vmp.utils.redis.RedisUtil;
  25. import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
  26. import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
  27. import com.genersoft.iot.vmp.service.IMediaService;
  28. import com.genersoft.iot.vmp.service.IPlayService;
  29. import gov.nist.javax.sip.stack.SIPDialog;
  30. import jdk.nashorn.internal.ir.RuntimeNode;
  31. import org.slf4j.Logger;
  32. import org.slf4j.LoggerFactory;
  33. import org.springframework.beans.factory.annotation.Autowired;
  34. import org.springframework.http.HttpStatus;
  35. import org.springframework.http.ResponseEntity;
  36. import org.springframework.stereotype.Service;
  37. import org.springframework.util.ResourceUtils;
  38. import org.springframework.web.context.request.async.DeferredResult;
  39. import javax.sip.header.CallIdHeader;
  40. import javax.sip.header.Header;
  41. import javax.sip.message.Request;
  42. import java.io.FileNotFoundException;
  43. import java.util.*;
  44. @SuppressWarnings(value = {"rawtypes", "unchecked"})
  45. @Service
  46. public class PlayServiceImpl implements IPlayService {
  47. private final static Logger logger = LoggerFactory.getLogger(PlayServiceImpl.class);
  48. @Autowired
  49. private IVideoManagerStorager storager;
  50. @Autowired
  51. private SIPCommander cmder;
  52. @Autowired
  53. private SIPCommanderFroPlatform sipCommanderFroPlatform;
  54. @Autowired
  55. private IRedisCatchStorage redisCatchStorage;
  56. @Autowired
  57. private RedisUtil redis;
  58. @Autowired
  59. private DeferredResultHolder resultHolder;
  60. @Autowired
  61. private ZLMRESTfulUtils zlmresTfulUtils;
  62. @Autowired
  63. private IMediaService mediaService;
  64. @Autowired
  65. private IMediaServerService mediaServerService;
  66. @Autowired
  67. private VideoStreamSessionManager streamSession;
  68. @Autowired
  69. private UserSetup userSetup;
  70. @Override
  71. public PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId,
  72. ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
  73. Runnable timeoutCallback) {
  74. PlayResult playResult = new PlayResult();
  75. RequestMessage msg = new RequestMessage();
  76. String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId;
  77. msg.setKey(key);
  78. String uuid = UUID.randomUUID().toString();
  79. msg.setId(uuid);
  80. playResult.setUuid(uuid);
  81. DeferredResult<ResponseEntity<String>> result = new DeferredResult<>(userSetup.getPlayTimeout());
  82. playResult.setResult(result);
  83. // 录像查询以channelId作为deviceId查询
  84. resultHolder.put(key, uuid, result);
  85. if (mediaServerItem == null) {
  86. WVPResult wvpResult = new WVPResult();
  87. wvpResult.setCode(-1);
  88. wvpResult.setMsg("未找到可用的zlm");
  89. msg.setData(wvpResult);
  90. resultHolder.invokeResult(msg);
  91. return playResult;
  92. }
  93. Device device = redisCatchStorage.getDevice(deviceId);
  94. StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
  95. playResult.setDevice(device);
  96. result.onCompletion(()->{
  97. // 点播结束时调用截图接口
  98. // TODO 应该在上流时调用更好,结束也可能是错误结束
  99. try {
  100. String classPath = ResourceUtils.getURL("classpath:").getPath();
  101. // 兼容打包为jar的class路径
  102. if(classPath.contains("jar")) {
  103. classPath = classPath.substring(0, classPath.lastIndexOf("."));
  104. classPath = classPath.substring(0, classPath.lastIndexOf("/") + 1);
  105. }
  106. if (classPath.startsWith("file:")) {
  107. classPath = classPath.substring(classPath.indexOf(":") + 1);
  108. }
  109. String path = classPath + "static/static/snap/";
  110. // 兼容Windows系统路径(去除前面的“/”)
  111. if(System.getProperty("os.name").contains("indows")) {
  112. path = path.substring(1);
  113. }
  114. String fileName = deviceId + "_" + channelId + ".jpg";
  115. ResponseEntity responseEntity = (ResponseEntity)result.getResult();
  116. if (responseEntity != null && responseEntity.getStatusCode() == HttpStatus.OK) {
  117. WVPResult wvpResult = (WVPResult)responseEntity.getBody();
  118. if (Objects.requireNonNull(wvpResult).getCode() == 0) {
  119. StreamInfo streamInfoForSuccess = (StreamInfo)wvpResult.getData();
  120. MediaServerItem mediaInfo = mediaServerService.getOne(streamInfoForSuccess.getMediaServerId());
  121. String streamUrl = streamInfoForSuccess.getFmp4();
  122. // 请求截图
  123. logger.info("[请求截图]: " + fileName);
  124. zlmresTfulUtils.getSnap(mediaInfo, streamUrl, 15, 1, path, fileName);
  125. }
  126. }
  127. } catch (FileNotFoundException e) {
  128. e.printStackTrace();
  129. }
  130. });
  131. if (streamInfo != null) {
  132. String streamId = streamInfo.getStream();
  133. if (streamId == null) {
  134. WVPResult wvpResult = new WVPResult();
  135. wvpResult.setCode(-1);
  136. wvpResult.setMsg("点播失败, redis缓存streamId等于null");
  137. msg.setData(wvpResult);
  138. resultHolder.invokeAllResult(msg);
  139. return playResult;
  140. }
  141. String mediaServerId = streamInfo.getMediaServerId();
  142. MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
  143. JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaInfo, streamId);
  144. if (rtpInfo != null && rtpInfo.getBoolean("exist")) {
  145. WVPResult wvpResult = new WVPResult();
  146. wvpResult.setCode(0);
  147. wvpResult.setMsg("success");
  148. wvpResult.setData(streamInfo);
  149. msg.setData(wvpResult);
  150. resultHolder.invokeAllResult(msg);
  151. if (hookEvent != null) {
  152. hookEvent.response(mediaServerItem, JSONObject.parseObject(JSON.toJSONString(streamInfo)));
  153. }
  154. }else {
  155. redisCatchStorage.stopPlay(streamInfo);
  156. storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
  157. streamInfo = null;
  158. }
  159. }
  160. if (streamInfo == null) {
  161. String streamId = null;
  162. if (mediaServerItem.isRtpEnable()) {
  163. streamId = String.format("%s_%s", device.getDeviceId(), channelId);
  164. }
  165. SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId);
  166. play(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInUse, response)->{
  167. if (hookEvent != null) {
  168. hookEvent.response(mediaServerItem, response);
  169. }
  170. }, event -> {
  171. // sip error错误
  172. WVPResult wvpResult = new WVPResult();
  173. wvpResult.setCode(-1);
  174. wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg));
  175. msg.setData(wvpResult);
  176. resultHolder.invokeAllResult(msg);
  177. if (errorEvent != null) {
  178. errorEvent.response(event);
  179. }
  180. }, (code, msgStr)->{
  181. // invite点播超时
  182. WVPResult wvpResult = new WVPResult();
  183. wvpResult.setCode(-1);
  184. if (code == 0) {
  185. wvpResult.setMsg("点播超时,请稍候重试");
  186. }else if (code == 1) {
  187. wvpResult.setMsg("收流超时,请稍候重试");
  188. }
  189. msg.setData(wvpResult);
  190. // 回复之前所有的点播请求
  191. resultHolder.invokeAllResult(msg);
  192. }, uuid);
  193. }
  194. return playResult;
  195. }
  196. @Override
  197. public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
  198. ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
  199. InviteTimeOutCallback timeoutCallback, String uuid) {
  200. String streamId = null;
  201. if (mediaServerItem.isRtpEnable()) {
  202. streamId = String.format("%s_%s", device.getDeviceId(), channelId);
  203. }
  204. if (ssrcInfo == null) {
  205. ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId);
  206. }
  207. // 超时处理
  208. Timer timer = new Timer();
  209. SSRCInfo finalSsrcInfo = ssrcInfo;
  210. timer.schedule(new TimerTask() {
  211. @Override
  212. public void run() {
  213. logger.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", device.getDeviceId(), channelId));
  214. SIPDialog dialog = streamSession.getDialogByStream(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
  215. if (dialog != null) {
  216. timeoutCallback.run(1, "收流超时");
  217. // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  218. cmder.streamByeCmd(device.getDeviceId(), channelId, finalSsrcInfo.getStream(), null);
  219. }else {
  220. timeoutCallback.run(0, "点播超时");
  221. mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
  222. mediaServerService.closeRTPServer(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
  223. streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
  224. }
  225. }
  226. }, userSetup.getPlayTimeout());
  227. cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
  228. logger.info("收到订阅消息: " + response.toJSONString());
  229. timer.cancel();
  230. // hook响应
  231. onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId, uuid);
  232. hookEvent.response(mediaServerItemInuse, response);
  233. }, (event) -> {
  234. timer.cancel();
  235. mediaServerService.closeRTPServer(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
  236. // 释放ssrc
  237. mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
  238. streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
  239. errorEvent.response(event);
  240. });
  241. }
  242. @Override
  243. public void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String uuid) {
  244. RequestMessage msg = new RequestMessage();
  245. if (uuid != null) {
  246. msg.setId(uuid);
  247. }
  248. msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId);
  249. StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
  250. if (streamInfo != null) {
  251. DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
  252. if (deviceChannel != null) {
  253. deviceChannel.setStreamId(streamInfo.getStream());
  254. storager.startPlay(deviceId, channelId, streamInfo.getStream());
  255. }
  256. redisCatchStorage.startPlay(streamInfo);
  257. WVPResult wvpResult = new WVPResult();
  258. wvpResult.setCode(0);
  259. wvpResult.setMsg("success");
  260. wvpResult.setData(streamInfo);
  261. msg.setData(wvpResult);
  262. resultHolder.invokeAllResult(msg);
  263. } else {
  264. logger.warn("设备预览API调用失败!");
  265. msg.setData("设备预览API调用失败!");
  266. resultHolder.invokeAllResult(msg);
  267. }
  268. }
  269. @Override
  270. public MediaServerItem getNewMediaServerItem(Device device) {
  271. if (device == null) return null;
  272. String mediaServerId = device.getMediaServerId();
  273. MediaServerItem mediaServerItem;
  274. if (mediaServerId == null) {
  275. mediaServerItem = mediaServerService.getMediaServerForMinimumLoad();
  276. }else {
  277. mediaServerItem = mediaServerService.getOne(mediaServerId);
  278. }
  279. if (mediaServerItem == null) {
  280. logger.warn("点播时未找到可使用的ZLM...");
  281. }
  282. return mediaServerItem;
  283. }
  284. @Override
  285. public DeferredResult<ResponseEntity<String>> playBack(String deviceId, String channelId, String startTime,
  286. String endTime,InviteStreamCallback inviteStreamCallback,
  287. PlayBackCallback callback) {
  288. Device device = storager.queryVideoDevice(deviceId);
  289. if (device == null) return null;
  290. MediaServerItem newMediaServerItem = getNewMediaServerItem(device);
  291. SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, true);
  292. return playBack(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, inviteStreamCallback, callback);
  293. }
  294. @Override
  295. public DeferredResult<ResponseEntity<String>> playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,
  296. String deviceId, String channelId, String startTime,
  297. String endTime, InviteStreamCallback infoCallBack,
  298. PlayBackCallback playBackCallback) {
  299. if (mediaServerItem == null || ssrcInfo == null) return null;
  300. String uuid = UUID.randomUUID().toString();
  301. String key = DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId;
  302. DeferredResult<ResponseEntity<String>> result = new DeferredResult<>(30000L);
  303. Device device = storager.queryVideoDevice(deviceId);
  304. if (device == null) {
  305. result.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
  306. return result;
  307. }
  308. resultHolder.put(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId, uuid, result);
  309. RequestMessage msg = new RequestMessage();
  310. msg.setId(uuid);
  311. msg.setKey(key);
  312. PlayBackResult<RequestMessage> playBackResult = new PlayBackResult<>();
  313. Timer timer = new Timer();
  314. timer.schedule(new TimerTask() {
  315. @Override
  316. public void run() {
  317. logger.warn(String.format("设备回放超时,deviceId:%s ,channelId:%s", deviceId, channelId));
  318. playBackResult.setCode(-1);
  319. playBackResult.setData(msg);
  320. playBackCallback.call(playBackResult);
  321. SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream());
  322. // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  323. if (dialog != null) {
  324. // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  325. cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
  326. }else {
  327. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  328. mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream());
  329. streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
  330. }
  331. cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
  332. // 回复之前所有的点播请求
  333. playBackCallback.call(playBackResult);
  334. }
  335. }, userSetup.getPlayTimeout());
  336. cmder.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, infoCallBack,
  337. (InviteStreamInfo inviteStreamInfo) -> {
  338. logger.info("收到订阅消息: " + inviteStreamInfo.getResponse().toJSONString());
  339. timer.cancel();
  340. StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId);
  341. if (streamInfo == null) {
  342. logger.warn("设备回放API调用失败!");
  343. msg.setData("设备回放API调用失败!");
  344. playBackResult.setCode(-1);
  345. playBackResult.setData(msg);
  346. playBackCallback.call(playBackResult);
  347. return;
  348. }
  349. redisCatchStorage.startPlayback(streamInfo, inviteStreamInfo.getCallId());
  350. msg.setData(JSON.toJSONString(streamInfo));
  351. playBackResult.setCode(0);
  352. playBackResult.setData(msg);
  353. playBackResult.setMediaServerItem(inviteStreamInfo.getMediaServerItem());
  354. playBackResult.setResponse(inviteStreamInfo.getResponse());
  355. playBackCallback.call(playBackResult);
  356. }, event -> {
  357. timer.cancel();
  358. msg.setData(String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg));
  359. playBackResult.setCode(-1);
  360. playBackResult.setData(msg);
  361. playBackResult.setEvent(event);
  362. playBackCallback.call(playBackResult);
  363. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  364. });
  365. return result;
  366. }
  367. @Override
  368. public void onPublishHandlerForDownload(InviteStreamInfo inviteStreamInfo, String deviceId, String channelId, String uuid) {
  369. RequestMessage msg = new RequestMessage();
  370. msg.setKey(DeferredResultHolder.CALLBACK_CMD_DOWNLOAD + deviceId + channelId);
  371. msg.setId(uuid);
  372. StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId);
  373. if (streamInfo != null) {
  374. redisCatchStorage.startDownload(streamInfo, inviteStreamInfo.getCallId());
  375. msg.setData(JSON.toJSONString(streamInfo));
  376. resultHolder.invokeResult(msg);
  377. } else {
  378. logger.warn("设备预览API调用失败!");
  379. msg.setData("设备预览API调用失败!");
  380. resultHolder.invokeResult(msg);
  381. }
  382. }
  383. public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId) {
  384. String streamId = resonse.getString("stream");
  385. JSONArray tracks = resonse.getJSONArray("tracks");
  386. StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem,"rtp", streamId, tracks);
  387. streamInfo.setDeviceID(deviceId);
  388. streamInfo.setChannelId(channelId);
  389. return streamInfo;
  390. }
  391. @Override
  392. public void zlmServerOffline(String mediaServerId) {
  393. // 处理正在向上推流的上级平台
  394. List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServer(null);
  395. if (sendRtpItems.size() > 0) {
  396. for (SendRtpItem sendRtpItem : sendRtpItems) {
  397. if (sendRtpItem.getMediaServerId().equals(mediaServerId)) {
  398. ParentPlatform platform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
  399. sipCommanderFroPlatform.streamByeCmd(platform, sendRtpItem.getCallId());
  400. }
  401. }
  402. }
  403. // 处理正在观看的国标设备
  404. List<SsrcTransaction> allSsrc = streamSession.getAllSsrc();
  405. if (allSsrc.size() > 0) {
  406. for (SsrcTransaction ssrcTransaction : allSsrc) {
  407. if(ssrcTransaction.getMediaServerId().equals(mediaServerId)) {
  408. cmder.streamByeCmd(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(),
  409. ssrcTransaction.getStream(), null);
  410. }
  411. }
  412. }
  413. }
  414. }