PlayServiceImpl.java 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654
  1. package com.genersoft.iot.vmp.service.impl;
  2. import com.alibaba.fastjson.JSON;
  3. import com.alibaba.fastjson.JSONArray;
  4. import com.alibaba.fastjson.JSONObject;
  5. import com.genersoft.iot.vmp.common.StreamInfo;
  6. import com.genersoft.iot.vmp.conf.DynamicTask;
  7. import com.genersoft.iot.vmp.conf.UserSetting;
  8. import com.genersoft.iot.vmp.gb28181.bean.*;
  9. import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
  10. import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
  11. import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
  12. import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
  13. import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
  14. import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
  15. import com.genersoft.iot.vmp.utils.DateUtil;
  16. import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
  17. import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
  18. import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
  19. import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
  20. import com.genersoft.iot.vmp.service.IMediaServerService;
  21. import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback;
  22. import com.genersoft.iot.vmp.service.bean.PlayBackCallback;
  23. import com.genersoft.iot.vmp.service.bean.PlayBackResult;
  24. import com.genersoft.iot.vmp.service.bean.SSRCInfo;
  25. import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
  26. import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
  27. import com.genersoft.iot.vmp.utils.redis.RedisUtil;
  28. import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
  29. import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
  30. import com.genersoft.iot.vmp.service.IMediaService;
  31. import com.genersoft.iot.vmp.service.IPlayService;
  32. import gov.nist.javax.sip.stack.SIPDialog;
  33. import org.slf4j.Logger;
  34. import org.slf4j.LoggerFactory;
  35. import org.springframework.beans.factory.annotation.Autowired;
  36. import org.springframework.http.HttpStatus;
  37. import org.springframework.http.ResponseEntity;
  38. import org.springframework.stereotype.Service;
  39. import org.springframework.util.ResourceUtils;
  40. import org.springframework.web.context.request.async.DeferredResult;
  41. import javax.sip.ResponseEvent;
  42. import java.io.FileNotFoundException;
  43. import java.math.BigDecimal;
  44. import java.util.*;
  45. @SuppressWarnings(value = {"rawtypes", "unchecked"})
  46. @Service
  47. public class PlayServiceImpl implements IPlayService {
  48. private final static Logger logger = LoggerFactory.getLogger(PlayServiceImpl.class);
  49. @Autowired
  50. private IVideoManagerStorage storager;
  51. @Autowired
  52. private SIPCommander cmder;
  53. @Autowired
  54. private SIPCommanderFroPlatform sipCommanderFroPlatform;
  55. @Autowired
  56. private IRedisCatchStorage redisCatchStorage;
  57. @Autowired
  58. private RedisUtil redis;
  59. @Autowired
  60. private DeferredResultHolder resultHolder;
  61. @Autowired
  62. private ZLMRESTfulUtils zlmresTfulUtils;
  63. @Autowired
  64. private AssistRESTfulUtils assistRESTfulUtils;
  65. @Autowired
  66. private IMediaService mediaService;
  67. @Autowired
  68. private IMediaServerService mediaServerService;
  69. @Autowired
  70. private VideoStreamSessionManager streamSession;
  71. @Autowired
  72. private UserSetting userSetting;
  73. @Autowired
  74. private DynamicTask dynamicTask;
  75. @Autowired
  76. private ZLMHttpHookSubscribe subscribe;
  77. @Override
  78. public PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId,
  79. ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
  80. Runnable timeoutCallback) {
  81. PlayResult playResult = new PlayResult();
  82. RequestMessage msg = new RequestMessage();
  83. String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId;
  84. msg.setKey(key);
  85. String uuid = UUID.randomUUID().toString();
  86. msg.setId(uuid);
  87. playResult.setUuid(uuid);
  88. DeferredResult<ResponseEntity<String>> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
  89. playResult.setResult(result);
  90. // 录像查询以channelId作为deviceId查询
  91. resultHolder.put(key, uuid, result);
  92. if (mediaServerItem == null) {
  93. WVPResult wvpResult = new WVPResult();
  94. wvpResult.setCode(-1);
  95. wvpResult.setMsg("未找到可用的zlm");
  96. msg.setData(wvpResult);
  97. resultHolder.invokeResult(msg);
  98. return playResult;
  99. }
  100. Device device = redisCatchStorage.getDevice(deviceId);
  101. StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
  102. playResult.setDevice(device);
  103. result.onCompletion(()->{
  104. // 点播结束时调用截图接口
  105. // TODO 应该在上流时调用更好,结束也可能是错误结束
  106. try {
  107. String classPath = ResourceUtils.getURL("classpath:").getPath();
  108. // 兼容打包为jar的class路径
  109. if(classPath.contains("jar")) {
  110. classPath = classPath.substring(0, classPath.lastIndexOf("."));
  111. classPath = classPath.substring(0, classPath.lastIndexOf("/") + 1);
  112. }
  113. if (classPath.startsWith("file:")) {
  114. classPath = classPath.substring(classPath.indexOf(":") + 1);
  115. }
  116. String path = classPath + "static/static/snap/";
  117. // 兼容Windows系统路径(去除前面的“/”)
  118. if(System.getProperty("os.name").contains("indows")) {
  119. path = path.substring(1);
  120. }
  121. String fileName = deviceId + "_" + channelId + ".jpg";
  122. ResponseEntity responseEntity = (ResponseEntity)result.getResult();
  123. if (responseEntity != null && responseEntity.getStatusCode() == HttpStatus.OK) {
  124. WVPResult wvpResult = (WVPResult)responseEntity.getBody();
  125. if (Objects.requireNonNull(wvpResult).getCode() == 0) {
  126. StreamInfo streamInfoForSuccess = (StreamInfo)wvpResult.getData();
  127. MediaServerItem mediaInfo = mediaServerService.getOne(streamInfoForSuccess.getMediaServerId());
  128. String streamUrl = streamInfoForSuccess.getFmp4();
  129. // 请求截图
  130. logger.info("[请求截图]: " + fileName);
  131. zlmresTfulUtils.getSnap(mediaInfo, streamUrl, 15, 1, path, fileName);
  132. }
  133. }
  134. } catch (FileNotFoundException e) {
  135. e.printStackTrace();
  136. }
  137. });
  138. if (streamInfo != null) {
  139. String streamId = streamInfo.getStream();
  140. if (streamId == null) {
  141. WVPResult wvpResult = new WVPResult();
  142. wvpResult.setCode(-1);
  143. wvpResult.setMsg("点播失败, redis缓存streamId等于null");
  144. msg.setData(wvpResult);
  145. resultHolder.invokeAllResult(msg);
  146. return playResult;
  147. }
  148. String mediaServerId = streamInfo.getMediaServerId();
  149. MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
  150. JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaInfo, streamId);
  151. if (rtpInfo != null && rtpInfo.getBoolean("exist")) {
  152. WVPResult wvpResult = new WVPResult();
  153. wvpResult.setCode(0);
  154. wvpResult.setMsg("success");
  155. wvpResult.setData(streamInfo);
  156. msg.setData(wvpResult);
  157. resultHolder.invokeAllResult(msg);
  158. if (hookEvent != null) {
  159. hookEvent.response(mediaServerItem, JSONObject.parseObject(JSON.toJSONString(streamInfo)));
  160. }
  161. }else {
  162. redisCatchStorage.stopPlay(streamInfo);
  163. storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
  164. streamInfo = null;
  165. }
  166. }
  167. if (streamInfo == null) {
  168. String streamId = null;
  169. if (mediaServerItem.isRtpEnable()) {
  170. streamId = String.format("%s_%s", device.getDeviceId(), channelId);
  171. }
  172. SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false);
  173. play(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInUse, response)->{
  174. if (hookEvent != null) {
  175. hookEvent.response(mediaServerItem, response);
  176. }
  177. }, event -> {
  178. // sip error错误
  179. WVPResult wvpResult = new WVPResult();
  180. wvpResult.setCode(-1);
  181. wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg));
  182. msg.setData(wvpResult);
  183. resultHolder.invokeAllResult(msg);
  184. if (errorEvent != null) {
  185. errorEvent.response(event);
  186. }
  187. }, (code, msgStr)->{
  188. // invite点播超时
  189. WVPResult wvpResult = new WVPResult();
  190. wvpResult.setCode(-1);
  191. if (code == 0) {
  192. wvpResult.setMsg("点播超时,请稍候重试");
  193. }else if (code == 1) {
  194. wvpResult.setMsg("收流超时,请稍候重试");
  195. }
  196. msg.setData(wvpResult);
  197. // 回复之前所有的点播请求
  198. resultHolder.invokeAllResult(msg);
  199. }, uuid);
  200. }
  201. return playResult;
  202. }
  203. @Override
  204. public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
  205. ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
  206. InviteTimeOutCallback timeoutCallback, String uuid) {
  207. String streamId = null;
  208. if (mediaServerItem.isRtpEnable()) {
  209. streamId = String.format("%s_%s", device.getDeviceId(), channelId);
  210. }
  211. if (ssrcInfo == null) {
  212. ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false);
  213. }
  214. // 超时处理
  215. String timeOutTaskKey = UUID.randomUUID().toString();
  216. SSRCInfo finalSsrcInfo = ssrcInfo;
  217. dynamicTask.startDelay( timeOutTaskKey,()->{
  218. logger.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", device.getDeviceId(), channelId));
  219. SIPDialog dialog = streamSession.getDialogByStream(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
  220. if (dialog != null) {
  221. timeoutCallback.run(1, "收流超时");
  222. // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  223. cmder.streamByeCmd(device.getDeviceId(), channelId, finalSsrcInfo.getStream(), null);
  224. }else {
  225. timeoutCallback.run(0, "点播超时");
  226. mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
  227. mediaServerService.closeRTPServer(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
  228. streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
  229. }
  230. }, userSetting.getPlayTimeout()*1000);
  231. final String ssrc = ssrcInfo.getSsrc();
  232. final String stream = ssrcInfo.getStream();
  233. cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
  234. logger.info("收到订阅消息: " + response.toJSONString());
  235. dynamicTask.stop(timeOutTaskKey);
  236. // hook响应
  237. onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId, uuid);
  238. hookEvent.response(mediaServerItemInuse, response);
  239. }, (event) -> {
  240. ResponseEvent responseEvent = (ResponseEvent)event.event;
  241. String contentString = new String(responseEvent.getResponse().getRawContent());
  242. // 获取ssrc
  243. int ssrcIndex = contentString.indexOf("y=");
  244. // 检查是否有y字段
  245. if (ssrcIndex >= 0) {
  246. //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容
  247. String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
  248. // 查询到ssrc不一致且开启了ssrc校验则需要针对处理
  249. if (ssrc.equals(ssrcInResponse)) {
  250. return;
  251. }
  252. logger.info("[SIP 消息] 收到invite 200, 发现下级自定义了ssrc 开启修正");
  253. if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {
  254. if (!mediaServerItem.getSsrcConfig().checkSsrc(ssrcInResponse)) {
  255. // ssrc 不可用
  256. // 释放ssrc
  257. mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
  258. streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
  259. event.msg = "下级自定义了ssrc,但是此ssrc不可用";
  260. event.statusCode = 400;
  261. errorEvent.response(event);
  262. return;
  263. }
  264. // 单端口模式streamId也有变化,需要重新设置监听
  265. if (!mediaServerItem.isRtpEnable()) {
  266. // 添加订阅
  267. JSONObject subscribeKey = new JSONObject();
  268. subscribeKey.put("app", "rtp");
  269. subscribeKey.put("stream", stream);
  270. subscribeKey.put("regist", true);
  271. subscribeKey.put("schema", "rtmp");
  272. subscribeKey.put("mediaServerId", mediaServerItem.getId());
  273. subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed,subscribeKey);
  274. subscribeKey.put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
  275. subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
  276. (MediaServerItem mediaServerItemInUse, JSONObject response)->{
  277. logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString());
  278. dynamicTask.stop(timeOutTaskKey);
  279. // hook响应
  280. onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId, uuid);
  281. hookEvent.response(mediaServerItemInUse, response);
  282. });
  283. }
  284. // 关闭rtp server
  285. mediaServerService.closeRTPServer(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
  286. // 重新开启ssrc server
  287. mediaServerService.openRTPServer(mediaServerItem, finalSsrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), false);
  288. }
  289. }
  290. }, (event) -> {
  291. dynamicTask.stop(timeOutTaskKey);
  292. mediaServerService.closeRTPServer(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
  293. // 释放ssrc
  294. mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
  295. streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
  296. errorEvent.response(event);
  297. });
  298. }
  299. @Override
  300. public void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String uuid) {
  301. RequestMessage msg = new RequestMessage();
  302. if (uuid != null) {
  303. msg.setId(uuid);
  304. }
  305. msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId);
  306. StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
  307. if (streamInfo != null) {
  308. DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
  309. if (deviceChannel != null) {
  310. deviceChannel.setStreamId(streamInfo.getStream());
  311. storager.startPlay(deviceId, channelId, streamInfo.getStream());
  312. }
  313. redisCatchStorage.startPlay(streamInfo);
  314. WVPResult wvpResult = new WVPResult();
  315. wvpResult.setCode(0);
  316. wvpResult.setMsg("success");
  317. wvpResult.setData(streamInfo);
  318. msg.setData(wvpResult);
  319. resultHolder.invokeAllResult(msg);
  320. } else {
  321. logger.warn("设备预览API调用失败!");
  322. msg.setData("设备预览API调用失败!");
  323. resultHolder.invokeAllResult(msg);
  324. }
  325. }
  326. @Override
  327. public MediaServerItem getNewMediaServerItem(Device device) {
  328. if (device == null) {
  329. return null;
  330. }
  331. String mediaServerId = device.getMediaServerId();
  332. MediaServerItem mediaServerItem;
  333. if (mediaServerId == null) {
  334. mediaServerItem = mediaServerService.getMediaServerForMinimumLoad();
  335. }else {
  336. mediaServerItem = mediaServerService.getOne(mediaServerId);
  337. }
  338. if (mediaServerItem == null) {
  339. logger.warn("点播时未找到可使用的ZLM...");
  340. }
  341. return mediaServerItem;
  342. }
  343. @Override
  344. public DeferredResult<ResponseEntity<String>> playBack(String deviceId, String channelId, String startTime,
  345. String endTime,InviteStreamCallback inviteStreamCallback,
  346. PlayBackCallback callback) {
  347. Device device = storager.queryVideoDevice(deviceId);
  348. if (device == null) {
  349. return null;
  350. }
  351. MediaServerItem newMediaServerItem = getNewMediaServerItem(device);
  352. SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, true, true);
  353. return playBack(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, inviteStreamCallback, callback);
  354. }
  355. @Override
  356. public DeferredResult<ResponseEntity<String>> playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,
  357. String deviceId, String channelId, String startTime,
  358. String endTime, InviteStreamCallback infoCallBack,
  359. PlayBackCallback playBackCallback) {
  360. if (mediaServerItem == null || ssrcInfo == null) {
  361. return null;
  362. }
  363. String uuid = UUID.randomUUID().toString();
  364. String key = DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId;
  365. DeferredResult<ResponseEntity<String>> result = new DeferredResult<>(30000L);
  366. Device device = storager.queryVideoDevice(deviceId);
  367. if (device == null) {
  368. result.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
  369. return result;
  370. }
  371. resultHolder.put(DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId, uuid, result);
  372. RequestMessage msg = new RequestMessage();
  373. msg.setId(uuid);
  374. msg.setKey(key);
  375. PlayBackResult<RequestMessage> playBackResult = new PlayBackResult<>();
  376. String playBackTimeOutTaskKey = UUID.randomUUID().toString();
  377. dynamicTask.startDelay(playBackTimeOutTaskKey, ()->{
  378. logger.warn(String.format("设备回放超时,deviceId:%s ,channelId:%s", deviceId, channelId));
  379. playBackResult.setCode(-1);
  380. playBackResult.setData(msg);
  381. playBackCallback.call(playBackResult);
  382. SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream());
  383. // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  384. if (dialog != null) {
  385. // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  386. cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
  387. }else {
  388. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  389. mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream());
  390. streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
  391. }
  392. cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
  393. // 回复之前所有的点播请求
  394. playBackCallback.call(playBackResult);
  395. }, userSetting.getPlayTimeout()*1000);
  396. cmder.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, infoCallBack,
  397. (InviteStreamInfo inviteStreamInfo) -> {
  398. logger.info("收到订阅消息: " + inviteStreamInfo.getResponse().toJSONString());
  399. dynamicTask.stop(playBackTimeOutTaskKey);
  400. StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId);
  401. if (streamInfo == null) {
  402. logger.warn("设备回放API调用失败!");
  403. msg.setData("设备回放API调用失败!");
  404. playBackResult.setCode(-1);
  405. playBackResult.setData(msg);
  406. playBackCallback.call(playBackResult);
  407. return;
  408. }
  409. redisCatchStorage.startPlayback(streamInfo, inviteStreamInfo.getCallId());
  410. msg.setData(JSON.toJSONString(streamInfo));
  411. playBackResult.setCode(0);
  412. playBackResult.setData(msg);
  413. playBackResult.setMediaServerItem(inviteStreamInfo.getMediaServerItem());
  414. playBackResult.setResponse(inviteStreamInfo.getResponse());
  415. playBackCallback.call(playBackResult);
  416. }, event -> {
  417. dynamicTask.stop(playBackTimeOutTaskKey);
  418. msg.setData(String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg));
  419. playBackResult.setCode(-1);
  420. playBackResult.setData(msg);
  421. playBackResult.setEvent(event);
  422. playBackCallback.call(playBackResult);
  423. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  424. });
  425. return result;
  426. }
  427. @Override
  428. public DeferredResult<ResponseEntity<String>> download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack) {
  429. Device device = storager.queryVideoDevice(deviceId);
  430. if (device == null) {
  431. return null;
  432. }
  433. MediaServerItem newMediaServerItem = getNewMediaServerItem(device);
  434. SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, true, true);
  435. return download(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, downloadSpeed,infoCallBack, hookCallBack);
  436. }
  437. @Override
  438. public DeferredResult<ResponseEntity<String>> download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack) {
  439. if (mediaServerItem == null || ssrcInfo == null) {
  440. return null;
  441. }
  442. String uuid = UUID.randomUUID().toString();
  443. String key = DeferredResultHolder.CALLBACK_CMD_DOWNLOAD + deviceId + channelId;
  444. DeferredResult<ResponseEntity<String>> result = new DeferredResult<>(30000L);
  445. Device device = storager.queryVideoDevice(deviceId);
  446. if (device == null) {
  447. result.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
  448. return result;
  449. }
  450. resultHolder.put(key, uuid, result);
  451. RequestMessage msg = new RequestMessage();
  452. msg.setId(uuid);
  453. msg.setKey(key);
  454. WVPResult<StreamInfo> wvpResult = new WVPResult<>();
  455. msg.setData(wvpResult);
  456. PlayBackResult<RequestMessage> downloadResult = new PlayBackResult<>();
  457. downloadResult.setData(msg);
  458. String downLoadTimeOutTaskKey = UUID.randomUUID().toString();
  459. dynamicTask.startDelay(downLoadTimeOutTaskKey, ()->{
  460. logger.warn(String.format("录像下载请求超时,deviceId:%s ,channelId:%s", deviceId, channelId));
  461. wvpResult.setCode(-1);
  462. wvpResult.setMsg("录像下载请求超时");
  463. downloadResult.setCode(-1);
  464. hookCallBack.call(downloadResult);
  465. SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream());
  466. // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  467. if (dialog != null) {
  468. // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  469. cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
  470. }else {
  471. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  472. mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream());
  473. streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
  474. }
  475. cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
  476. // 回复之前所有的点播请求
  477. hookCallBack.call(downloadResult);
  478. }, userSetting.getPlayTimeout()*1000);
  479. cmder.downloadStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, downloadSpeed, infoCallBack,
  480. inviteStreamInfo -> {
  481. logger.info("收到订阅消息: " + inviteStreamInfo.getResponse().toJSONString());
  482. dynamicTask.stop(downLoadTimeOutTaskKey);
  483. StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId);
  484. streamInfo.setStartTime(startTime);
  485. streamInfo.setEndTime(endTime);
  486. if (streamInfo == null) {
  487. logger.warn("录像下载API调用失败!");
  488. wvpResult.setCode(-1);
  489. wvpResult.setMsg("录像下载API调用失败");
  490. downloadResult.setCode(-1);
  491. hookCallBack.call(downloadResult);
  492. return ;
  493. }
  494. redisCatchStorage.startDownload(streamInfo, inviteStreamInfo.getCallId());
  495. wvpResult.setCode(0);
  496. wvpResult.setMsg("success");
  497. wvpResult.setData(streamInfo);
  498. downloadResult.setCode(0);
  499. downloadResult.setMediaServerItem(inviteStreamInfo.getMediaServerItem());
  500. downloadResult.setResponse(inviteStreamInfo.getResponse());
  501. hookCallBack.call(downloadResult);
  502. }, event -> {
  503. dynamicTask.stop(downLoadTimeOutTaskKey);
  504. downloadResult.setCode(-1);
  505. wvpResult.setCode(-1);
  506. wvpResult.setMsg(String.format("录像下载失败, 错误码: %s, %s", event.statusCode, event.msg));
  507. downloadResult.setEvent(event);
  508. hookCallBack.call(downloadResult);
  509. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  510. });
  511. return result;
  512. }
  513. @Override
  514. public StreamInfo getDownLoadInfo(String deviceId, String channelId, String stream) {
  515. StreamInfo streamInfo = redisCatchStorage.queryDownload(deviceId, channelId, stream, null);
  516. if (streamInfo != null) {
  517. if (streamInfo.getProgress() == 1) {
  518. return streamInfo;
  519. }
  520. // 获取当前已下载时长
  521. String mediaServerId = streamInfo.getMediaServerId();
  522. MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
  523. if (mediaServerItem == null) {
  524. logger.warn("查询录像信息时发现节点已离线");
  525. return null;
  526. }
  527. if (mediaServerItem.getRecordAssistPort() != 0) {
  528. JSONObject jsonObject = assistRESTfulUtils.fileDuration(mediaServerItem, streamInfo.getApp(), streamInfo.getStream(), null);
  529. if (jsonObject != null && jsonObject.getInteger("code") == 0) {
  530. long duration = jsonObject.getLong("data");
  531. if (duration == 0) {
  532. streamInfo.setProgress(0);
  533. }else {
  534. String startTime = streamInfo.getStartTime();
  535. String endTime = streamInfo.getEndTime();
  536. long start = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime);
  537. long end = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime);
  538. BigDecimal currentCount = new BigDecimal(duration/1000);
  539. BigDecimal totalCount = new BigDecimal(end-start);
  540. BigDecimal divide = currentCount.divide(totalCount,2, BigDecimal.ROUND_HALF_UP);
  541. double process = divide.doubleValue();
  542. streamInfo.setProgress(process);
  543. }
  544. }
  545. }
  546. }
  547. return streamInfo;
  548. }
  549. @Override
  550. public void onPublishHandlerForDownload(InviteStreamInfo inviteStreamInfo, String deviceId, String channelId, String uuid) {
  551. RequestMessage msg = new RequestMessage();
  552. msg.setKey(DeferredResultHolder.CALLBACK_CMD_DOWNLOAD + deviceId + channelId);
  553. msg.setId(uuid);
  554. StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId);
  555. if (streamInfo != null) {
  556. redisCatchStorage.startDownload(streamInfo, inviteStreamInfo.getCallId());
  557. msg.setData(JSON.toJSONString(streamInfo));
  558. resultHolder.invokeResult(msg);
  559. } else {
  560. logger.warn("设备预览API调用失败!");
  561. msg.setData("设备预览API调用失败!");
  562. resultHolder.invokeResult(msg);
  563. }
  564. }
  565. public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId) {
  566. String streamId = resonse.getString("stream");
  567. JSONArray tracks = resonse.getJSONArray("tracks");
  568. StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem,"rtp", streamId, tracks);
  569. streamInfo.setDeviceID(deviceId);
  570. streamInfo.setChannelId(channelId);
  571. return streamInfo;
  572. }
  573. @Override
  574. public void zlmServerOffline(String mediaServerId) {
  575. // 处理正在向上推流的上级平台
  576. List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServer(null);
  577. if (sendRtpItems.size() > 0) {
  578. for (SendRtpItem sendRtpItem : sendRtpItems) {
  579. if (sendRtpItem.getMediaServerId().equals(mediaServerId)) {
  580. ParentPlatform platform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
  581. sipCommanderFroPlatform.streamByeCmd(platform, sendRtpItem.getCallId());
  582. }
  583. }
  584. }
  585. // 处理正在观看的国标设备
  586. List<SsrcTransaction> allSsrc = streamSession.getAllSsrc();
  587. if (allSsrc.size() > 0) {
  588. for (SsrcTransaction ssrcTransaction : allSsrc) {
  589. if(ssrcTransaction.getMediaServerId().equals(mediaServerId)) {
  590. cmder.streamByeCmd(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(),
  591. ssrcTransaction.getStream(), null);
  592. }
  593. }
  594. }
  595. }
  596. }