PlayServiceImpl.java 71 KB


  1. package com.genersoft.iot.vmp.service.impl;
  2. import com.alibaba.fastjson2.JSONObject;
  3. import com.genersoft.iot.vmp.common.InviteInfo;
  4. import com.genersoft.iot.vmp.common.InviteSessionStatus;
  5. import com.genersoft.iot.vmp.common.InviteSessionType;
  6. import com.genersoft.iot.vmp.common.StreamInfo;
  7. import com.genersoft.iot.vmp.conf.DynamicTask;
  8. import com.genersoft.iot.vmp.conf.UserSetting;
  9. import com.genersoft.iot.vmp.conf.exception.ControllerException;
  10. import com.genersoft.iot.vmp.conf.exception.ServiceException;
  11. import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
  12. import com.genersoft.iot.vmp.gb28181.bean.*;
  13. import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
  14. import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
  15. import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
  16. import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
  17. import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
  18. import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
  19. import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
  20. import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
  21. import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
  22. import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
  23. import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
  24. import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
  25. import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
  26. import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
  27. import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
  28. import com.genersoft.iot.vmp.service.*;
  29. import com.genersoft.iot.vmp.service.bean.ErrorCallback;
  30. import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
  31. import com.genersoft.iot.vmp.service.bean.SSRCInfo;
  32. import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
  33. import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
  34. import com.genersoft.iot.vmp.utils.DateUtil;
  35. import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
  36. import org.slf4j.Logger;
  37. import org.slf4j.LoggerFactory;
  38. import org.springframework.beans.factory.annotation.Autowired;
  39. import org.springframework.data.redis.core.RedisTemplate;
  40. import org.springframework.stereotype.Service;
  41. import org.springframework.util.ObjectUtils;
  42. import javax.sdp.*;
  43. import javax.sip.InvalidArgumentException;
  44. import javax.sip.ResponseEvent;
  45. import javax.sip.SipException;
  46. import java.io.File;
  47. import java.math.BigDecimal;
  48. import java.math.RoundingMode;
  49. import java.text.ParseException;
  50. import java.util.List;
  51. import java.util.UUID;
  52. import java.util.Vector;
  53. @SuppressWarnings(value = {"rawtypes", "unchecked"})
  54. @Service
  55. public class PlayServiceImpl implements IPlayService {
  56. private final static Logger logger = LoggerFactory.getLogger(PlayServiceImpl.class);
  57. @Autowired
  58. private IVideoManagerStorage storager;
  59. @Autowired
  60. private SIPCommander cmder;
  61. @Autowired
  62. private SIPCommanderFroPlatform sipCommanderFroPlatform;
  63. @Autowired
  64. private IRedisCatchStorage redisCatchStorage;
  65. @Autowired
  66. private IInviteStreamService inviteStreamService;
  67. @Autowired
  68. private DeferredResultHolder resultHolder;
  69. @Autowired
  70. private ZLMRESTfulUtils zlmresTfulUtils;
  71. @Autowired
  72. private ZLMServerFactory zlmServerFactory;
  73. @Autowired
  74. private AssistRESTfulUtils assistRESTfulUtils;
  75. @Autowired
  76. private IMediaService mediaService;
  77. @Autowired
  78. private IMediaServerService mediaServerService;
  79. @Autowired
  80. private VideoStreamSessionManager streamSession;
  81. @Autowired
  82. private IDeviceService deviceService;
  83. @Autowired
  84. private UserSetting userSetting;
  85. @Autowired
  86. private DynamicTask dynamicTask;
  87. @Autowired
  88. private ZlmHttpHookSubscribe subscribe;
  89. @Autowired
  90. private SSRCFactory ssrcFactory;
  91. @Autowired
  92. private RedisTemplate<Object, Object> redisTemplate;
  93. @Override
  94. public SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback<Object> callback) {
  95. if (mediaServerItem == null) {
  96. throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm");
  97. }
  98. Device device = redisCatchStorage.getDevice(deviceId);
  99. InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
  100. if (inviteInfo != null ) {
  101. if (inviteInfo.getStreamInfo() == null) {
  102. // 点播发起了但是尚未成功, 仅注册回调等待结果即可
  103. inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId, null, callback);
  104. return inviteInfo.getSsrcInfo();
  105. }else {
  106. StreamInfo streamInfo = inviteInfo.getStreamInfo();
  107. String streamId = streamInfo.getStream();
  108. if (streamId == null) {
  109. callback.run(InviteErrorCode.ERROR_FOR_CATCH_DATA.getCode(), "点播失败, redis缓存streamId等于null", null);
  110. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  111. InviteErrorCode.ERROR_FOR_CATCH_DATA.getCode(),
  112. "点播失败, redis缓存streamId等于null",
  113. null);
  114. return inviteInfo.getSsrcInfo();
  115. }
  116. String mediaServerId = streamInfo.getMediaServerId();
  117. MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
  118. Boolean ready = zlmServerFactory.isStreamReady(mediaInfo, "rtp", streamId);
  119. if (ready != null && ready) {
  120. callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
  121. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  122. InviteErrorCode.SUCCESS.getCode(),
  123. InviteErrorCode.SUCCESS.getMsg(),
  124. streamInfo);
  125. return inviteInfo.getSsrcInfo();
  126. }else {
  127. // 点播发起了但是尚未成功, 仅注册回调等待结果即可
  128. inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId, null, callback);
  129. storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
  130. inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
  131. }
  132. }
  133. }
  134. String streamId = null;
  135. if (mediaServerItem.isRtpEnable()) {
  136. streamId = String.format("%s_%s", device.getDeviceId(), channelId);
  137. }
  138. SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, ssrc, device.isSsrcCheck(), false, 0, false, device.getStreamModeForParam());
  139. if (ssrcInfo == null) {
  140. callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(), null);
  141. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  142. InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(),
  143. InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(),
  144. null);
  145. return null;
  146. }
  147. // TODO 记录点播的状态
  148. play(mediaServerItem, ssrcInfo, device, channelId, callback);
  149. return ssrcInfo;
  150. }
  151. @Override
  152. public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
  153. ErrorCallback<Object> callback) {
  154. if (mediaServerItem == null || ssrcInfo == null) {
  155. callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
  156. InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(),
  157. null);
  158. return;
  159. }
  160. logger.info("[点播开始] deviceId: {}, channelId: {},码流类型:{}, 收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}",
  161. device.getDeviceId(), channelId, device.isSwitchPrimarySubStream() ? "辅码流" : "主码流", ssrcInfo.getPort(),
  162. device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
  163. //端口获取失败的ssrcInfo 没有必要发送点播指令
  164. if (ssrcInfo.getPort() <= 0) {
  165. logger.info("[点播端口分配异常],deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channelId, ssrcInfo);
  166. // 释放ssrc
  167. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  168. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  169. callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "点播端口分配异常", null);
  170. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  171. InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "点播端口分配异常", null);
  172. return;
  173. }
  174. // 初始化redis中的invite消息状态
  175. InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo,
  176. mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAY,
  177. InviteSessionStatus.ready);
  178. inviteInfo.setSubStream(device.isSwitchPrimarySubStream());
  179. inviteStreamService.updateInviteInfo(inviteInfo);
  180. // 超时处理
  181. String timeOutTaskKey = UUID.randomUUID().toString();
  182. dynamicTask.startDelay(timeOutTaskKey, () -> {
  183. // 执行超时任务时查询是否已经成功,成功了则不执行超时任务,防止超时任务取消失败的情况
  184. InviteInfo inviteInfoForTimeOut = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
  185. if (inviteInfoForTimeOut == null || inviteInfoForTimeOut.getStreamInfo() == null) {
  186. logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},码流类型:{},端口:{}, SSRC: {}",
  187. device.getDeviceId(), channelId, device.isSwitchPrimarySubStream() ? "辅码流" : "主码流",
  188. ssrcInfo.getPort(), ssrcInfo.getSsrc());
  189. // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  190. // InviteInfo inviteInfoForTimeout = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.play, device.getDeviceId(), channelId);
  191. // if (inviteInfoForTimeout == null) {
  192. // return;
  193. // }
  194. // if (InviteSessionStatus.ok == inviteInfoForTimeout.getStatus() ) {
  195. // // TODO 发送bye
  196. // }else {
  197. // // TODO 发送cancel
  198. // }
  199. callback.run(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null);
  200. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  201. InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null);
  202. inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
  203. try {
  204. cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null);
  205. } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
  206. logger.error("[点播超时], 发送BYE失败 {}", e.getMessage());
  207. } finally {
  208. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  209. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  210. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  211. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  212. // 取消订阅消息监听
  213. HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
  214. subscribe.removeSubscribe(hookSubscribe);
  215. }
  216. }
  217. }, userSetting.getPlayTimeout());
  218. try {
  219. cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInuse, hookParam ) -> {
  220. logger.info("收到订阅消息: " + hookParam);
  221. dynamicTask.stop(timeOutTaskKey);
  222. // hook响应
  223. StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInuse, hookParam, device.getDeviceId(), channelId);
  224. if (streamInfo == null){
  225. callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
  226. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
  227. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  228. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
  229. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
  230. return;
  231. }
  232. callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
  233. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  234. InviteErrorCode.SUCCESS.getCode(),
  235. InviteErrorCode.SUCCESS.getMsg(),
  236. streamInfo);
  237. logger.info("[点播成功] deviceId: {}, channelId:{}, 码流类型:{}", device.getDeviceId(), channelId,
  238. device.isSwitchPrimarySubStream() ? "辅码流" : "主码流");
  239. snapOnPlay(mediaServerItemInuse, device.getDeviceId(), channelId, ssrcInfo.getStream());
  240. }, (event) -> {
  241. inviteInfo.setStatus(InviteSessionStatus.ok);
  242. ResponseEvent responseEvent = (ResponseEvent) event.event;
  243. String contentString = new String(responseEvent.getResponse().getRawContent());
  244. // 获取ssrc
  245. int ssrcIndex = contentString.indexOf("y=");
  246. // 检查是否有y字段
  247. if (ssrcIndex >= 0) {
  248. //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容
  249. String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12).trim();
  250. // 查询到ssrc不一致且开启了ssrc校验则需要针对处理
  251. if (ssrcInfo.getSsrc().equals(ssrcInResponse)) {
  252. if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) {
  253. tcpActiveHandler(device, channelId, contentString, mediaServerItem, timeOutTaskKey, ssrcInfo, callback);
  254. }
  255. return;
  256. }
  257. logger.info("[点播消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
  258. if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {
  259. logger.info("[点播消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
  260. // 释放ssrc
  261. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  262. // 单端口模式streamId也有变化,重新设置监听即可
  263. if (!mediaServerItem.isRtpEnable()) {
  264. // 添加订阅
  265. HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
  266. subscribe.removeSubscribe(hookSubscribe);
  267. String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase();
  268. hookSubscribe.getContent().put("stream", stream);
  269. inviteStreamService.updateInviteInfoForStream(inviteInfo, stream);
  270. subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> {
  271. logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + hookParam);
  272. dynamicTask.stop(timeOutTaskKey);
  273. // hook响应
  274. StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInUse, hookParam, device.getDeviceId(), channelId);
  275. if (streamInfo == null){
  276. callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
  277. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
  278. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  279. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
  280. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
  281. return;
  282. }
  283. callback.run(InviteErrorCode.SUCCESS.getCode(),
  284. InviteErrorCode.SUCCESS.getMsg(), streamInfo);
  285. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  286. InviteErrorCode.SUCCESS.getCode(),
  287. InviteErrorCode.SUCCESS.getMsg(),
  288. streamInfo);
  289. snapOnPlay(mediaServerItemInUse, device.getDeviceId(), channelId, stream);
  290. });
  291. return;
  292. }
  293. // 更新ssrc
  294. Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse);
  295. if (!result) {
  296. try {
  297. logger.warn("[点播] 更新ssrc失败,停止点播 {}/{}", device.getDeviceId(), channelId);
  298. cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null, null);
  299. } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
  300. logger.error("[命令发送失败] 停止点播, 发送BYE: {}", e.getMessage());
  301. }
  302. dynamicTask.stop(timeOutTaskKey);
  303. // 释放ssrc
  304. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  305. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  306. callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
  307. "下级自定义了ssrc,重新设置收流信息失败", null);
  308. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  309. InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
  310. "下级自定义了ssrc,重新设置收流信息失败", null);
  311. }else {
  312. if (ssrcInfo.getStream()!= null && !ssrcInfo.getStream().equals(inviteInfo.getStream())) {
  313. inviteStreamService.removeInviteInfo(inviteInfo);
  314. }
  315. ssrcInfo.setSsrc(ssrcInResponse);
  316. inviteInfo.setSsrcInfo(ssrcInfo);
  317. inviteInfo.setStream(ssrcInfo.getStream());
  318. if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) {
  319. tcpActiveHandler(device, channelId, contentString, mediaServerItem, timeOutTaskKey, ssrcInfo, callback);
  320. }
  321. }
  322. }else {
  323. logger.info("[点播消息] 收到invite 200, 下级自定义了ssrc, 但是当前模式无需修正");
  324. }
  325. }
  326. inviteStreamService.updateInviteInfo(inviteInfo);
  327. }, (event) -> {
  328. dynamicTask.stop(timeOutTaskKey);
  329. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  330. // 释放ssrc
  331. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  332. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  333. callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_ERROR.getCode(),
  334. String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
  335. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  336. InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
  337. String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
  338. inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
  339. });
  340. } catch (InvalidArgumentException | SipException | ParseException e) {
  341. logger.error("[命令发送失败] 点播消息: {}", e.getMessage());
  342. dynamicTask.stop(timeOutTaskKey);
  343. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  344. // 释放ssrc
  345. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  346. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  347. callback.run(InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(),
  348. InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null);
  349. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  350. InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(),
  351. InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null);
  352. inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
  353. }
  354. }
  355. private void tcpActiveHandler(Device device, String channelId, String contentString,
  356. MediaServerItem mediaServerItem,
  357. String timeOutTaskKey, SSRCInfo ssrcInfo, ErrorCallback<Object> callback){
  358. if (!device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) {
  359. return;
  360. }
  361. String substring = contentString.substring(0, contentString.indexOf("y="));
  362. try {
  363. SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring);
  364. int port = -1;
  365. Vector mediaDescriptions = sdp.getMediaDescriptions(true);
  366. for (Object description : mediaDescriptions) {
  367. MediaDescription mediaDescription = (MediaDescription) description;
  368. Media media = mediaDescription.getMedia();
  369. Vector mediaFormats = media.getMediaFormats(false);
  370. if (mediaFormats.contains("96")) {
  371. port = media.getMediaPort();
  372. break;
  373. }
  374. }
  375. logger.info("[点播-TCP主动连接对方] deviceId: {}, channelId: {}, 连接对方的地址:{}:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, sdp.getConnection().getAddress(), port, device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
  376. JSONObject jsonObject = zlmresTfulUtils.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getStream());
  377. logger.info("[点播-TCP主动连接对方] 结果: {}", jsonObject);
  378. } catch (SdpException e) {
  379. logger.error("[点播-TCP主动连接对方] deviceId: {}, channelId: {}, 解析200OK的SDP信息失败", device.getDeviceId(), channelId, e);
  380. dynamicTask.stop(timeOutTaskKey);
  381. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  382. // 释放ssrc
  383. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  384. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  385. callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
  386. InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
  387. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  388. InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
  389. InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
  390. }
  391. }
  392. /**
  393. * 点播成功时调用截图.
  394. *
  395. * @param mediaServerItemInuse media
  396. * @param deviceId 设备 ID
  397. * @param channelId 通道 ID
  398. * @param stream ssrc
  399. */
  400. private void snapOnPlay(MediaServerItem mediaServerItemInuse, String deviceId, String channelId, String stream) {
  401. String streamUrl;
  402. if (mediaServerItemInuse.getRtspPort() != 0) {
  403. streamUrl = String.format("rtsp://127.0.0.1:%s/%s/%s", mediaServerItemInuse.getRtspPort(), "rtp", stream);
  404. } else {
  405. streamUrl = String.format("http://127.0.0.1:%s/%s/%s.live.mp4", mediaServerItemInuse.getHttpPort(), "rtp", stream);
  406. }
  407. String path = "snap";
  408. String fileName = deviceId + "_" + channelId + ".jpg";
  409. // 请求截图
  410. logger.info("[请求截图]: " + fileName);
  411. zlmresTfulUtils.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName);
  412. }
  413. private StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, HookParam hookParam, String deviceId, String channelId) {
  414. StreamInfo streamInfo = null;
  415. Device device = redisCatchStorage.getDevice(deviceId);
  416. OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam;
  417. streamInfo = onPublishHandler(mediaServerItem, streamChangedHookParam, deviceId, channelId);
  418. if (streamInfo != null) {
  419. DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
  420. if (deviceChannel != null) {
  421. deviceChannel.setStreamId(streamInfo.getStream());
  422. storager.startPlay(deviceId, channelId, streamInfo.getStream());
  423. }
  424. InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
  425. if (inviteInfo != null) {
  426. inviteInfo.setStatus(InviteSessionStatus.ok);
  427. inviteInfo.setStreamInfo(streamInfo);
  428. inviteStreamService.updateInviteInfo(inviteInfo);
  429. }
  430. }
  431. return streamInfo;
  432. }
  433. private StreamInfo onPublishHandlerForPlayback(MediaServerItem mediaServerItem, HookParam param, String deviceId, String channelId, String startTime, String endTime) {
  434. OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) param;
  435. StreamInfo streamInfo = onPublishHandler(mediaServerItem, streamChangedHookParam, deviceId, channelId);
  436. if (streamInfo != null) {
  437. streamInfo.setStartTime(startTime);
  438. streamInfo.setEndTime(endTime);
  439. DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
  440. if (deviceChannel != null) {
  441. deviceChannel.setStreamId(streamInfo.getStream());
  442. storager.startPlay(deviceId, channelId, streamInfo.getStream());
  443. }
  444. InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAYBACK, deviceId, channelId);
  445. if (inviteInfo != null) {
  446. inviteInfo.setStatus(InviteSessionStatus.ok);
  447. inviteInfo.setStreamInfo(streamInfo);
  448. inviteStreamService.updateInviteInfo(inviteInfo);
  449. }
  450. }
  451. return streamInfo;
  452. }
  453. @Override
  454. public MediaServerItem getNewMediaServerItem(Device device) {
  455. if (device == null) {
  456. return null;
  457. }
  458. MediaServerItem mediaServerItem;
  459. if (ObjectUtils.isEmpty(device.getMediaServerId()) || "auto".equals(device.getMediaServerId())) {
  460. mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null);
  461. } else {
  462. mediaServerItem = mediaServerService.getOne(device.getMediaServerId());
  463. }
  464. if (mediaServerItem == null) {
  465. logger.warn("点播时未找到可使用的ZLM...");
  466. }
  467. return mediaServerItem;
  468. }
  469. @Override
  470. public MediaServerItem getNewMediaServerItemHasAssist(Device device) {
  471. if (device == null) {
  472. return null;
  473. }
  474. MediaServerItem mediaServerItem;
  475. if (ObjectUtils.isEmpty(device.getMediaServerId()) || "auto".equals(device.getMediaServerId())) {
  476. mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(true);
  477. } else {
  478. mediaServerItem = mediaServerService.getOne(device.getMediaServerId());
  479. }
  480. if (mediaServerItem == null) {
  481. logger.warn("[获取可用的ZLM节点]未找到可使用的ZLM...");
  482. }
  483. return mediaServerItem;
  484. }
  485. @Override
  486. public void playBack(String deviceId, String channelId, String startTime,
  487. String endTime, ErrorCallback<Object> callback) {
  488. Device device = storager.queryVideoDevice(deviceId);
  489. if (device == null) {
  490. return;
  491. }
  492. MediaServerItem newMediaServerItem = getNewMediaServerItem(device);
  493. String stream = null;
  494. if (newMediaServerItem.isRtpEnable()) {
  495. String startTimeStr = startTime.replace("-", "")
  496. .replace(":", "")
  497. .replace(" ", "");
  498. System.out.println(startTimeStr);
  499. String endTimeTimeStr = endTime.replace("-", "")
  500. .replace(":", "")
  501. .replace(" ", "");
  502. System.out.println(endTimeTimeStr);
  503. stream = deviceId + "_" + channelId + "_" + startTimeStr + "_" + endTimeTimeStr;
  504. }
  505. SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, stream, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam());
  506. playBack(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, callback);
  507. }
  508. @Override
  509. public void playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,
  510. String deviceId, String channelId, String startTime,
  511. String endTime, ErrorCallback<Object> callback) {
  512. if (mediaServerItem == null || ssrcInfo == null) {
  513. callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
  514. InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(),
  515. null);
  516. return;
  517. }
  518. Device device = storager.queryVideoDevice(deviceId);
  519. if (device == null) {
  520. throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备: " + deviceId + "不存在");
  521. }
  522. logger.info("[录像回放] deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}, 收流端口:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}",
  523. device.getDeviceId(), channelId, startTime, endTime, ssrcInfo.getPort(), device.getStreamMode(),
  524. ssrcInfo.getSsrc(), device.isSsrcCheck());
  525. // 初始化redis中的invite消息状态
  526. InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo,
  527. mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAYBACK,
  528. InviteSessionStatus.ready);
  529. inviteStreamService.updateInviteInfo(inviteInfo);
  530. String playBackTimeOutTaskKey = UUID.randomUUID().toString();
  531. dynamicTask.startDelay(playBackTimeOutTaskKey, () -> {
  532. logger.warn("[录像回放] 超时,deviceId:{} ,channelId:{}", deviceId, channelId);
  533. inviteStreamService.removeInviteInfo(inviteInfo);
  534. callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null);
  535. try {
  536. cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null);
  537. } catch (InvalidArgumentException | ParseException | SipException e) {
  538. logger.error("[录像回放] 超时 发送BYE失败 {}", e.getMessage());
  539. } catch (SsrcTransactionNotFoundException e) {
  540. // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  541. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  542. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  543. streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
  544. }
  545. }, userSetting.getPlayTimeout());
  546. SipSubscribe.Event errorEvent = event -> {
  547. logger.info("[录像回放] 失败,{} {}", event.statusCode, event.msg);
  548. dynamicTask.stop(playBackTimeOutTaskKey);
  549. callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_ERROR.getCode(),
  550. String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg), null);
  551. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  552. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  553. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  554. inviteStreamService.removeInviteInfo(inviteInfo);
  555. };
  556. ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, hookParam) -> {
  557. logger.info("收到回放订阅消息: " + hookParam);
  558. dynamicTask.stop(playBackTimeOutTaskKey);
  559. StreamInfo streamInfo = onPublishHandlerForPlayback(mediaServerItemInuse, hookParam, deviceId, channelId, startTime, endTime);
  560. if (streamInfo == null) {
  561. logger.warn("设备回放API调用失败!");
  562. callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
  563. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
  564. return;
  565. }
  566. callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
  567. logger.info("[录像回放] 成功 deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}", device.getDeviceId(), channelId, startTime, endTime);
  568. };
  569. try {
  570. cmder.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime,
  571. hookEvent, eventResult -> {
  572. inviteInfo.setStatus(InviteSessionStatus.ok);
  573. ResponseEvent responseEvent = (ResponseEvent) eventResult.event;
  574. String contentString = new String(responseEvent.getResponse().getRawContent());
  575. // 获取ssrc
  576. int ssrcIndex = contentString.indexOf("y=");
  577. // 检查是否有y字段
  578. if (ssrcIndex >= 0) {
  579. //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容
  580. String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
  581. // 查询到ssrc不一致且开启了ssrc校验则需要针对处理
  582. if (ssrcInfo.getSsrc().equals(ssrcInResponse)) {
  583. if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) {
  584. String substring = contentString.substring(0, contentString.indexOf("y="));
  585. try {
  586. SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring);
  587. int port = -1;
  588. Vector mediaDescriptions = sdp.getMediaDescriptions(true);
  589. for (Object description : mediaDescriptions) {
  590. MediaDescription mediaDescription = (MediaDescription) description;
  591. Media media = mediaDescription.getMedia();
  592. Vector mediaFormats = media.getMediaFormats(false);
  593. if (mediaFormats.contains("96")) {
  594. port = media.getMediaPort();
  595. break;
  596. }
  597. }
  598. logger.info("[录像回放-TCP主动连接对方] deviceId: {}, channelId: {}, 连接对方的地址:{}:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, sdp.getConnection().getAddress(), port, device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
  599. JSONObject jsonObject = zlmresTfulUtils.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getStream());
  600. logger.info("[录像回放-TCP主动连接对方] 结果: {}", jsonObject);
  601. } catch (SdpException e) {
  602. logger.error("[录像回放-TCP主动连接对方] deviceId: {}, channelId: {}, 解析200OK的SDP信息失败", device.getDeviceId(), channelId, e);
  603. dynamicTask.stop(playBackTimeOutTaskKey);
  604. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  605. // 释放ssrc
  606. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  607. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  608. callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
  609. InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
  610. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  611. InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
  612. InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
  613. }
  614. }
  615. return;
  616. }
  617. logger.info("[录像回放] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
  618. if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {
  619. logger.info("[录像回放] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
  620. // 释放ssrc
  621. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  622. // 单端口模式streamId也有变化,需要重新设置监听
  623. if (!mediaServerItem.isRtpEnable()) {
  624. // 添加订阅
  625. HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
  626. subscribe.removeSubscribe(hookSubscribe);
  627. String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase();
  628. hookSubscribe.getContent().put("stream", stream);
  629. inviteStreamService.updateInviteInfoForStream(inviteInfo, stream);
  630. subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> {
  631. logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + hookParam);
  632. dynamicTask.stop(playBackTimeOutTaskKey);
  633. // hook响应
  634. hookEvent.response(mediaServerItemInUse, hookParam);
  635. });
  636. }
  637. // 更新ssrc
  638. Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse);
  639. if (!result) {
  640. try {
  641. logger.warn("[录像回放] 更新ssrc失败,停止录像回放 {}/{}", device.getDeviceId(), channelId);
  642. cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null, null);
  643. } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
  644. logger.error("[命令发送失败] 停止点播, 发送BYE: {}", e.getMessage());
  645. }
  646. dynamicTask.stop(playBackTimeOutTaskKey);
  647. // 释放ssrc
  648. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  649. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  650. callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
  651. "下级自定义了ssrc,重新设置收流信息失败", null);
  652. }else {
  653. if (ssrcInfo.getStream()!= null && !ssrcInfo.getStream().equals(inviteInfo.getStream())) {
  654. inviteStreamService.removeInviteInfo(inviteInfo);
  655. }
  656. ssrcInfo.setSsrc(ssrcInResponse);
  657. inviteInfo.setSsrcInfo(ssrcInfo);
  658. inviteInfo.setStream(ssrcInfo.getStream());
  659. }
  660. }else {
  661. logger.info("[点播消息] 收到invite 200, 下级自定义了ssrc, 但是当前模式无需修正");
  662. }
  663. }
  664. inviteStreamService.updateInviteInfo(inviteInfo);
  665. }, errorEvent);
  666. } catch (InvalidArgumentException | SipException | ParseException e) {
  667. logger.error("[命令发送失败] 回放: {}", e.getMessage());
  668. SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult();
  669. eventResult.type = SipSubscribe.EventResultType.cmdSendFailEvent;
  670. eventResult.statusCode = -1;
  671. eventResult.msg = "命令发送失败";
  672. errorEvent.response(eventResult);
  673. }
  674. }
  675. @Override
  676. public void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback<Object> callback) {
  677. Device device = storager.queryVideoDevice(deviceId);
  678. if (device == null) {
  679. return;
  680. }
  681. MediaServerItem newMediaServerItem = getNewMediaServerItemHasAssist(device);
  682. if (newMediaServerItem == null) {
  683. callback.run(InviteErrorCode.ERROR_FOR_ASSIST_NOT_READY.getCode(),
  684. InviteErrorCode.ERROR_FOR_ASSIST_NOT_READY.getMsg(),
  685. null);
  686. return;
  687. }
  688. SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam());
  689. download(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, downloadSpeed, callback);
  690. }
  691. @Override
  692. public void download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback<Object> callback) {
  693. if (mediaServerItem == null || ssrcInfo == null) {
  694. callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
  695. InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(),
  696. null);
  697. return;
  698. }
  699. Device device = storager.queryVideoDevice(deviceId);
  700. if (device == null) {
  701. callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
  702. "设备:" + deviceId + "不存在",
  703. null);
  704. return;
  705. }
  706. logger.info("[录像下载] deviceId: {}, channelId: {}, 下载速度:{}, 收流端口:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, downloadSpeed, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
  707. // 初始化redis中的invite消息状态
  708. InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo,
  709. mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.DOWNLOAD,
  710. InviteSessionStatus.ready);
  711. inviteStreamService.updateInviteInfo(inviteInfo);
  712. String downLoadTimeOutTaskKey = UUID.randomUUID().toString();
  713. dynamicTask.startDelay(downLoadTimeOutTaskKey, () -> {
  714. logger.warn(String.format("录像下载请求超时,deviceId:%s ,channelId:%s", deviceId, channelId));
  715. inviteStreamService.removeInviteInfo(inviteInfo);
  716. callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(),
  717. InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null);
  718. // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  719. try {
  720. cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null);
  721. } catch (InvalidArgumentException | ParseException | SipException e) {
  722. logger.error("[录像流]录像下载请求超时, 发送BYE失败 {}", e.getMessage());
  723. } catch (SsrcTransactionNotFoundException e) {
  724. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  725. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  726. streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
  727. }
  728. }, userSetting.getPlayTimeout());
  729. SipSubscribe.Event errorEvent = event -> {
  730. dynamicTask.stop(downLoadTimeOutTaskKey);
  731. callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(),
  732. String.format("录像下载失败, 错误码: %s, %s", event.statusCode, event.msg), null);
  733. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  734. inviteStreamService.removeInviteInfo(inviteInfo);
  735. };
  736. ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, hookParam) -> {
  737. logger.info("[录像下载]收到订阅消息: " + hookParam);
  738. dynamicTask.stop(downLoadTimeOutTaskKey);
  739. StreamInfo streamInfo = onPublishHandlerForDownload(mediaServerItemInuse, hookParam, deviceId, channelId, startTime, endTime);
  740. if (streamInfo == null) {
  741. logger.warn("[录像下载] 获取流地址信息失败");
  742. callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
  743. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
  744. return;
  745. }
  746. callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
  747. logger.info("[录像下载] 调用成功 deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}", device.getDeviceId(), channelId, startTime, endTime);
  748. };
  749. try {
  750. cmder.downloadStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, downloadSpeed,
  751. hookEvent, errorEvent, eventResult ->{
  752. inviteInfo.setStatus(InviteSessionStatus.ok);
  753. ResponseEvent responseEvent = (ResponseEvent) eventResult.event;
  754. String contentString = new String(responseEvent.getResponse().getRawContent());
  755. // 获取ssrc
  756. int ssrcIndex = contentString.indexOf("y=");
  757. // 检查是否有y字段
  758. if (ssrcIndex >= 0) {
  759. //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容
  760. String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
  761. // 查询到ssrc不一致且开启了ssrc校验则需要针对处理
  762. if (ssrcInfo.getSsrc().equals(ssrcInResponse)) {
  763. if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) {
  764. String substring = contentString.substring(0, contentString.indexOf("y="));
  765. try {
  766. SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring);
  767. int port = -1;
  768. Vector mediaDescriptions = sdp.getMediaDescriptions(true);
  769. for (Object description : mediaDescriptions) {
  770. MediaDescription mediaDescription = (MediaDescription) description;
  771. Media media = mediaDescription.getMedia();
  772. Vector mediaFormats = media.getMediaFormats(false);
  773. if (mediaFormats.contains("96")) {
  774. port = media.getMediaPort();
  775. break;
  776. }
  777. }
  778. logger.info("[录像下载-TCP主动连接对方] deviceId: {}, channelId: {}, 连接对方的地址:{}:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, sdp.getConnection().getAddress(), port, device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
  779. JSONObject jsonObject = zlmresTfulUtils.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getStream());
  780. logger.info("[录像下载-TCP主动连接对方] 结果: {}", jsonObject);
  781. } catch (SdpException e) {
  782. logger.error("[录像下载-TCP主动连接对方] deviceId: {}, channelId: {}, 解析200OK的SDP信息失败", device.getDeviceId(), channelId, e);
  783. dynamicTask.stop(downLoadTimeOutTaskKey);
  784. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  785. // 释放ssrc
  786. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  787. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  788. callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
  789. InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
  790. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  791. InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
  792. InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
  793. }
  794. }
  795. return;
  796. }
  797. logger.info("[录像下载] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
  798. if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {
  799. logger.info("[录像下载] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
  800. // 释放ssrc
  801. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  802. // 单端口模式streamId也有变化,需要重新设置监听
  803. if (!mediaServerItem.isRtpEnable()) {
  804. // 添加订阅
  805. HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
  806. subscribe.removeSubscribe(hookSubscribe);
  807. String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase();
  808. hookSubscribe.getContent().put("stream", stream);
  809. inviteStreamService.updateInviteInfoForStream(inviteInfo, stream);
  810. subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> {
  811. logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + hookParam);
  812. dynamicTask.stop(downLoadTimeOutTaskKey);
  813. hookEvent.response(mediaServerItemInUse, hookParam);
  814. });
  815. }
  816. // 更新ssrc
  817. Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse);
  818. if (!result) {
  819. try {
  820. logger.warn("[录像下载] 更新ssrc失败,停止录像回放 {}/{}", device.getDeviceId(), channelId);
  821. cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null, null);
  822. } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
  823. logger.error("[命令发送失败] 停止点播, 发送BYE: {}", e.getMessage());
  824. }
  825. dynamicTask.stop(downLoadTimeOutTaskKey);
  826. // 释放ssrc
  827. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  828. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  829. callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
  830. "下级自定义了ssrc,重新设置收流信息失败", null);
  831. }else {
  832. if (ssrcInfo.getStream()!= null && !ssrcInfo.getStream().equals(inviteInfo.getStream())) {
  833. inviteStreamService.removeInviteInfo(inviteInfo);
  834. }
  835. ssrcInfo.setSsrc(ssrcInResponse);
  836. inviteInfo.setSsrcInfo(ssrcInfo);
  837. inviteInfo.setStream(ssrcInfo.getStream());
  838. }
  839. }else {
  840. logger.info("[录像下载] 收到invite 200, 下级自定义了ssrc, 但是当前模式无需修正");
  841. }
  842. }
  843. inviteStreamService.updateInviteInfo(inviteInfo);
  844. });
  845. } catch (InvalidArgumentException | SipException | ParseException e) {
  846. logger.error("[命令发送失败] 录像下载: {}", e.getMessage());
  847. SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult();
  848. eventResult.type = SipSubscribe.EventResultType.cmdSendFailEvent;
  849. eventResult.statusCode = -1;
  850. eventResult.msg = "命令发送失败";
  851. errorEvent.response(eventResult);
  852. }
  853. }
  854. @Override
  855. public StreamInfo getDownLoadInfo(String deviceId, String channelId, String stream) {
  856. InviteInfo inviteInfo = inviteStreamService.getInviteInfo(InviteSessionType.DOWNLOAD, deviceId, channelId, stream);
  857. if (inviteInfo != null && inviteInfo.getStreamInfo() != null) {
  858. if (inviteInfo.getStreamInfo().getProgress() == 1) {
  859. return inviteInfo.getStreamInfo();
  860. }
  861. // 获取当前已下载时长
  862. String mediaServerId = inviteInfo.getStreamInfo().getMediaServerId();
  863. MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
  864. if (mediaServerItem == null) {
  865. logger.warn("查询录像信息时发现节点已离线");
  866. return null;
  867. }
  868. if (mediaServerItem.getRecordAssistPort() > 0) {
  869. JSONObject jsonObject = assistRESTfulUtils.fileDuration(mediaServerItem, inviteInfo.getStreamInfo().getApp(), inviteInfo.getStreamInfo().getStream(), null);
  870. if (jsonObject == null) {
  871. throw new ControllerException(ErrorCode.ERROR100.getCode(), "连接Assist服务失败");
  872. }
  873. if (jsonObject.getInteger("code") == 0) {
  874. long duration = jsonObject.getLong("data");
  875. if (duration == 0) {
  876. inviteInfo.getStreamInfo().setProgress(0);
  877. } else {
  878. String startTime = inviteInfo.getStreamInfo().getStartTime();
  879. String endTime = inviteInfo.getStreamInfo().getEndTime();
  880. long start = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime);
  881. long end = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime);
  882. BigDecimal currentCount = new BigDecimal(duration / 1000);
  883. BigDecimal totalCount = new BigDecimal(end - start);
  884. BigDecimal divide = currentCount.divide(totalCount, 2, RoundingMode.HALF_UP);
  885. double process = divide.doubleValue();
  886. inviteInfo.getStreamInfo().setProgress(process);
  887. }
  888. inviteStreamService.updateInviteInfo(inviteInfo);
  889. }
  890. }
  891. return inviteInfo.getStreamInfo();
  892. }
  893. return null;
  894. }
  895. private StreamInfo onPublishHandlerForDownload(MediaServerItem mediaServerItemInuse, HookParam hookParam, String deviceId, String channelId, String startTime, String endTime) {
  896. OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) hookParam;
  897. StreamInfo streamInfo = onPublishHandler(mediaServerItemInuse, streamChangedHookParam, deviceId, channelId);
  898. if (streamInfo != null) {
  899. streamInfo.setProgress(0);
  900. streamInfo.setStartTime(startTime);
  901. streamInfo.setEndTime(endTime);
  902. InviteInfo inviteInfo = inviteStreamService.getInviteInfo(InviteSessionType.DOWNLOAD, deviceId, channelId, streamInfo.getStream());
  903. if (inviteInfo != null) {
  904. logger.info("[录像下载] 更新invite消息中的stream信息");
  905. inviteInfo.setStatus(InviteSessionStatus.ok);
  906. inviteInfo.setStreamInfo(streamInfo);
  907. inviteStreamService.updateInviteInfo(inviteInfo);
  908. }
  909. }
  910. return streamInfo;
  911. }
  912. public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, OnStreamChangedHookParam hookParam, String deviceId, String channelId) {
  913. StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", hookParam.getStream(), hookParam.getTracks(), null);
  914. streamInfo.setDeviceID(deviceId);
  915. streamInfo.setChannelId(channelId);
  916. return streamInfo;
  917. }
  918. @Override
  919. public void zlmServerOffline(String mediaServerId) {
  920. // 处理正在向上推流的上级平台
  921. List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServer(null);
  922. if (sendRtpItems.size() > 0) {
  923. for (SendRtpItem sendRtpItem : sendRtpItems) {
  924. if (sendRtpItem.getMediaServerId().equals(mediaServerId)) {
  925. ParentPlatform platform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
  926. try {
  927. sipCommanderFroPlatform.streamByeCmd(platform, sendRtpItem.getCallId());
  928. } catch (SipException | InvalidArgumentException | ParseException e) {
  929. logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
  930. }
  931. }
  932. }
  933. }
  934. // 处理正在观看的国标设备
  935. List<SsrcTransaction> allSsrc = streamSession.getAllSsrc();
  936. if (allSsrc.size() > 0) {
  937. for (SsrcTransaction ssrcTransaction : allSsrc) {
  938. if (ssrcTransaction.getMediaServerId().equals(mediaServerId)) {
  939. Device device = deviceService.getDevice(ssrcTransaction.getDeviceId());
  940. if (device == null) {
  941. continue;
  942. }
  943. try {
  944. cmder.streamByeCmd(device, ssrcTransaction.getChannelId(),
  945. ssrcTransaction.getStream(), null);
  946. } catch (InvalidArgumentException | ParseException | SipException |
  947. SsrcTransactionNotFoundException e) {
  948. logger.error("[zlm离线]为正在使用此zlm的设备, 发送BYE失败 {}", e.getMessage());
  949. }
  950. }
  951. }
  952. }
  953. }
  /**
   * Called when a media server comes online. Currently does nothing.
   *
   * @param mediaServerId id of the media server that came online
   */
  @Override
  public void zlmServerOnline(String mediaServerId) {
      // TODO: find the plays that were active before the server went away and send BYE to the
      // downstream device for every stream that no longer exists.
      //
      // Outline of the disabled draft that used to be kept here as commented-out code:
      //   1. Fetch the media list of the server (zlmresTfulUtils.getMediaList) and continue only
      //      when the response code is 0.
      //   2. If the list is empty, treat it like zlmServerOffline(mediaServerId).
      //   3. Otherwise, for each distinct stream under app "rtp": look up the SSRC transactions
      //      for that stream and send BYE (cmder.streamByeCmd) for those bound to this server.
      //   4. If any "rtp" stream was found: for each send-RTP item on this server whose stream id
      //      is not in the list, send BYE to its parent platform (sipCommanderFroPlatform).
  }
  1005. @Override
  1006. public void pauseRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException {
  1007. InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, streamId);
  1008. if (null == inviteInfo || inviteInfo.getStreamInfo() == null) {
  1009. logger.warn("streamId不存在!");
  1010. throw new ServiceException("streamId不存在");
  1011. }
  1012. inviteInfo.getStreamInfo().setPause(true);
  1013. inviteStreamService.updateInviteInfo(inviteInfo);
  1014. MediaServerItem mediaServerItem = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId());
  1015. if (null == mediaServerItem) {
  1016. logger.warn("mediaServer 不存在!");
  1017. throw new ServiceException("mediaServer不存在");
  1018. }
  1019. // zlm 暂停RTP超时检查
  1020. JSONObject jsonObject = zlmresTfulUtils.pauseRtpCheck(mediaServerItem, streamId);
  1021. if (jsonObject == null || jsonObject.getInteger("code") != 0) {
  1022. throw new ServiceException("暂停RTP接收失败");
  1023. }
  1024. Device device = storager.queryVideoDevice(inviteInfo.getDeviceId());
  1025. cmder.playPauseCmd(device, inviteInfo.getStreamInfo());
  1026. }
  1027. @Override
  1028. public void resumeRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException {
  1029. InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, streamId);
  1030. if (null == inviteInfo || inviteInfo.getStreamInfo() == null) {
  1031. logger.warn("streamId不存在!");
  1032. throw new ServiceException("streamId不存在");
  1033. }
  1034. inviteInfo.getStreamInfo().setPause(false);
  1035. inviteStreamService.updateInviteInfo(inviteInfo);
  1036. MediaServerItem mediaServerItem = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId());
  1037. if (null == mediaServerItem) {
  1038. logger.warn("mediaServer 不存在!");
  1039. throw new ServiceException("mediaServer不存在");
  1040. }
  1041. // zlm 暂停RTP超时检查
  1042. JSONObject jsonObject = zlmresTfulUtils.resumeRtpCheck(mediaServerItem, streamId);
  1043. if (jsonObject == null || jsonObject.getInteger("code") != 0) {
  1044. throw new ServiceException("继续RTP接收失败");
  1045. }
  1046. Device device = storager.queryVideoDevice(inviteInfo.getDeviceId());
  1047. cmder.playResumeCmd(device, inviteInfo.getStreamInfo());
  1048. }
  1049. @Override
  1050. public void getSnap(String deviceId, String channelId, String fileName, ErrorCallback errorCallback) {
  1051. Device device = deviceService.getDevice(deviceId);
  1052. if (device == null) {
  1053. errorCallback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(), InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(), null);
  1054. return;
  1055. }
  1056. InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
  1057. if (inviteInfo != null) {
  1058. if (inviteInfo.getStreamInfo() != null) {
  1059. // 已存在线直接截图
  1060. MediaServerItem mediaServerItemInuse = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId());
  1061. String streamUrl;
  1062. if (mediaServerItemInuse.getRtspPort() != 0) {
  1063. streamUrl = String.format("rtsp://127.0.0.1:%s/%s/%s", mediaServerItemInuse.getRtspPort(), "rtp", inviteInfo.getStreamInfo().getStream());
  1064. }else {
  1065. streamUrl = String.format("http://127.0.0.1:%s/%s/%s.live.mp4", mediaServerItemInuse.getHttpPort(), "rtp", inviteInfo.getStreamInfo().getStream());
  1066. }
  1067. String path = "snap";
  1068. // 请求截图
  1069. logger.info("[请求截图]: " + fileName);
  1070. zlmresTfulUtils.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName);
  1071. File snapFile = new File(path + File.separator + fileName);
  1072. if (snapFile.exists()) {
  1073. errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), snapFile.getAbsoluteFile());
  1074. }else {
  1075. errorCallback.run(InviteErrorCode.FAIL.getCode(), InviteErrorCode.FAIL.getMsg(), null);
  1076. }
  1077. return;
  1078. }
  1079. }
  1080. MediaServerItem newMediaServerItem = getNewMediaServerItem(device);
  1081. play(newMediaServerItem, deviceId, channelId, null, (code, msg, data)->{
  1082. if (code == InviteErrorCode.SUCCESS.getCode()) {
  1083. InviteInfo inviteInfoForPlay = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
  1084. if (inviteInfoForPlay != null && inviteInfoForPlay.getStreamInfo() != null) {
  1085. getSnap(deviceId, channelId, fileName, errorCallback);
  1086. }else {
  1087. errorCallback.run(InviteErrorCode.FAIL.getCode(), InviteErrorCode.FAIL.getMsg(), null);
  1088. }
  1089. }else {
  1090. errorCallback.run(InviteErrorCode.FAIL.getCode(), InviteErrorCode.FAIL.getMsg(), null);
  1091. }
  1092. });
  1093. }
  1094. }