PlayServiceImpl.java 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651
  1. package com.genersoft.iot.vmp.service.impl;
  2. import com.alibaba.fastjson.JSON;
  3. import com.alibaba.fastjson.JSONArray;
  4. import com.alibaba.fastjson.JSONObject;
  5. import com.genersoft.iot.vmp.common.StreamInfo;
  6. import com.genersoft.iot.vmp.conf.DynamicTask;
  7. import com.genersoft.iot.vmp.conf.UserSetting;
  8. import com.genersoft.iot.vmp.gb28181.bean.*;
  9. import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
  10. import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
  11. import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
  12. import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
  13. import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
  14. import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
  15. import com.genersoft.iot.vmp.utils.DateUtil;
  16. import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
  17. import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
  18. import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
  19. import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
  20. import com.genersoft.iot.vmp.service.IMediaServerService;
  21. import com.genersoft.iot.vmp.service.IMediaService;
  22. import com.genersoft.iot.vmp.service.IPlayService;
  23. import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback;
  24. import com.genersoft.iot.vmp.service.bean.PlayBackCallback;
  25. import com.genersoft.iot.vmp.service.bean.PlayBackResult;
  26. import com.genersoft.iot.vmp.service.bean.SSRCInfo;
  27. import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
  28. import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
  29. import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
  30. import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
  31. import gov.nist.javax.sip.stack.SIPDialog;
  32. import org.slf4j.Logger;
  33. import org.slf4j.LoggerFactory;
  34. import org.springframework.beans.factory.annotation.Autowired;
  35. import org.springframework.http.HttpStatus;
  36. import org.springframework.http.ResponseEntity;
  37. import org.springframework.stereotype.Service;
  38. import org.springframework.util.ResourceUtils;
  39. import org.springframework.web.context.request.async.DeferredResult;
  40. import javax.sip.ResponseEvent;
  41. import java.io.FileNotFoundException;
  42. import java.math.BigDecimal;
  43. import java.math.RoundingMode;
  44. import java.util.*;
  45. @SuppressWarnings(value = {"rawtypes", "unchecked"})
  46. @Service
  47. public class PlayServiceImpl implements IPlayService {
  48. private final static Logger logger = LoggerFactory.getLogger(PlayServiceImpl.class);
  49. @Autowired
  50. private IVideoManagerStorage storager;
  51. @Autowired
  52. private SIPCommander cmder;
  53. @Autowired
  54. private SIPCommanderFroPlatform sipCommanderFroPlatform;
  55. @Autowired
  56. private IRedisCatchStorage redisCatchStorage;
  57. @Autowired
  58. private DeferredResultHolder resultHolder;
  59. @Autowired
  60. private ZLMRESTfulUtils zlmresTfulUtils;
  61. @Autowired
  62. private AssistRESTfulUtils assistRESTfulUtils;
  63. @Autowired
  64. private IMediaService mediaService;
  65. @Autowired
  66. private IMediaServerService mediaServerService;
  67. @Autowired
  68. private VideoStreamSessionManager streamSession;
  69. @Autowired
  70. private UserSetting userSetting;
  71. @Autowired
  72. private DynamicTask dynamicTask;
  73. @Autowired
  74. private ZLMHttpHookSubscribe subscribe;
  75. @Override
  76. public PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId,
  77. ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
  78. Runnable timeoutCallback) {
  79. PlayResult playResult = new PlayResult();
  80. RequestMessage msg = new RequestMessage();
  81. String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId;
  82. msg.setKey(key);
  83. String uuid = UUID.randomUUID().toString();
  84. msg.setId(uuid);
  85. playResult.setUuid(uuid);
  86. DeferredResult<ResponseEntity<String>> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
  87. playResult.setResult(result);
  88. // 录像查询以channelId作为deviceId查询
  89. resultHolder.put(key, uuid, result);
  90. if (mediaServerItem == null) {
  91. WVPResult wvpResult = new WVPResult();
  92. wvpResult.setCode(-1);
  93. wvpResult.setMsg("未找到可用的zlm");
  94. msg.setData(wvpResult);
  95. resultHolder.invokeResult(msg);
  96. return playResult;
  97. }
  98. Device device = redisCatchStorage.getDevice(deviceId);
  99. StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
  100. playResult.setDevice(device);
  101. result.onCompletion(()->{
  102. // 点播结束时调用截图接口
  103. // TODO 应该在上流时调用更好,结束也可能是错误结束
  104. try {
  105. String classPath = ResourceUtils.getURL("classpath:").getPath();
  106. // 兼容打包为jar的class路径
  107. if(classPath.contains("jar")) {
  108. classPath = classPath.substring(0, classPath.lastIndexOf("."));
  109. classPath = classPath.substring(0, classPath.lastIndexOf("/") + 1);
  110. }
  111. if (classPath.startsWith("file:")) {
  112. classPath = classPath.substring(classPath.indexOf(":") + 1);
  113. }
  114. String path = classPath + "static/static/snap/";
  115. // 兼容Windows系统路径(去除前面的“/”)
  116. if(System.getProperty("os.name").contains("indows")) {
  117. path = path.substring(1);
  118. }
  119. String fileName = deviceId + "_" + channelId + ".jpg";
  120. ResponseEntity responseEntity = (ResponseEntity)result.getResult();
  121. if (responseEntity != null && responseEntity.getStatusCode() == HttpStatus.OK) {
  122. WVPResult wvpResult = (WVPResult)responseEntity.getBody();
  123. if (Objects.requireNonNull(wvpResult).getCode() == 0) {
  124. StreamInfo streamInfoForSuccess = (StreamInfo)wvpResult.getData();
  125. MediaServerItem mediaInfo = mediaServerService.getOne(streamInfoForSuccess.getMediaServerId());
  126. String streamUrl = streamInfoForSuccess.getFmp4();
  127. // 请求截图
  128. logger.info("[请求截图]: " + fileName);
  129. zlmresTfulUtils.getSnap(mediaInfo, streamUrl, 15, 1, path, fileName);
  130. }
  131. }
  132. } catch (FileNotFoundException e) {
  133. e.printStackTrace();
  134. }
  135. });
  136. if (streamInfo != null) {
  137. String streamId = streamInfo.getStream();
  138. if (streamId == null) {
  139. WVPResult wvpResult = new WVPResult();
  140. wvpResult.setCode(-1);
  141. wvpResult.setMsg("点播失败, redis缓存streamId等于null");
  142. msg.setData(wvpResult);
  143. resultHolder.invokeAllResult(msg);
  144. return playResult;
  145. }
  146. String mediaServerId = streamInfo.getMediaServerId();
  147. MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
  148. JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaInfo, streamId);
  149. if (rtpInfo != null && rtpInfo.getBoolean("exist")) {
  150. WVPResult wvpResult = new WVPResult();
  151. wvpResult.setCode(0);
  152. wvpResult.setMsg("success");
  153. wvpResult.setData(streamInfo);
  154. msg.setData(wvpResult);
  155. resultHolder.invokeAllResult(msg);
  156. if (hookEvent != null) {
  157. hookEvent.response(mediaServerItem, JSONObject.parseObject(JSON.toJSONString(streamInfo)));
  158. }
  159. }else {
  160. redisCatchStorage.stopPlay(streamInfo);
  161. storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
  162. streamInfo = null;
  163. }
  164. }
  165. if (streamInfo == null) {
  166. String streamId = null;
  167. if (mediaServerItem.isRtpEnable()) {
  168. streamId = String.format("%s_%s", device.getDeviceId(), channelId);
  169. }
  170. SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false);
  171. play(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInUse, response)->{
  172. if (hookEvent != null) {
  173. hookEvent.response(mediaServerItem, response);
  174. }
  175. }, event -> {
  176. // sip error错误
  177. WVPResult wvpResult = new WVPResult();
  178. wvpResult.setCode(-1);
  179. wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg));
  180. msg.setData(wvpResult);
  181. resultHolder.invokeAllResult(msg);
  182. if (errorEvent != null) {
  183. errorEvent.response(event);
  184. }
  185. }, (code, msgStr)->{
  186. // invite点播超时
  187. WVPResult wvpResult = new WVPResult();
  188. wvpResult.setCode(-1);
  189. if (code == 0) {
  190. wvpResult.setMsg("点播超时,请稍候重试");
  191. }else if (code == 1) {
  192. wvpResult.setMsg("收流超时,请稍候重试");
  193. }
  194. msg.setData(wvpResult);
  195. // 回复之前所有的点播请求
  196. resultHolder.invokeAllResult(msg);
  197. }, uuid);
  198. }
  199. return playResult;
  200. }
  201. @Override
  202. public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
  203. ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
  204. InviteTimeOutCallback timeoutCallback, String uuid) {
  205. String streamId = null;
  206. if (mediaServerItem.isRtpEnable()) {
  207. streamId = String.format("%s_%s", device.getDeviceId(), channelId);
  208. }
  209. if (ssrcInfo == null) {
  210. ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false);
  211. }
  212. // 超时处理
  213. String timeOutTaskKey = UUID.randomUUID().toString();
  214. SSRCInfo finalSsrcInfo = ssrcInfo;
  215. dynamicTask.startDelay( timeOutTaskKey,()->{
  216. logger.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", device.getDeviceId(), channelId));
  217. SIPDialog dialog = streamSession.getDialogByStream(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
  218. if (dialog != null) {
  219. timeoutCallback.run(1, "收流超时");
  220. // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  221. cmder.streamByeCmd(device.getDeviceId(), channelId, finalSsrcInfo.getStream(), null);
  222. }else {
  223. timeoutCallback.run(0, "点播超时");
  224. mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
  225. mediaServerService.closeRTPServer(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
  226. streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
  227. }
  228. }, userSetting.getPlayTimeout()*1000);
  229. final String ssrc = ssrcInfo.getSsrc();
  230. final String stream = ssrcInfo.getStream();
  231. cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
  232. logger.info("收到订阅消息: " + response.toJSONString());
  233. dynamicTask.stop(timeOutTaskKey);
  234. // hook响应
  235. onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId, uuid);
  236. hookEvent.response(mediaServerItemInuse, response);
  237. }, (event) -> {
  238. ResponseEvent responseEvent = (ResponseEvent)event.event;
  239. String contentString = new String(responseEvent.getResponse().getRawContent());
  240. // 获取ssrc
  241. int ssrcIndex = contentString.indexOf("y=");
  242. // 检查是否有y字段
  243. if (ssrcIndex >= 0) {
  244. //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容
  245. String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
  246. // 查询到ssrc不一致且开启了ssrc校验则需要针对处理
  247. if (ssrc.equals(ssrcInResponse)) {
  248. return;
  249. }
  250. logger.info("[SIP 消息] 收到invite 200, 发现下级自定义了ssrc 开启修正");
  251. if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {
  252. if (!mediaServerItem.getSsrcConfig().checkSsrc(ssrcInResponse)) {
  253. // ssrc 不可用
  254. // 释放ssrc
  255. mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
  256. streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
  257. event.msg = "下级自定义了ssrc,但是此ssrc不可用";
  258. event.statusCode = 400;
  259. errorEvent.response(event);
  260. return;
  261. }
  262. // 单端口模式streamId也有变化,需要重新设置监听
  263. if (!mediaServerItem.isRtpEnable()) {
  264. // 添加订阅
  265. JSONObject subscribeKey = new JSONObject();
  266. subscribeKey.put("app", "rtp");
  267. subscribeKey.put("stream", stream);
  268. subscribeKey.put("regist", true);
  269. subscribeKey.put("schema", "rtmp");
  270. subscribeKey.put("mediaServerId", mediaServerItem.getId());
  271. subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed,subscribeKey);
  272. subscribeKey.put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
  273. subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
  274. (MediaServerItem mediaServerItemInUse, JSONObject response)->{
  275. logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString());
  276. dynamicTask.stop(timeOutTaskKey);
  277. // hook响应
  278. onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId, uuid);
  279. hookEvent.response(mediaServerItemInUse, response);
  280. });
  281. }
  282. // 关闭rtp server
  283. mediaServerService.closeRTPServer(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
  284. // 重新开启ssrc server
  285. mediaServerService.openRTPServer(mediaServerItem, finalSsrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), false);
  286. }
  287. }
  288. }, (event) -> {
  289. dynamicTask.stop(timeOutTaskKey);
  290. mediaServerService.closeRTPServer(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
  291. // 释放ssrc
  292. mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
  293. streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
  294. errorEvent.response(event);
  295. });
  296. }
  297. @Override
  298. public void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String uuid) {
  299. RequestMessage msg = new RequestMessage();
  300. if (uuid != null) {
  301. msg.setId(uuid);
  302. }
  303. msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId);
  304. StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
  305. if (streamInfo != null) {
  306. DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
  307. if (deviceChannel != null) {
  308. deviceChannel.setStreamId(streamInfo.getStream());
  309. storager.startPlay(deviceId, channelId, streamInfo.getStream());
  310. }
  311. redisCatchStorage.startPlay(streamInfo);
  312. WVPResult wvpResult = new WVPResult();
  313. wvpResult.setCode(0);
  314. wvpResult.setMsg("success");
  315. wvpResult.setData(streamInfo);
  316. msg.setData(wvpResult);
  317. resultHolder.invokeAllResult(msg);
  318. } else {
  319. logger.warn("设备预览API调用失败!");
  320. msg.setData("设备预览API调用失败!");
  321. resultHolder.invokeAllResult(msg);
  322. }
  323. }
  324. @Override
  325. public MediaServerItem getNewMediaServerItem(Device device) {
  326. if (device == null) {
  327. return null;
  328. }
  329. String mediaServerId = device.getMediaServerId();
  330. MediaServerItem mediaServerItem;
  331. if (mediaServerId == null) {
  332. mediaServerItem = mediaServerService.getMediaServerForMinimumLoad();
  333. }else {
  334. mediaServerItem = mediaServerService.getOne(mediaServerId);
  335. }
  336. if (mediaServerItem == null) {
  337. logger.warn("点播时未找到可使用的ZLM...");
  338. }
  339. return mediaServerItem;
  340. }
  341. @Override
  342. public DeferredResult<ResponseEntity<String>> playBack(String deviceId, String channelId, String startTime,
  343. String endTime,InviteStreamCallback inviteStreamCallback,
  344. PlayBackCallback callback) {
  345. Device device = storager.queryVideoDevice(deviceId);
  346. if (device == null) {
  347. return null;
  348. }
  349. MediaServerItem newMediaServerItem = getNewMediaServerItem(device);
  350. SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, true, true);
  351. return playBack(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, inviteStreamCallback, callback);
  352. }
  353. @Override
  354. public DeferredResult<ResponseEntity<String>> playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,
  355. String deviceId, String channelId, String startTime,
  356. String endTime, InviteStreamCallback infoCallBack,
  357. PlayBackCallback playBackCallback) {
  358. if (mediaServerItem == null || ssrcInfo == null) {
  359. return null;
  360. }
  361. String uuid = UUID.randomUUID().toString();
  362. String key = DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId;
  363. DeferredResult<ResponseEntity<String>> result = new DeferredResult<>(30000L);
  364. Device device = storager.queryVideoDevice(deviceId);
  365. if (device == null) {
  366. result.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
  367. return result;
  368. }
  369. resultHolder.put(DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId, uuid, result);
  370. RequestMessage msg = new RequestMessage();
  371. msg.setId(uuid);
  372. msg.setKey(key);
  373. PlayBackResult<RequestMessage> playBackResult = new PlayBackResult<>();
  374. String playBackTimeOutTaskKey = UUID.randomUUID().toString();
  375. dynamicTask.startDelay(playBackTimeOutTaskKey, ()->{
  376. logger.warn(String.format("设备回放超时,deviceId:%s ,channelId:%s", deviceId, channelId));
  377. playBackResult.setCode(-1);
  378. playBackResult.setData(msg);
  379. playBackCallback.call(playBackResult);
  380. SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream());
  381. // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  382. if (dialog != null) {
  383. // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  384. cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
  385. }else {
  386. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  387. mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream());
  388. streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
  389. }
  390. cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
  391. // 回复之前所有的点播请求
  392. playBackCallback.call(playBackResult);
  393. }, userSetting.getPlayTimeout()*1000);
  394. cmder.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, infoCallBack,
  395. (InviteStreamInfo inviteStreamInfo) -> {
  396. logger.info("收到订阅消息: " + inviteStreamInfo.getResponse().toJSONString());
  397. dynamicTask.stop(playBackTimeOutTaskKey);
  398. StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId);
  399. if (streamInfo == null) {
  400. logger.warn("设备回放API调用失败!");
  401. msg.setData("设备回放API调用失败!");
  402. playBackResult.setCode(-1);
  403. playBackResult.setData(msg);
  404. playBackCallback.call(playBackResult);
  405. return;
  406. }
  407. redisCatchStorage.startPlayback(streamInfo, inviteStreamInfo.getCallId());
  408. msg.setData(JSON.toJSONString(streamInfo));
  409. playBackResult.setCode(0);
  410. playBackResult.setData(msg);
  411. playBackResult.setMediaServerItem(inviteStreamInfo.getMediaServerItem());
  412. playBackResult.setResponse(inviteStreamInfo.getResponse());
  413. playBackCallback.call(playBackResult);
  414. }, event -> {
  415. dynamicTask.stop(playBackTimeOutTaskKey);
  416. msg.setData(String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg));
  417. playBackResult.setCode(-1);
  418. playBackResult.setData(msg);
  419. playBackResult.setEvent(event);
  420. playBackCallback.call(playBackResult);
  421. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  422. });
  423. return result;
  424. }
  425. @Override
  426. public DeferredResult<ResponseEntity<String>> download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack) {
  427. Device device = storager.queryVideoDevice(deviceId);
  428. if (device == null) {
  429. return null;
  430. }
  431. MediaServerItem newMediaServerItem = getNewMediaServerItem(device);
  432. SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, true, true);
  433. return download(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, downloadSpeed,infoCallBack, hookCallBack);
  434. }
  435. @Override
  436. public DeferredResult<ResponseEntity<String>> download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack) {
  437. if (mediaServerItem == null || ssrcInfo == null) {
  438. return null;
  439. }
  440. String uuid = UUID.randomUUID().toString();
  441. String key = DeferredResultHolder.CALLBACK_CMD_DOWNLOAD + deviceId + channelId;
  442. DeferredResult<ResponseEntity<String>> result = new DeferredResult<>(30000L);
  443. Device device = storager.queryVideoDevice(deviceId);
  444. if (device == null) {
  445. result.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
  446. return result;
  447. }
  448. resultHolder.put(key, uuid, result);
  449. RequestMessage msg = new RequestMessage();
  450. msg.setId(uuid);
  451. msg.setKey(key);
  452. WVPResult<StreamInfo> wvpResult = new WVPResult<>();
  453. msg.setData(wvpResult);
  454. PlayBackResult<RequestMessage> downloadResult = new PlayBackResult<>();
  455. downloadResult.setData(msg);
  456. String downLoadTimeOutTaskKey = UUID.randomUUID().toString();
  457. dynamicTask.startDelay(downLoadTimeOutTaskKey, ()->{
  458. logger.warn(String.format("录像下载请求超时,deviceId:%s ,channelId:%s", deviceId, channelId));
  459. wvpResult.setCode(-1);
  460. wvpResult.setMsg("录像下载请求超时");
  461. downloadResult.setCode(-1);
  462. hookCallBack.call(downloadResult);
  463. SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream());
  464. // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  465. if (dialog != null) {
  466. // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  467. cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
  468. }else {
  469. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  470. mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream());
  471. streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
  472. }
  473. cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
  474. // 回复之前所有的点播请求
  475. hookCallBack.call(downloadResult);
  476. }, userSetting.getPlayTimeout()*1000);
  477. cmder.downloadStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, downloadSpeed, infoCallBack,
  478. inviteStreamInfo -> {
  479. logger.info("收到订阅消息: " + inviteStreamInfo.getResponse().toJSONString());
  480. dynamicTask.stop(downLoadTimeOutTaskKey);
  481. StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId);
  482. streamInfo.setStartTime(startTime);
  483. streamInfo.setEndTime(endTime);
  484. if (streamInfo == null) {
  485. logger.warn("录像下载API调用失败!");
  486. wvpResult.setCode(-1);
  487. wvpResult.setMsg("录像下载API调用失败");
  488. downloadResult.setCode(-1);
  489. hookCallBack.call(downloadResult);
  490. return ;
  491. }
  492. redisCatchStorage.startDownload(streamInfo, inviteStreamInfo.getCallId());
  493. wvpResult.setCode(0);
  494. wvpResult.setMsg("success");
  495. wvpResult.setData(streamInfo);
  496. downloadResult.setCode(0);
  497. downloadResult.setMediaServerItem(inviteStreamInfo.getMediaServerItem());
  498. downloadResult.setResponse(inviteStreamInfo.getResponse());
  499. hookCallBack.call(downloadResult);
  500. }, event -> {
  501. dynamicTask.stop(downLoadTimeOutTaskKey);
  502. downloadResult.setCode(-1);
  503. wvpResult.setCode(-1);
  504. wvpResult.setMsg(String.format("录像下载失败, 错误码: %s, %s", event.statusCode, event.msg));
  505. downloadResult.setEvent(event);
  506. hookCallBack.call(downloadResult);
  507. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  508. });
  509. return result;
  510. }
  511. @Override
  512. public StreamInfo getDownLoadInfo(String deviceId, String channelId, String stream) {
  513. StreamInfo streamInfo = redisCatchStorage.queryDownload(deviceId, channelId, stream, null);
  514. if (streamInfo != null) {
  515. if (streamInfo.getProgress() == 1) {
  516. return streamInfo;
  517. }
  518. // 获取当前已下载时长
  519. String mediaServerId = streamInfo.getMediaServerId();
  520. MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
  521. if (mediaServerItem == null) {
  522. logger.warn("查询录像信息时发现节点已离线");
  523. return null;
  524. }
  525. if (mediaServerItem.getRecordAssistPort() != 0) {
  526. JSONObject jsonObject = assistRESTfulUtils.fileDuration(mediaServerItem, streamInfo.getApp(), streamInfo.getStream(), null);
  527. if (jsonObject != null && jsonObject.getInteger("code") == 0) {
  528. long duration = jsonObject.getLong("data");
  529. if (duration == 0) {
  530. streamInfo.setProgress(0);
  531. }else {
  532. String startTime = streamInfo.getStartTime();
  533. String endTime = streamInfo.getEndTime();
  534. long start = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime);
  535. long end = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime);
  536. BigDecimal currentCount = new BigDecimal(duration/1000);
  537. BigDecimal totalCount = new BigDecimal(end-start);
  538. BigDecimal divide = currentCount.divide(totalCount,2, RoundingMode.HALF_UP);
  539. double process = divide.doubleValue();
  540. streamInfo.setProgress(process);
  541. }
  542. }
  543. }
  544. }
  545. return streamInfo;
  546. }
  547. @Override
  548. public void onPublishHandlerForDownload(InviteStreamInfo inviteStreamInfo, String deviceId, String channelId, String uuid) {
  549. RequestMessage msg = new RequestMessage();
  550. msg.setKey(DeferredResultHolder.CALLBACK_CMD_DOWNLOAD + deviceId + channelId);
  551. msg.setId(uuid);
  552. StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId);
  553. if (streamInfo != null) {
  554. redisCatchStorage.startDownload(streamInfo, inviteStreamInfo.getCallId());
  555. msg.setData(JSON.toJSONString(streamInfo));
  556. resultHolder.invokeResult(msg);
  557. } else {
  558. logger.warn("设备预览API调用失败!");
  559. msg.setData("设备预览API调用失败!");
  560. resultHolder.invokeResult(msg);
  561. }
  562. }
  563. public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId) {
  564. String streamId = resonse.getString("stream");
  565. JSONArray tracks = resonse.getJSONArray("tracks");
  566. StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem,"rtp", streamId, tracks);
  567. streamInfo.setDeviceID(deviceId);
  568. streamInfo.setChannelId(channelId);
  569. return streamInfo;
  570. }
  571. @Override
  572. public void zlmServerOffline(String mediaServerId) {
  573. // 处理正在向上推流的上级平台
  574. List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServer(null);
  575. if (sendRtpItems.size() > 0) {
  576. for (SendRtpItem sendRtpItem : sendRtpItems) {
  577. if (sendRtpItem.getMediaServerId().equals(mediaServerId)) {
  578. ParentPlatform platform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
  579. sipCommanderFroPlatform.streamByeCmd(platform, sendRtpItem.getCallId());
  580. }
  581. }
  582. }
  583. // 处理正在观看的国标设备
  584. List<SsrcTransaction> allSsrc = streamSession.getAllSsrc();
  585. if (allSsrc.size() > 0) {
  586. for (SsrcTransaction ssrcTransaction : allSsrc) {
  587. if(ssrcTransaction.getMediaServerId().equals(mediaServerId)) {
  588. cmder.streamByeCmd(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(),
  589. ssrcTransaction.getStream(), null);
  590. }
  591. }
  592. }
  593. }
  594. }