PlayServiceImpl.java 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619
  1. package com.genersoft.iot.vmp.service.impl;
  2. import com.alibaba.fastjson.JSON;
  3. import com.alibaba.fastjson.JSONArray;
  4. import com.alibaba.fastjson.JSONObject;
  5. import com.genersoft.iot.vmp.common.StreamInfo;
  6. import com.genersoft.iot.vmp.conf.UserSetting;
  7. import com.genersoft.iot.vmp.gb28181.bean.*;
  8. import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
  9. import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
  10. import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
  11. import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
  12. import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
  13. import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
  14. import com.genersoft.iot.vmp.gb28181.utils.DateUtil;
  15. import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
  16. import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
  17. import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
  18. import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
  19. import com.genersoft.iot.vmp.service.IMediaServerService;
  20. import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback;
  21. import com.genersoft.iot.vmp.service.bean.PlayBackCallback;
  22. import com.genersoft.iot.vmp.service.bean.PlayBackResult;
  23. import com.genersoft.iot.vmp.service.bean.SSRCInfo;
  24. import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
  25. import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
  26. import com.genersoft.iot.vmp.utils.redis.RedisUtil;
  27. import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
  28. import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
  29. import com.genersoft.iot.vmp.service.IMediaService;
  30. import com.genersoft.iot.vmp.service.IPlayService;
  31. import gov.nist.javax.sip.stack.SIPDialog;
  32. import org.slf4j.Logger;
  33. import org.slf4j.LoggerFactory;
  34. import org.springframework.beans.factory.annotation.Autowired;
  35. import org.springframework.http.HttpStatus;
  36. import org.springframework.http.ResponseEntity;
  37. import org.springframework.stereotype.Service;
  38. import org.springframework.util.ResourceUtils;
  39. import org.springframework.web.context.request.async.DeferredResult;
  40. import javax.sip.ResponseEvent;
  41. import java.io.FileNotFoundException;
  42. import java.math.BigDecimal;
  43. import java.util.*;
  44. @SuppressWarnings(value = {"rawtypes", "unchecked"})
  45. @Service
  46. public class PlayServiceImpl implements IPlayService {
  47. private final static Logger logger = LoggerFactory.getLogger(PlayServiceImpl.class);
  48. @Autowired
  49. private IVideoManagerStorage storager;
  50. @Autowired
  51. private SIPCommander cmder;
  52. @Autowired
  53. private SIPCommanderFroPlatform sipCommanderFroPlatform;
  54. @Autowired
  55. private IRedisCatchStorage redisCatchStorage;
  56. @Autowired
  57. private RedisUtil redis;
  58. @Autowired
  59. private DeferredResultHolder resultHolder;
  60. @Autowired
  61. private ZLMRESTfulUtils zlmresTfulUtils;
  62. @Autowired
  63. private AssistRESTfulUtils assistRESTfulUtils;
  64. @Autowired
  65. private IMediaService mediaService;
  66. @Autowired
  67. private IMediaServerService mediaServerService;
  68. @Autowired
  69. private VideoStreamSessionManager streamSession;
  70. @Autowired
  71. private UserSetting userSetting;
  72. @Override
  73. public PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId,
  74. ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
  75. Runnable timeoutCallback) {
  76. PlayResult playResult = new PlayResult();
  77. RequestMessage msg = new RequestMessage();
  78. String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId;
  79. msg.setKey(key);
  80. String uuid = UUID.randomUUID().toString();
  81. msg.setId(uuid);
  82. playResult.setUuid(uuid);
  83. DeferredResult<ResponseEntity<String>> result = new DeferredResult<>(userSetting.getPlayTimeout());
  84. playResult.setResult(result);
  85. // 录像查询以channelId作为deviceId查询
  86. resultHolder.put(key, uuid, result);
  87. if (mediaServerItem == null) {
  88. WVPResult wvpResult = new WVPResult();
  89. wvpResult.setCode(-1);
  90. wvpResult.setMsg("未找到可用的zlm");
  91. msg.setData(wvpResult);
  92. resultHolder.invokeResult(msg);
  93. return playResult;
  94. }
  95. Device device = redisCatchStorage.getDevice(deviceId);
  96. StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
  97. playResult.setDevice(device);
  98. result.onCompletion(()->{
  99. // 点播结束时调用截图接口
  100. // TODO 应该在上流时调用更好,结束也可能是错误结束
  101. try {
  102. String classPath = ResourceUtils.getURL("classpath:").getPath();
  103. // 兼容打包为jar的class路径
  104. if(classPath.contains("jar")) {
  105. classPath = classPath.substring(0, classPath.lastIndexOf("."));
  106. classPath = classPath.substring(0, classPath.lastIndexOf("/") + 1);
  107. }
  108. if (classPath.startsWith("file:")) {
  109. classPath = classPath.substring(classPath.indexOf(":") + 1);
  110. }
  111. String path = classPath + "static/static/snap/";
  112. // 兼容Windows系统路径(去除前面的“/”)
  113. if(System.getProperty("os.name").contains("indows")) {
  114. path = path.substring(1);
  115. }
  116. String fileName = deviceId + "_" + channelId + ".jpg";
  117. ResponseEntity responseEntity = (ResponseEntity)result.getResult();
  118. if (responseEntity != null && responseEntity.getStatusCode() == HttpStatus.OK) {
  119. WVPResult wvpResult = (WVPResult)responseEntity.getBody();
  120. if (Objects.requireNonNull(wvpResult).getCode() == 0) {
  121. StreamInfo streamInfoForSuccess = (StreamInfo)wvpResult.getData();
  122. MediaServerItem mediaInfo = mediaServerService.getOne(streamInfoForSuccess.getMediaServerId());
  123. String streamUrl = streamInfoForSuccess.getFmp4();
  124. // 请求截图
  125. logger.info("[请求截图]: " + fileName);
  126. zlmresTfulUtils.getSnap(mediaInfo, streamUrl, 15, 1, path, fileName);
  127. }
  128. }
  129. } catch (FileNotFoundException e) {
  130. e.printStackTrace();
  131. }
  132. });
  133. if (streamInfo != null) {
  134. String streamId = streamInfo.getStream();
  135. if (streamId == null) {
  136. WVPResult wvpResult = new WVPResult();
  137. wvpResult.setCode(-1);
  138. wvpResult.setMsg("点播失败, redis缓存streamId等于null");
  139. msg.setData(wvpResult);
  140. resultHolder.invokeAllResult(msg);
  141. return playResult;
  142. }
  143. String mediaServerId = streamInfo.getMediaServerId();
  144. MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
  145. JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaInfo, streamId);
  146. if (rtpInfo != null && rtpInfo.getBoolean("exist")) {
  147. WVPResult wvpResult = new WVPResult();
  148. wvpResult.setCode(0);
  149. wvpResult.setMsg("success");
  150. wvpResult.setData(streamInfo);
  151. msg.setData(wvpResult);
  152. resultHolder.invokeAllResult(msg);
  153. if (hookEvent != null) {
  154. hookEvent.response(mediaServerItem, JSONObject.parseObject(JSON.toJSONString(streamInfo)));
  155. }
  156. }else {
  157. redisCatchStorage.stopPlay(streamInfo);
  158. storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
  159. streamInfo = null;
  160. }
  161. }
  162. if (streamInfo == null) {
  163. String streamId = null;
  164. if (mediaServerItem.isRtpEnable()) {
  165. streamId = String.format("%s_%s", device.getDeviceId(), channelId);
  166. }
  167. SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck());
  168. play(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInUse, response)->{
  169. if (hookEvent != null) {
  170. hookEvent.response(mediaServerItem, response);
  171. }
  172. }, event -> {
  173. // sip error错误
  174. WVPResult wvpResult = new WVPResult();
  175. wvpResult.setCode(-1);
  176. wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg));
  177. msg.setData(wvpResult);
  178. resultHolder.invokeAllResult(msg);
  179. if (errorEvent != null) {
  180. errorEvent.response(event);
  181. }
  182. }, (code, msgStr)->{
  183. // invite点播超时
  184. WVPResult wvpResult = new WVPResult();
  185. wvpResult.setCode(-1);
  186. if (code == 0) {
  187. wvpResult.setMsg("点播超时,请稍候重试");
  188. }else if (code == 1) {
  189. wvpResult.setMsg("收流超时,请稍候重试");
  190. }
  191. msg.setData(wvpResult);
  192. // 回复之前所有的点播请求
  193. resultHolder.invokeAllResult(msg);
  194. }, uuid);
  195. }
  196. return playResult;
  197. }
  198. @Override
  199. public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
  200. ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
  201. InviteTimeOutCallback timeoutCallback, String uuid) {
  202. String streamId = null;
  203. if (mediaServerItem.isRtpEnable()) {
  204. streamId = String.format("%s_%s", device.getDeviceId(), channelId);
  205. }
  206. if (ssrcInfo == null) {
  207. ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck());
  208. }
  209. // 超时处理
  210. Timer timer = new Timer();
  211. SSRCInfo finalSsrcInfo = ssrcInfo;
  212. timer.schedule(new TimerTask() {
  213. @Override
  214. public void run() {
  215. logger.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", device.getDeviceId(), channelId));
  216. SIPDialog dialog = streamSession.getDialogByStream(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
  217. if (dialog != null) {
  218. timeoutCallback.run(1, "收流超时");
  219. // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  220. cmder.streamByeCmd(device.getDeviceId(), channelId, finalSsrcInfo.getStream(), null);
  221. }else {
  222. timeoutCallback.run(0, "点播超时");
  223. mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
  224. mediaServerService.closeRTPServer(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
  225. streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
  226. }
  227. }
  228. }, userSetting.getPlayTimeout());
  229. final String ssrc = ssrcInfo.getSsrc();
  230. cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
  231. logger.info("收到订阅消息: " + response.toJSONString());
  232. timer.cancel();
  233. // hook响应
  234. onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId, uuid);
  235. hookEvent.response(mediaServerItemInuse, response);
  236. }, (event) -> {
  237. ResponseEvent responseEvent = (ResponseEvent)event.event;
  238. String contentString = new String(responseEvent.getResponse().getRawContent());
  239. // 获取ssrc
  240. int ssrcIndex = contentString.indexOf("y=");
  241. // 检查是否有y字段
  242. if (ssrcIndex >= 0) {
  243. //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容
  244. String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
  245. if (!ssrc.equals(ssrcInResponse) && device.isSsrcCheck()) { // 查询到ssrc不一致且开启了ssrc校验则需要针对处理
  246. // 查询 ssrcInResponse 是否可用
  247. if (mediaServerItem.isRtpEnable() && !mediaServerItem.getSsrcConfig().checkSsrc(ssrcInResponse)) {
  248. // ssrc 不可用
  249. // 释放ssrc
  250. mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
  251. streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
  252. event.msg = "下级自定义了ssrc,但是此ssrc不可用";
  253. event.statusCode = 400;
  254. errorEvent.response(event);
  255. return;
  256. }
  257. // 关闭rtp server
  258. mediaServerService.closeRTPServer(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
  259. // 重新开启ssrc server
  260. mediaServerService.openRTPServer(mediaServerItem, finalSsrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), false);
  261. }
  262. }
  263. }, (event) -> {
  264. timer.cancel();
  265. mediaServerService.closeRTPServer(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
  266. // 释放ssrc
  267. mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
  268. streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
  269. errorEvent.response(event);
  270. });
  271. }
  272. @Override
  273. public void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String uuid) {
  274. RequestMessage msg = new RequestMessage();
  275. if (uuid != null) {
  276. msg.setId(uuid);
  277. }
  278. msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId);
  279. StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
  280. if (streamInfo != null) {
  281. DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
  282. if (deviceChannel != null) {
  283. deviceChannel.setStreamId(streamInfo.getStream());
  284. storager.startPlay(deviceId, channelId, streamInfo.getStream());
  285. }
  286. redisCatchStorage.startPlay(streamInfo);
  287. WVPResult wvpResult = new WVPResult();
  288. wvpResult.setCode(0);
  289. wvpResult.setMsg("success");
  290. wvpResult.setData(streamInfo);
  291. msg.setData(wvpResult);
  292. resultHolder.invokeAllResult(msg);
  293. } else {
  294. logger.warn("设备预览API调用失败!");
  295. msg.setData("设备预览API调用失败!");
  296. resultHolder.invokeAllResult(msg);
  297. }
  298. }
  299. @Override
  300. public MediaServerItem getNewMediaServerItem(Device device) {
  301. if (device == null) return null;
  302. String mediaServerId = device.getMediaServerId();
  303. MediaServerItem mediaServerItem;
  304. if (mediaServerId == null) {
  305. mediaServerItem = mediaServerService.getMediaServerForMinimumLoad();
  306. }else {
  307. mediaServerItem = mediaServerService.getOne(mediaServerId);
  308. }
  309. if (mediaServerItem == null) {
  310. logger.warn("点播时未找到可使用的ZLM...");
  311. }
  312. return mediaServerItem;
  313. }
  314. @Override
  315. public DeferredResult<ResponseEntity<String>> playBack(String deviceId, String channelId, String startTime,
  316. String endTime,InviteStreamCallback inviteStreamCallback,
  317. PlayBackCallback callback) {
  318. Device device = storager.queryVideoDevice(deviceId);
  319. if (device == null) return null;
  320. MediaServerItem newMediaServerItem = getNewMediaServerItem(device);
  321. SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, true);
  322. return playBack(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, inviteStreamCallback, callback);
  323. }
  324. @Override
  325. public DeferredResult<ResponseEntity<String>> playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,
  326. String deviceId, String channelId, String startTime,
  327. String endTime, InviteStreamCallback infoCallBack,
  328. PlayBackCallback playBackCallback) {
  329. if (mediaServerItem == null || ssrcInfo == null) return null;
  330. String uuid = UUID.randomUUID().toString();
  331. String key = DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId;
  332. DeferredResult<ResponseEntity<String>> result = new DeferredResult<>(30000L);
  333. Device device = storager.queryVideoDevice(deviceId);
  334. if (device == null) {
  335. result.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
  336. return result;
  337. }
  338. resultHolder.put(DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId, uuid, result);
  339. RequestMessage msg = new RequestMessage();
  340. msg.setId(uuid);
  341. msg.setKey(key);
  342. PlayBackResult<RequestMessage> playBackResult = new PlayBackResult<>();
  343. Timer timer = new Timer();
  344. timer.schedule(new TimerTask() {
  345. @Override
  346. public void run() {
  347. logger.warn(String.format("设备回放超时,deviceId:%s ,channelId:%s", deviceId, channelId));
  348. playBackResult.setCode(-1);
  349. playBackResult.setData(msg);
  350. playBackCallback.call(playBackResult);
  351. SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream());
  352. // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  353. if (dialog != null) {
  354. // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  355. cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
  356. }else {
  357. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  358. mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream());
  359. streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
  360. }
  361. cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
  362. // 回复之前所有的点播请求
  363. playBackCallback.call(playBackResult);
  364. }
  365. }, userSetting.getPlayTimeout());
  366. cmder.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, infoCallBack,
  367. (InviteStreamInfo inviteStreamInfo) -> {
  368. logger.info("收到订阅消息: " + inviteStreamInfo.getResponse().toJSONString());
  369. timer.cancel();
  370. StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId);
  371. if (streamInfo == null) {
  372. logger.warn("设备回放API调用失败!");
  373. msg.setData("设备回放API调用失败!");
  374. playBackResult.setCode(-1);
  375. playBackResult.setData(msg);
  376. playBackCallback.call(playBackResult);
  377. return;
  378. }
  379. redisCatchStorage.startPlayback(streamInfo, inviteStreamInfo.getCallId());
  380. msg.setData(JSON.toJSONString(streamInfo));
  381. playBackResult.setCode(0);
  382. playBackResult.setData(msg);
  383. playBackResult.setMediaServerItem(inviteStreamInfo.getMediaServerItem());
  384. playBackResult.setResponse(inviteStreamInfo.getResponse());
  385. playBackCallback.call(playBackResult);
  386. }, event -> {
  387. timer.cancel();
  388. msg.setData(String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg));
  389. playBackResult.setCode(-1);
  390. playBackResult.setData(msg);
  391. playBackResult.setEvent(event);
  392. playBackCallback.call(playBackResult);
  393. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  394. });
  395. return result;
  396. }
  397. @Override
  398. public DeferredResult<ResponseEntity<String>> download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack) {
  399. Device device = storager.queryVideoDevice(deviceId);
  400. if (device == null) return null;
  401. MediaServerItem newMediaServerItem = getNewMediaServerItem(device);
  402. SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, true);
  403. return download(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, downloadSpeed,infoCallBack, hookCallBack);
  404. }
  405. @Override
  406. public DeferredResult<ResponseEntity<String>> download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack) {
  407. if (mediaServerItem == null || ssrcInfo == null) return null;
  408. String uuid = UUID.randomUUID().toString();
  409. String key = DeferredResultHolder.CALLBACK_CMD_DOWNLOAD + deviceId + channelId;
  410. DeferredResult<ResponseEntity<String>> result = new DeferredResult<>(30000L);
  411. Device device = storager.queryVideoDevice(deviceId);
  412. if (device == null) {
  413. result.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
  414. return result;
  415. }
  416. resultHolder.put(key, uuid, result);
  417. RequestMessage msg = new RequestMessage();
  418. msg.setId(uuid);
  419. msg.setKey(key);
  420. WVPResult<StreamInfo> wvpResult = new WVPResult<>();
  421. msg.setData(wvpResult);
  422. PlayBackResult<RequestMessage> downloadResult = new PlayBackResult<>();
  423. downloadResult.setData(msg);
  424. Timer timer = new Timer();
  425. timer.schedule(new TimerTask() {
  426. @Override
  427. public void run() {
  428. logger.warn(String.format("录像下载请求超时,deviceId:%s ,channelId:%s", deviceId, channelId));
  429. wvpResult.setCode(-1);
  430. wvpResult.setMsg("录像下载请求超时");
  431. downloadResult.setCode(-1);
  432. hookCallBack.call(downloadResult);
  433. SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream());
  434. // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  435. if (dialog != null) {
  436. // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  437. cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
  438. }else {
  439. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  440. mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream());
  441. streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
  442. }
  443. cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
  444. // 回复之前所有的点播请求
  445. hookCallBack.call(downloadResult);
  446. }
  447. }, userSetting.getPlayTimeout());
  448. cmder.downloadStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, downloadSpeed, infoCallBack,
  449. inviteStreamInfo -> {
  450. logger.info("收到订阅消息: " + inviteStreamInfo.getResponse().toJSONString());
  451. timer.cancel();
  452. StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId);
  453. streamInfo.setStartTime(startTime);
  454. streamInfo.setEndTime(endTime);
  455. if (streamInfo == null) {
  456. logger.warn("录像下载API调用失败!");
  457. wvpResult.setCode(-1);
  458. wvpResult.setMsg("录像下载API调用失败");
  459. downloadResult.setCode(-1);
  460. hookCallBack.call(downloadResult);
  461. return ;
  462. }
  463. redisCatchStorage.startDownload(streamInfo, inviteStreamInfo.getCallId());
  464. wvpResult.setCode(0);
  465. wvpResult.setMsg("success");
  466. wvpResult.setData(streamInfo);
  467. downloadResult.setCode(0);
  468. downloadResult.setMediaServerItem(inviteStreamInfo.getMediaServerItem());
  469. downloadResult.setResponse(inviteStreamInfo.getResponse());
  470. hookCallBack.call(downloadResult);
  471. }, event -> {
  472. timer.cancel();
  473. downloadResult.setCode(-1);
  474. wvpResult.setCode(-1);
  475. wvpResult.setMsg(String.format("录像下载失败, 错误码: %s, %s", event.statusCode, event.msg));
  476. downloadResult.setEvent(event);
  477. hookCallBack.call(downloadResult);
  478. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  479. });
  480. return result;
  481. }
  482. @Override
  483. public StreamInfo getDownLoadInfo(String deviceId, String channelId, String stream) {
  484. StreamInfo streamInfo = redisCatchStorage.queryDownload(deviceId, channelId, stream, null);
  485. if (streamInfo != null) {
  486. if (streamInfo.getProgress() == 1) {
  487. return streamInfo;
  488. }
  489. // 获取当前已下载时长
  490. String mediaServerId = streamInfo.getMediaServerId();
  491. MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
  492. if (mediaServerItem == null) {
  493. logger.warn("查询录像信息时发现节点已离线");
  494. return null;
  495. }
  496. if (mediaServerItem.getRecordAssistPort() != 0) {
  497. JSONObject jsonObject = assistRESTfulUtils.fileDuration(mediaServerItem, streamInfo.getApp(), streamInfo.getStream(), null);
  498. if (jsonObject != null && jsonObject.getInteger("code") == 0) {
  499. long duration = jsonObject.getLong("data");
  500. if (duration == 0) {
  501. streamInfo.setProgress(0);
  502. }else {
  503. String startTime = streamInfo.getStartTime();
  504. String endTime = streamInfo.getEndTime();
  505. long start = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime);
  506. long end = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime);
  507. BigDecimal currentCount = new BigDecimal(duration/1000);
  508. BigDecimal totalCount = new BigDecimal(end-start);
  509. BigDecimal divide = currentCount.divide(totalCount,2, BigDecimal.ROUND_HALF_UP);
  510. double process = divide.doubleValue();
  511. streamInfo.setProgress(process);
  512. }
  513. }
  514. }
  515. }
  516. return streamInfo;
  517. }
  518. @Override
  519. public void onPublishHandlerForDownload(InviteStreamInfo inviteStreamInfo, String deviceId, String channelId, String uuid) {
  520. RequestMessage msg = new RequestMessage();
  521. msg.setKey(DeferredResultHolder.CALLBACK_CMD_DOWNLOAD + deviceId + channelId);
  522. msg.setId(uuid);
  523. StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId);
  524. if (streamInfo != null) {
  525. redisCatchStorage.startDownload(streamInfo, inviteStreamInfo.getCallId());
  526. msg.setData(JSON.toJSONString(streamInfo));
  527. resultHolder.invokeResult(msg);
  528. } else {
  529. logger.warn("设备预览API调用失败!");
  530. msg.setData("设备预览API调用失败!");
  531. resultHolder.invokeResult(msg);
  532. }
  533. }
  534. public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId) {
  535. String streamId = resonse.getString("stream");
  536. JSONArray tracks = resonse.getJSONArray("tracks");
  537. StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem,"rtp", streamId, tracks);
  538. streamInfo.setDeviceID(deviceId);
  539. streamInfo.setChannelId(channelId);
  540. return streamInfo;
  541. }
  542. @Override
  543. public void zlmServerOffline(String mediaServerId) {
  544. // 处理正在向上推流的上级平台
  545. List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServer(null);
  546. if (sendRtpItems.size() > 0) {
  547. for (SendRtpItem sendRtpItem : sendRtpItems) {
  548. if (sendRtpItem.getMediaServerId().equals(mediaServerId)) {
  549. ParentPlatform platform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
  550. sipCommanderFroPlatform.streamByeCmd(platform, sendRtpItem.getCallId());
  551. }
  552. }
  553. }
  554. // 处理正在观看的国标设备
  555. List<SsrcTransaction> allSsrc = streamSession.getAllSsrc();
  556. if (allSsrc.size() > 0) {
  557. for (SsrcTransaction ssrcTransaction : allSsrc) {
  558. if(ssrcTransaction.getMediaServerId().equals(mediaServerId)) {
  559. cmder.streamByeCmd(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(),
  560. ssrcTransaction.getStream(), null);
  561. }
  562. }
  563. }
  564. }
  565. }