// PlayServiceImpl.java (70 KB)
  1. package com.genersoft.iot.vmp.service.impl;
  2. import com.alibaba.fastjson2.JSONObject;
  3. import com.genersoft.iot.vmp.common.InviteInfo;
  4. import com.genersoft.iot.vmp.common.InviteSessionStatus;
  5. import com.genersoft.iot.vmp.common.InviteSessionType;
  6. import com.genersoft.iot.vmp.common.StreamInfo;
  7. import com.genersoft.iot.vmp.conf.DynamicTask;
  8. import com.genersoft.iot.vmp.conf.UserSetting;
  9. import com.genersoft.iot.vmp.conf.exception.ControllerException;
  10. import com.genersoft.iot.vmp.conf.exception.ServiceException;
  11. import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
  12. import com.genersoft.iot.vmp.gb28181.bean.*;
  13. import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
  14. import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
  15. import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
  16. import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
  17. import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
  18. import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
  19. import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
  20. import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
  21. import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
  22. import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
  23. import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
  24. import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
  25. import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
  26. import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
  27. import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
  28. import com.genersoft.iot.vmp.service.*;
  29. import com.genersoft.iot.vmp.service.bean.ErrorCallback;
  30. import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
  31. import com.genersoft.iot.vmp.service.bean.SSRCInfo;
  32. import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
  33. import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
  34. import com.genersoft.iot.vmp.utils.DateUtil;
  35. import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
  36. import org.slf4j.Logger;
  37. import org.slf4j.LoggerFactory;
  38. import org.springframework.beans.factory.annotation.Autowired;
  39. import org.springframework.data.redis.core.RedisTemplate;
  40. import org.springframework.stereotype.Service;
  41. import org.springframework.util.ObjectUtils;
  42. import javax.sdp.*;
  43. import javax.sip.InvalidArgumentException;
  44. import javax.sip.ResponseEvent;
  45. import javax.sip.SipException;
  46. import java.io.File;
  47. import java.math.BigDecimal;
  48. import java.math.RoundingMode;
  49. import java.text.ParseException;
  50. import java.util.List;
  51. import java.util.UUID;
  52. import java.util.Vector;
  53. @SuppressWarnings(value = {"rawtypes", "unchecked"})
  54. @Service
  55. public class PlayServiceImpl implements IPlayService {
  56. private final static Logger logger = LoggerFactory.getLogger(PlayServiceImpl.class);
  57. @Autowired
  58. private IVideoManagerStorage storager;
  59. @Autowired
  60. private SIPCommander cmder;
  61. @Autowired
  62. private SIPCommanderFroPlatform sipCommanderFroPlatform;
  63. @Autowired
  64. private IRedisCatchStorage redisCatchStorage;
  65. @Autowired
  66. private IInviteStreamService inviteStreamService;
  67. @Autowired
  68. private DeferredResultHolder resultHolder;
  69. @Autowired
  70. private ZLMRESTfulUtils zlmresTfulUtils;
  71. @Autowired
  72. private ZLMServerFactory zlmServerFactory;
  73. @Autowired
  74. private AssistRESTfulUtils assistRESTfulUtils;
  75. @Autowired
  76. private IMediaService mediaService;
  77. @Autowired
  78. private IMediaServerService mediaServerService;
  79. @Autowired
  80. private VideoStreamSessionManager streamSession;
  81. @Autowired
  82. private IDeviceService deviceService;
  83. @Autowired
  84. private UserSetting userSetting;
  85. @Autowired
  86. private DynamicTask dynamicTask;
  87. @Autowired
  88. private ZlmHttpHookSubscribe subscribe;
  89. @Autowired
  90. private SSRCFactory ssrcFactory;
  91. @Autowired
  92. private RedisTemplate<Object, Object> redisTemplate;
  93. @Override
  94. public SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback<Object> callback) {
  95. if (mediaServerItem == null) {
  96. throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm");
  97. }
  98. Device device = redisCatchStorage.getDevice(deviceId);
  99. InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
  100. if (inviteInfo != null ) {
  101. if (inviteInfo.getStreamInfo() == null) {
  102. // 点播发起了但是尚未成功, 仅注册回调等待结果即可
  103. inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId, null, callback);
  104. return inviteInfo.getSsrcInfo();
  105. }else {
  106. StreamInfo streamInfo = inviteInfo.getStreamInfo();
  107. String streamId = streamInfo.getStream();
  108. if (streamId == null) {
  109. callback.run(InviteErrorCode.ERROR_FOR_CATCH_DATA.getCode(), "点播失败, redis缓存streamId等于null", null);
  110. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  111. InviteErrorCode.ERROR_FOR_CATCH_DATA.getCode(),
  112. "点播失败, redis缓存streamId等于null",
  113. null);
  114. return inviteInfo.getSsrcInfo();
  115. }
  116. String mediaServerId = streamInfo.getMediaServerId();
  117. MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
  118. Boolean ready = zlmServerFactory.isStreamReady(mediaInfo, "rtp", streamId);
  119. if (ready != null && ready) {
  120. callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
  121. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  122. InviteErrorCode.SUCCESS.getCode(),
  123. InviteErrorCode.SUCCESS.getMsg(),
  124. streamInfo);
  125. return inviteInfo.getSsrcInfo();
  126. }else {
  127. // 点播发起了但是尚未成功, 仅注册回调等待结果即可
  128. inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId, null, callback);
  129. storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
  130. inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
  131. }
  132. }
  133. }
  134. String streamId = null;
  135. if (mediaServerItem.isRtpEnable()) {
  136. streamId = String.format("%s_%s", device.getDeviceId(), channelId);
  137. }
  138. SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, ssrc, device.isSsrcCheck(), false, 0, false, device.getStreamModeForParam());
  139. if (ssrcInfo == null) {
  140. callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(), null);
  141. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  142. InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(),
  143. InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(),
  144. null);
  145. return null;
  146. }
  147. // TODO 记录点播的状态
  148. play(mediaServerItem, ssrcInfo, device, channelId, callback);
  149. return ssrcInfo;
  150. }
  151. @Override
  152. public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
  153. ErrorCallback<Object> callback) {
  154. if (mediaServerItem == null || ssrcInfo == null) {
  155. callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
  156. InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(),
  157. null);
  158. return;
  159. }
  160. logger.info("[点播开始] deviceId: {}, channelId: {},码流类型:{}, 收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}",
  161. device.getDeviceId(), channelId, device.isSwitchPrimarySubStream() ? "辅码流" : "主码流", ssrcInfo.getPort(),
  162. device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
  163. //端口获取失败的ssrcInfo 没有必要发送点播指令
  164. if (ssrcInfo.getPort() <= 0) {
  165. logger.info("[点播端口分配异常],deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channelId, ssrcInfo);
  166. // 释放ssrc
  167. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  168. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  169. callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "点播端口分配异常", null);
  170. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  171. InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "点播端口分配异常", null);
  172. return;
  173. }
  174. // 初始化redis中的invite消息状态
  175. InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo,
  176. mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAY,
  177. InviteSessionStatus.ready);
  178. inviteInfo.setSubStream(device.isSwitchPrimarySubStream());
  179. inviteStreamService.updateInviteInfo(inviteInfo);
  180. // 超时处理
  181. String timeOutTaskKey = UUID.randomUUID().toString();
  182. dynamicTask.startDelay(timeOutTaskKey, () -> {
  183. // 执行超时任务时查询是否已经成功,成功了则不执行超时任务,防止超时任务取消失败的情况
  184. InviteInfo inviteInfoForTimeOut = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
  185. if (inviteInfoForTimeOut == null || inviteInfoForTimeOut.getStreamInfo() == null) {
  186. logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},码流类型:{},端口:{}, SSRC: {}",
  187. device.getDeviceId(), channelId, device.isSwitchPrimarySubStream() ? "辅码流" : "主码流",
  188. ssrcInfo.getPort(), ssrcInfo.getSsrc());
  189. // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  190. // InviteInfo inviteInfoForTimeout = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.play, device.getDeviceId(), channelId);
  191. // if (inviteInfoForTimeout == null) {
  192. // return;
  193. // }
  194. // if (InviteSessionStatus.ok == inviteInfoForTimeout.getStatus() ) {
  195. // // TODO 发送bye
  196. // }else {
  197. // // TODO 发送cancel
  198. // }
  199. callback.run(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null);
  200. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  201. InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null);
  202. inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
  203. try {
  204. cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null);
  205. } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
  206. logger.error("[点播超时], 发送BYE失败 {}", e.getMessage());
  207. } finally {
  208. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  209. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  210. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  211. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  212. // 取消订阅消息监听
  213. HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
  214. subscribe.removeSubscribe(hookSubscribe);
  215. }
  216. }
  217. }, userSetting.getPlayTimeout());
  218. try {
  219. cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInuse, hookParam ) -> {
  220. logger.info("收到订阅消息: " + hookParam);
  221. dynamicTask.stop(timeOutTaskKey);
  222. // hook响应
  223. StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInuse, hookParam, device.getDeviceId(), channelId);
  224. if (streamInfo == null){
  225. callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
  226. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
  227. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  228. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
  229. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
  230. return;
  231. }
  232. callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
  233. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  234. InviteErrorCode.SUCCESS.getCode(),
  235. InviteErrorCode.SUCCESS.getMsg(),
  236. streamInfo);
  237. logger.info("[点播成功] deviceId: {}, channelId:{}, 码流类型:{}", device.getDeviceId(),
  238. device.isSwitchPrimarySubStream() ? "辅码流" : "主码流");
  239. String streamUrl;
  240. if (mediaServerItemInuse.getRtspPort() != 0) {
  241. streamUrl = String.format("rtsp://127.0.0.1:%s/%s/%s", mediaServerItemInuse.getRtspPort(), "rtp", ssrcInfo.getStream());
  242. }else {
  243. streamUrl = String.format("http://127.0.0.1:%s/%s/%s.live.mp4", mediaServerItemInuse.getHttpPort(), "rtp", ssrcInfo.getStream());
  244. }
  245. String path = "snap";
  246. String fileName = device.getDeviceId() + "_" + channelId + ".jpg";
  247. // 请求截图
  248. logger.info("[请求截图]: " + fileName);
  249. zlmresTfulUtils.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName);
  250. }, (event) -> {
  251. inviteInfo.setStatus(InviteSessionStatus.ok);
  252. ResponseEvent responseEvent = (ResponseEvent) event.event;
  253. String contentString = new String(responseEvent.getResponse().getRawContent());
  254. // 获取ssrc
  255. int ssrcIndex = contentString.indexOf("y=");
  256. // 检查是否有y字段
  257. if (ssrcIndex >= 0) {
  258. //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容
  259. String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12).trim();
  260. // 查询到ssrc不一致且开启了ssrc校验则需要针对处理
  261. if (ssrcInfo.getSsrc().equals(ssrcInResponse)) {
  262. if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) {
  263. String substring = contentString.substring(0, contentString.indexOf("y="));
  264. try {
  265. SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring);
  266. int port = -1;
  267. Vector mediaDescriptions = sdp.getMediaDescriptions(true);
  268. for (Object description : mediaDescriptions) {
  269. MediaDescription mediaDescription = (MediaDescription) description;
  270. Media media = mediaDescription.getMedia();
  271. Vector mediaFormats = media.getMediaFormats(false);
  272. if (mediaFormats.contains("96")) {
  273. port = media.getMediaPort();
  274. break;
  275. }
  276. }
  277. logger.info("[点播-TCP主动连接对方] deviceId: {}, channelId: {}, 连接对方的地址:{}:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, sdp.getConnection().getAddress(), port, device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
  278. JSONObject jsonObject = zlmresTfulUtils.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getStream());
  279. logger.info("[点播-TCP主动连接对方] 结果: {}", jsonObject);
  280. } catch (SdpException e) {
  281. logger.error("[点播-TCP主动连接对方] deviceId: {}, channelId: {}, 解析200OK的SDP信息失败", device.getDeviceId(), channelId, e);
  282. dynamicTask.stop(timeOutTaskKey);
  283. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  284. // 释放ssrc
  285. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  286. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  287. callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
  288. InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
  289. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  290. InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
  291. InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
  292. }
  293. }
  294. return;
  295. }
  296. logger.info("[点播消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
  297. if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {
  298. logger.info("[点播消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
  299. // 释放ssrc
  300. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  301. // 单端口模式streamId也有变化,重新设置监听即可
  302. if (!mediaServerItem.isRtpEnable()) {
  303. // 添加订阅
  304. HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
  305. subscribe.removeSubscribe(hookSubscribe);
  306. String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase();
  307. hookSubscribe.getContent().put("stream", stream);
  308. inviteStreamService.updateInviteInfoForStream(inviteInfo, stream);
  309. subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> {
  310. logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + hookParam);
  311. dynamicTask.stop(timeOutTaskKey);
  312. // hook响应
  313. StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInUse, hookParam, device.getDeviceId(), channelId);
  314. if (streamInfo == null){
  315. callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
  316. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
  317. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  318. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
  319. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
  320. return;
  321. }
  322. callback.run(InviteErrorCode.SUCCESS.getCode(),
  323. InviteErrorCode.SUCCESS.getMsg(), streamInfo);
  324. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  325. InviteErrorCode.SUCCESS.getCode(),
  326. InviteErrorCode.SUCCESS.getMsg(),
  327. streamInfo);
  328. });
  329. return;
  330. }
  331. // 更新ssrc
  332. Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse);
  333. if (!result) {
  334. try {
  335. logger.warn("[点播] 更新ssrc失败,停止点播 {}/{}", device.getDeviceId(), channelId);
  336. cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null, null);
  337. } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
  338. logger.error("[命令发送失败] 停止点播, 发送BYE: {}", e.getMessage());
  339. }
  340. dynamicTask.stop(timeOutTaskKey);
  341. // 释放ssrc
  342. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  343. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  344. callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
  345. "下级自定义了ssrc,重新设置收流信息失败", null);
  346. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  347. InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
  348. "下级自定义了ssrc,重新设置收流信息失败", null);
  349. }else {
  350. if (ssrcInfo.getStream()!= null && !ssrcInfo.getStream().equals(inviteInfo.getStream())) {
  351. inviteStreamService.removeInviteInfo(inviteInfo);
  352. }
  353. ssrcInfo.setSsrc(ssrcInResponse);
  354. inviteInfo.setSsrcInfo(ssrcInfo);
  355. inviteInfo.setStream(ssrcInfo.getStream());
  356. }
  357. }else {
  358. logger.info("[点播消息] 收到invite 200, 下级自定义了ssrc, 但是当前模式无需修正");
  359. }
  360. }
  361. inviteStreamService.updateInviteInfo(inviteInfo);
  362. }, (event) -> {
  363. dynamicTask.stop(timeOutTaskKey);
  364. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  365. // 释放ssrc
  366. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  367. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  368. callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_ERROR.getCode(),
  369. String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
  370. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  371. InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
  372. String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
  373. inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
  374. });
  375. } catch (InvalidArgumentException | SipException | ParseException e) {
  376. logger.error("[命令发送失败] 点播消息: {}", e.getMessage());
  377. dynamicTask.stop(timeOutTaskKey);
  378. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  379. // 释放ssrc
  380. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  381. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  382. callback.run(InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(),
  383. InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null);
  384. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  385. InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(),
  386. InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null);
  387. inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
  388. }
  389. }
  390. private StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, HookParam hookParam, String deviceId, String channelId) {
  391. StreamInfo streamInfo = null;
  392. Device device = redisCatchStorage.getDevice(deviceId);
  393. OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam;
  394. streamInfo = onPublishHandler(mediaServerItem, streamChangedHookParam, deviceId, channelId);
  395. if (streamInfo != null) {
  396. DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
  397. if (deviceChannel != null) {
  398. deviceChannel.setStreamId(streamInfo.getStream());
  399. storager.startPlay(deviceId, channelId, streamInfo.getStream());
  400. }
  401. InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
  402. if (inviteInfo != null) {
  403. inviteInfo.setStatus(InviteSessionStatus.ok);
  404. inviteInfo.setStreamInfo(streamInfo);
  405. inviteStreamService.updateInviteInfo(inviteInfo);
  406. }
  407. }
  408. return streamInfo;
  409. }
  410. private StreamInfo onPublishHandlerForPlayback(MediaServerItem mediaServerItem, HookParam param, String deviceId, String channelId, String startTime, String endTime) {
  411. OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) param;
  412. StreamInfo streamInfo = onPublishHandler(mediaServerItem, streamChangedHookParam, deviceId, channelId);
  413. if (streamInfo != null) {
  414. streamInfo.setStartTime(startTime);
  415. streamInfo.setEndTime(endTime);
  416. DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
  417. if (deviceChannel != null) {
  418. deviceChannel.setStreamId(streamInfo.getStream());
  419. storager.startPlay(deviceId, channelId, streamInfo.getStream());
  420. }
  421. InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAYBACK, deviceId, channelId);
  422. if (inviteInfo != null) {
  423. inviteInfo.setStatus(InviteSessionStatus.ok);
  424. inviteInfo.setStreamInfo(streamInfo);
  425. inviteStreamService.updateInviteInfo(inviteInfo);
  426. }
  427. }
  428. return streamInfo;
  429. }
  430. @Override
  431. public MediaServerItem getNewMediaServerItem(Device device) {
  432. if (device == null) {
  433. return null;
  434. }
  435. MediaServerItem mediaServerItem;
  436. if (ObjectUtils.isEmpty(device.getMediaServerId()) || "auto".equals(device.getMediaServerId())) {
  437. mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null);
  438. } else {
  439. mediaServerItem = mediaServerService.getOne(device.getMediaServerId());
  440. }
  441. if (mediaServerItem == null) {
  442. logger.warn("点播时未找到可使用的ZLM...");
  443. }
  444. return mediaServerItem;
  445. }
  446. @Override
  447. public MediaServerItem getNewMediaServerItemHasAssist(Device device) {
  448. if (device == null) {
  449. return null;
  450. }
  451. MediaServerItem mediaServerItem;
  452. if (ObjectUtils.isEmpty(device.getMediaServerId()) || "auto".equals(device.getMediaServerId())) {
  453. mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(true);
  454. } else {
  455. mediaServerItem = mediaServerService.getOne(device.getMediaServerId());
  456. }
  457. if (mediaServerItem == null) {
  458. logger.warn("[获取可用的ZLM节点]未找到可使用的ZLM...");
  459. }
  460. return mediaServerItem;
  461. }
  462. @Override
  463. public void playBack(String deviceId, String channelId, String startTime,
  464. String endTime, ErrorCallback<Object> callback) {
  465. Device device = storager.queryVideoDevice(deviceId);
  466. if (device == null) {
  467. return;
  468. }
  469. MediaServerItem newMediaServerItem = getNewMediaServerItem(device);
  470. SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam());
  471. playBack(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, callback);
  472. }
  473. @Override
  474. public void playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,
  475. String deviceId, String channelId, String startTime,
  476. String endTime, ErrorCallback<Object> callback) {
  477. if (mediaServerItem == null || ssrcInfo == null) {
  478. callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
  479. InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(),
  480. null);
  481. return;
  482. }
  483. Device device = storager.queryVideoDevice(deviceId);
  484. if (device == null) {
  485. throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备: " + deviceId + "不存在");
  486. }
  487. logger.info("[录像回放] deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}, 收流端口:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}",
  488. device.getDeviceId(), channelId, startTime, endTime, ssrcInfo.getPort(), device.getStreamMode(),
  489. ssrcInfo.getSsrc(), device.isSsrcCheck());
  490. // 初始化redis中的invite消息状态
  491. InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo,
  492. mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAYBACK,
  493. InviteSessionStatus.ready);
  494. inviteStreamService.updateInviteInfo(inviteInfo);
  495. String playBackTimeOutTaskKey = UUID.randomUUID().toString();
  496. dynamicTask.startDelay(playBackTimeOutTaskKey, () -> {
  497. logger.warn("[录像回放] 超时,deviceId:{} ,channelId:{}", deviceId, channelId);
  498. inviteStreamService.removeInviteInfo(inviteInfo);
  499. callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null);
  500. try {
  501. cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null);
  502. } catch (InvalidArgumentException | ParseException | SipException e) {
  503. logger.error("[录像回放] 超时 发送BYE失败 {}", e.getMessage());
  504. } catch (SsrcTransactionNotFoundException e) {
  505. // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  506. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  507. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  508. streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
  509. }
  510. }, userSetting.getPlayTimeout());
  511. SipSubscribe.Event errorEvent = event -> {
  512. logger.info("[录像回放] 失败,{} {}", event.statusCode, event.msg);
  513. dynamicTask.stop(playBackTimeOutTaskKey);
  514. callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_ERROR.getCode(),
  515. String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg), null);
  516. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  517. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  518. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  519. inviteStreamService.removeInviteInfo(inviteInfo);
  520. };
  521. ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, hookParam) -> {
  522. logger.info("收到回放订阅消息: " + hookParam);
  523. dynamicTask.stop(playBackTimeOutTaskKey);
  524. StreamInfo streamInfo = onPublishHandlerForPlayback(mediaServerItemInuse, hookParam, deviceId, channelId, startTime, endTime);
  525. if (streamInfo == null) {
  526. logger.warn("设备回放API调用失败!");
  527. callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
  528. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
  529. return;
  530. }
  531. callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
  532. logger.info("[录像回放] 成功 deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}", device.getDeviceId(), channelId, startTime, endTime);
  533. };
  534. try {
  535. cmder.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime,
  536. hookEvent, eventResult -> {
  537. inviteInfo.setStatus(InviteSessionStatus.ok);
  538. ResponseEvent responseEvent = (ResponseEvent) eventResult.event;
  539. String contentString = new String(responseEvent.getResponse().getRawContent());
  540. // 获取ssrc
  541. int ssrcIndex = contentString.indexOf("y=");
  542. // 检查是否有y字段
  543. if (ssrcIndex >= 0) {
  544. //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容
  545. String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
  546. // 查询到ssrc不一致且开启了ssrc校验则需要针对处理
  547. if (ssrcInfo.getSsrc().equals(ssrcInResponse)) {
  548. if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) {
  549. String substring = contentString.substring(0, contentString.indexOf("y="));
  550. try {
  551. SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring);
  552. int port = -1;
  553. Vector mediaDescriptions = sdp.getMediaDescriptions(true);
  554. for (Object description : mediaDescriptions) {
  555. MediaDescription mediaDescription = (MediaDescription) description;
  556. Media media = mediaDescription.getMedia();
  557. Vector mediaFormats = media.getMediaFormats(false);
  558. if (mediaFormats.contains("96")) {
  559. port = media.getMediaPort();
  560. break;
  561. }
  562. }
  563. logger.info("[录像回放-TCP主动连接对方] deviceId: {}, channelId: {}, 连接对方的地址:{}:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, sdp.getConnection().getAddress(), port, device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
  564. JSONObject jsonObject = zlmresTfulUtils.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getStream());
  565. logger.info("[录像回放-TCP主动连接对方] 结果: {}", jsonObject);
  566. } catch (SdpException e) {
  567. logger.error("[录像回放-TCP主动连接对方] deviceId: {}, channelId: {}, 解析200OK的SDP信息失败", device.getDeviceId(), channelId, e);
  568. dynamicTask.stop(playBackTimeOutTaskKey);
  569. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  570. // 释放ssrc
  571. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  572. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  573. callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
  574. InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
  575. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  576. InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
  577. InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
  578. }
  579. }
  580. return;
  581. }
  582. logger.info("[录像回放] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
  583. if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {
  584. logger.info("[录像回放] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
  585. // 释放ssrc
  586. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  587. // 单端口模式streamId也有变化,需要重新设置监听
  588. if (!mediaServerItem.isRtpEnable()) {
  589. // 添加订阅
  590. HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
  591. subscribe.removeSubscribe(hookSubscribe);
  592. String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase();
  593. hookSubscribe.getContent().put("stream", stream);
  594. inviteStreamService.updateInviteInfoForStream(inviteInfo, stream);
  595. subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> {
  596. logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + hookParam);
  597. dynamicTask.stop(playBackTimeOutTaskKey);
  598. // hook响应
  599. hookEvent.response(mediaServerItemInUse, hookParam);
  600. });
  601. }
  602. // 更新ssrc
  603. Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse);
  604. if (!result) {
  605. try {
  606. logger.warn("[录像回放] 更新ssrc失败,停止录像回放 {}/{}", device.getDeviceId(), channelId);
  607. cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null, null);
  608. } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
  609. logger.error("[命令发送失败] 停止点播, 发送BYE: {}", e.getMessage());
  610. }
  611. dynamicTask.stop(playBackTimeOutTaskKey);
  612. // 释放ssrc
  613. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  614. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  615. callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
  616. "下级自定义了ssrc,重新设置收流信息失败", null);
  617. }else {
  618. if (ssrcInfo.getStream()!= null && !ssrcInfo.getStream().equals(inviteInfo.getStream())) {
  619. inviteStreamService.removeInviteInfo(inviteInfo);
  620. }
  621. ssrcInfo.setSsrc(ssrcInResponse);
  622. inviteInfo.setSsrcInfo(ssrcInfo);
  623. inviteInfo.setStream(ssrcInfo.getStream());
  624. }
  625. }else {
  626. logger.info("[点播消息] 收到invite 200, 下级自定义了ssrc, 但是当前模式无需修正");
  627. }
  628. }
  629. inviteStreamService.updateInviteInfo(inviteInfo);
  630. }, errorEvent);
  631. } catch (InvalidArgumentException | SipException | ParseException e) {
  632. logger.error("[命令发送失败] 回放: {}", e.getMessage());
  633. SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult();
  634. eventResult.type = SipSubscribe.EventResultType.cmdSendFailEvent;
  635. eventResult.statusCode = -1;
  636. eventResult.msg = "命令发送失败";
  637. errorEvent.response(eventResult);
  638. }
  639. }
  640. @Override
  641. public void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback<Object> callback) {
  642. Device device = storager.queryVideoDevice(deviceId);
  643. if (device == null) {
  644. return;
  645. }
  646. MediaServerItem newMediaServerItem = getNewMediaServerItemHasAssist(device);
  647. if (newMediaServerItem == null) {
  648. callback.run(InviteErrorCode.ERROR_FOR_ASSIST_NOT_READY.getCode(),
  649. InviteErrorCode.ERROR_FOR_ASSIST_NOT_READY.getMsg(),
  650. null);
  651. return;
  652. }
  653. SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam());
  654. download(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, downloadSpeed, callback);
  655. }
  656. @Override
  657. public void download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback<Object> callback) {
  658. if (mediaServerItem == null || ssrcInfo == null) {
  659. callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
  660. InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(),
  661. null);
  662. return;
  663. }
  664. Device device = storager.queryVideoDevice(deviceId);
  665. if (device == null) {
  666. callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
  667. "设备:" + deviceId + "不存在",
  668. null);
  669. return;
  670. }
  671. logger.info("[录像下载] deviceId: {}, channelId: {}, 下载速度:{}, 收流端口:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, downloadSpeed, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
  672. // 初始化redis中的invite消息状态
  673. InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo,
  674. mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.DOWNLOAD,
  675. InviteSessionStatus.ready);
  676. inviteStreamService.updateInviteInfo(inviteInfo);
  677. String downLoadTimeOutTaskKey = UUID.randomUUID().toString();
  678. dynamicTask.startDelay(downLoadTimeOutTaskKey, () -> {
  679. logger.warn(String.format("录像下载请求超时,deviceId:%s ,channelId:%s", deviceId, channelId));
  680. inviteStreamService.removeInviteInfo(inviteInfo);
  681. callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(),
  682. InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null);
  683. // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  684. try {
  685. cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null);
  686. } catch (InvalidArgumentException | ParseException | SipException e) {
  687. logger.error("[录像流]录像下载请求超时, 发送BYE失败 {}", e.getMessage());
  688. } catch (SsrcTransactionNotFoundException e) {
  689. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  690. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  691. streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
  692. }
  693. }, userSetting.getPlayTimeout());
  694. SipSubscribe.Event errorEvent = event -> {
  695. dynamicTask.stop(downLoadTimeOutTaskKey);
  696. callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(),
  697. String.format("录像下载失败, 错误码: %s, %s", event.statusCode, event.msg), null);
  698. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  699. inviteStreamService.removeInviteInfo(inviteInfo);
  700. };
  701. ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, hookParam) -> {
  702. logger.info("[录像下载]收到订阅消息: " + hookParam);
  703. dynamicTask.stop(downLoadTimeOutTaskKey);
  704. StreamInfo streamInfo = onPublishHandlerForDownload(mediaServerItemInuse, hookParam, deviceId, channelId, startTime, endTime);
  705. if (streamInfo == null) {
  706. logger.warn("[录像下载] 获取流地址信息失败");
  707. callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
  708. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
  709. return;
  710. }
  711. callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
  712. logger.info("[录像下载] 调用成功 deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}", device.getDeviceId(), channelId, startTime, endTime);
  713. };
  714. try {
  715. cmder.downloadStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, downloadSpeed,
  716. hookEvent, errorEvent, eventResult ->{
  717. inviteInfo.setStatus(InviteSessionStatus.ok);
  718. ResponseEvent responseEvent = (ResponseEvent) eventResult.event;
  719. String contentString = new String(responseEvent.getResponse().getRawContent());
  720. // 获取ssrc
  721. int ssrcIndex = contentString.indexOf("y=");
  722. // 检查是否有y字段
  723. if (ssrcIndex >= 0) {
  724. //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容
  725. String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
  726. // 查询到ssrc不一致且开启了ssrc校验则需要针对处理
  727. if (ssrcInfo.getSsrc().equals(ssrcInResponse)) {
  728. if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) {
  729. String substring = contentString.substring(0, contentString.indexOf("y="));
  730. try {
  731. SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring);
  732. int port = -1;
  733. Vector mediaDescriptions = sdp.getMediaDescriptions(true);
  734. for (Object description : mediaDescriptions) {
  735. MediaDescription mediaDescription = (MediaDescription) description;
  736. Media media = mediaDescription.getMedia();
  737. Vector mediaFormats = media.getMediaFormats(false);
  738. if (mediaFormats.contains("96")) {
  739. port = media.getMediaPort();
  740. break;
  741. }
  742. }
  743. logger.info("[录像下载-TCP主动连接对方] deviceId: {}, channelId: {}, 连接对方的地址:{}:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, sdp.getConnection().getAddress(), port, device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
  744. JSONObject jsonObject = zlmresTfulUtils.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getStream());
  745. logger.info("[录像下载-TCP主动连接对方] 结果: {}", jsonObject);
  746. } catch (SdpException e) {
  747. logger.error("[录像下载-TCP主动连接对方] deviceId: {}, channelId: {}, 解析200OK的SDP信息失败", device.getDeviceId(), channelId, e);
  748. dynamicTask.stop(downLoadTimeOutTaskKey);
  749. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  750. // 释放ssrc
  751. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  752. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  753. callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
  754. InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
  755. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  756. InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
  757. InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
  758. }
  759. }
  760. return;
  761. }
  762. logger.info("[录像下载] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
  763. if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {
  764. logger.info("[录像下载] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
  765. // 释放ssrc
  766. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  767. // 单端口模式streamId也有变化,需要重新设置监听
  768. if (!mediaServerItem.isRtpEnable()) {
  769. // 添加订阅
  770. HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
  771. subscribe.removeSubscribe(hookSubscribe);
  772. String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase();
  773. hookSubscribe.getContent().put("stream", stream);
  774. inviteStreamService.updateInviteInfoForStream(inviteInfo, stream);
  775. subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> {
  776. logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + hookParam);
  777. dynamicTask.stop(downLoadTimeOutTaskKey);
  778. hookEvent.response(mediaServerItemInUse, hookParam);
  779. });
  780. }
  781. // 更新ssrc
  782. Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse);
  783. if (!result) {
  784. try {
  785. logger.warn("[录像下载] 更新ssrc失败,停止录像回放 {}/{}", device.getDeviceId(), channelId);
  786. cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null, null);
  787. } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
  788. logger.error("[命令发送失败] 停止点播, 发送BYE: {}", e.getMessage());
  789. }
  790. dynamicTask.stop(downLoadTimeOutTaskKey);
  791. // 释放ssrc
  792. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  793. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  794. callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
  795. "下级自定义了ssrc,重新设置收流信息失败", null);
  796. }else {
  797. if (ssrcInfo.getStream()!= null && !ssrcInfo.getStream().equals(inviteInfo.getStream())) {
  798. inviteStreamService.removeInviteInfo(inviteInfo);
  799. }
  800. ssrcInfo.setSsrc(ssrcInResponse);
  801. inviteInfo.setSsrcInfo(ssrcInfo);
  802. inviteInfo.setStream(ssrcInfo.getStream());
  803. }
  804. }else {
  805. logger.info("[录像下载] 收到invite 200, 下级自定义了ssrc, 但是当前模式无需修正");
  806. }
  807. }
  808. inviteStreamService.updateInviteInfo(inviteInfo);
  809. });
  810. } catch (InvalidArgumentException | SipException | ParseException e) {
  811. logger.error("[命令发送失败] 录像下载: {}", e.getMessage());
  812. SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult();
  813. eventResult.type = SipSubscribe.EventResultType.cmdSendFailEvent;
  814. eventResult.statusCode = -1;
  815. eventResult.msg = "命令发送失败";
  816. errorEvent.response(eventResult);
  817. }
  818. }
  819. @Override
  820. public StreamInfo getDownLoadInfo(String deviceId, String channelId, String stream) {
  821. InviteInfo inviteInfo = inviteStreamService.getInviteInfo(InviteSessionType.DOWNLOAD, deviceId, channelId, stream);
  822. if (inviteInfo != null && inviteInfo.getStreamInfo() != null) {
  823. if (inviteInfo.getStreamInfo().getProgress() == 1) {
  824. return inviteInfo.getStreamInfo();
  825. }
  826. // 获取当前已下载时长
  827. String mediaServerId = inviteInfo.getStreamInfo().getMediaServerId();
  828. MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
  829. if (mediaServerItem == null) {
  830. logger.warn("查询录像信息时发现节点已离线");
  831. return null;
  832. }
  833. if (mediaServerItem.getRecordAssistPort() > 0) {
  834. JSONObject jsonObject = assistRESTfulUtils.fileDuration(mediaServerItem, inviteInfo.getStreamInfo().getApp(), inviteInfo.getStreamInfo().getStream(), null);
  835. if (jsonObject == null) {
  836. throw new ControllerException(ErrorCode.ERROR100.getCode(), "连接Assist服务失败");
  837. }
  838. if (jsonObject.getInteger("code") == 0) {
  839. long duration = jsonObject.getLong("data");
  840. if (duration == 0) {
  841. inviteInfo.getStreamInfo().setProgress(0);
  842. } else {
  843. String startTime = inviteInfo.getStreamInfo().getStartTime();
  844. String endTime = inviteInfo.getStreamInfo().getEndTime();
  845. long start = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime);
  846. long end = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime);
  847. BigDecimal currentCount = new BigDecimal(duration / 1000);
  848. BigDecimal totalCount = new BigDecimal(end - start);
  849. BigDecimal divide = currentCount.divide(totalCount, 2, RoundingMode.HALF_UP);
  850. double process = divide.doubleValue();
  851. inviteInfo.getStreamInfo().setProgress(process);
  852. }
  853. inviteStreamService.updateInviteInfo(inviteInfo);
  854. }
  855. }
  856. return inviteInfo.getStreamInfo();
  857. }
  858. return null;
  859. }
  860. private StreamInfo onPublishHandlerForDownload(MediaServerItem mediaServerItemInuse, HookParam hookParam, String deviceId, String channelId, String startTime, String endTime) {
  861. OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) hookParam;
  862. StreamInfo streamInfo = onPublishHandler(mediaServerItemInuse, streamChangedHookParam, deviceId, channelId);
  863. if (streamInfo != null) {
  864. streamInfo.setProgress(0);
  865. streamInfo.setStartTime(startTime);
  866. streamInfo.setEndTime(endTime);
  867. InviteInfo inviteInfo = inviteStreamService.getInviteInfo(InviteSessionType.DOWNLOAD, deviceId, channelId, streamInfo.getStream());
  868. if (inviteInfo != null) {
  869. logger.info("[录像下载] 更新invite消息中的stream信息");
  870. inviteInfo.setStatus(InviteSessionStatus.ok);
  871. inviteInfo.setStreamInfo(streamInfo);
  872. inviteStreamService.updateInviteInfo(inviteInfo);
  873. }
  874. }
  875. return streamInfo;
  876. }
  877. public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, OnStreamChangedHookParam hookParam, String deviceId, String channelId) {
  878. StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", hookParam.getStream(), hookParam.getTracks(), null);
  879. streamInfo.setDeviceID(deviceId);
  880. streamInfo.setChannelId(channelId);
  881. return streamInfo;
  882. }
  883. @Override
  884. public void zlmServerOffline(String mediaServerId) {
  885. // 处理正在向上推流的上级平台
  886. List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServer(null);
  887. if (sendRtpItems.size() > 0) {
  888. for (SendRtpItem sendRtpItem : sendRtpItems) {
  889. if (sendRtpItem.getMediaServerId().equals(mediaServerId)) {
  890. ParentPlatform platform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
  891. try {
  892. sipCommanderFroPlatform.streamByeCmd(platform, sendRtpItem.getCallId());
  893. } catch (SipException | InvalidArgumentException | ParseException e) {
  894. logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
  895. }
  896. }
  897. }
  898. }
  899. // 处理正在观看的国标设备
  900. List<SsrcTransaction> allSsrc = streamSession.getAllSsrc();
  901. if (allSsrc.size() > 0) {
  902. for (SsrcTransaction ssrcTransaction : allSsrc) {
  903. if (ssrcTransaction.getMediaServerId().equals(mediaServerId)) {
  904. Device device = deviceService.getDevice(ssrcTransaction.getDeviceId());
  905. if (device == null) {
  906. continue;
  907. }
  908. try {
  909. cmder.streamByeCmd(device, ssrcTransaction.getChannelId(),
  910. ssrcTransaction.getStream(), null);
  911. } catch (InvalidArgumentException | ParseException | SipException |
  912. SsrcTransactionNotFoundException e) {
  913. logger.error("[zlm离线]为正在使用此zlm的设备, 发送BYE失败 {}", e.getMessage());
  914. }
  915. }
  916. }
  917. }
  918. }
  919. @Override
  920. public void zlmServerOnline(String mediaServerId) {
  921. // TODO 查找之前的点播,流如果不存在则给下级发送bye
  922. // MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
  923. // zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{
  924. // Integer code = mediaList.getInteger("code");
  925. // if (code == 0) {
  926. // JSONArray data = mediaList.getJSONArray("data");
  927. // if (data == null || data.size() == 0) {
  928. // zlmServerOffline(mediaServerId);
  929. // }else {
  930. // Map<String, JSONObject> mediaListMap = new HashMap<>();
  931. // for (int i = 0; i < data.size(); i++) {
  932. // JSONObject json = data.getJSONObject(i);
  933. // String app = json.getString("app");
  934. // if ("rtp".equals(app)) {
  935. // String stream = json.getString("stream");
  936. // if (mediaListMap.get(stream) != null) {
  937. // continue;
  938. // }
  939. // mediaListMap.put(stream, json);
  940. // // 处理正在观看的国标设备
  941. // List<SsrcTransaction> ssrcTransactions = streamSession.getSsrcTransactionForAll(null, null, null, stream);
  942. // if (ssrcTransactions.size() > 0) {
  943. // for (SsrcTransaction ssrcTransaction : ssrcTransactions) {
  944. // if(ssrcTransaction.getMediaServerId().equals(mediaServerId)) {
  945. // cmder.streamByeCmd(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(),
  946. // ssrcTransaction.getStream(), null);
  947. // }
  948. // }
  949. // }
  950. // }
  951. // }
  952. // if (mediaListMap.size() > 0 ) {
  953. // // 处理正在向上推流的上级平台
  954. // List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServer(null);
  955. // if (sendRtpItems.size() > 0) {
  956. // for (SendRtpItem sendRtpItem : sendRtpItems) {
  957. // if (sendRtpItem.getMediaServerId().equals(mediaServerId)) {
  958. // if (mediaListMap.get(sendRtpItem.getStreamId()) == null) {
  959. // ParentPlatform platform = storager.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
  960. // sipCommanderFroPlatform.streamByeCmd(platform, sendRtpItem.getCallId());
  961. // }
  962. // }
  963. // }
  964. // }
  965. // }
  966. // }
  967. // }
  968. // }));
  969. }
  970. @Override
  971. public void pauseRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException {
  972. InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, streamId);
  973. if (null == inviteInfo || inviteInfo.getStreamInfo() == null) {
  974. logger.warn("streamId不存在!");
  975. throw new ServiceException("streamId不存在");
  976. }
  977. inviteInfo.getStreamInfo().setPause(true);
  978. inviteStreamService.updateInviteInfo(inviteInfo);
  979. MediaServerItem mediaServerItem = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId());
  980. if (null == mediaServerItem) {
  981. logger.warn("mediaServer 不存在!");
  982. throw new ServiceException("mediaServer不存在");
  983. }
  984. // zlm 暂停RTP超时检查
  985. JSONObject jsonObject = zlmresTfulUtils.pauseRtpCheck(mediaServerItem, streamId);
  986. if (jsonObject == null || jsonObject.getInteger("code") != 0) {
  987. throw new ServiceException("暂停RTP接收失败");
  988. }
  989. Device device = storager.queryVideoDevice(inviteInfo.getDeviceId());
  990. cmder.playPauseCmd(device, inviteInfo.getStreamInfo());
  991. }
  992. @Override
  993. public void resumeRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException {
  994. InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, streamId);
  995. if (null == inviteInfo || inviteInfo.getStreamInfo() == null) {
  996. logger.warn("streamId不存在!");
  997. throw new ServiceException("streamId不存在");
  998. }
  999. inviteInfo.getStreamInfo().setPause(false);
  1000. inviteStreamService.updateInviteInfo(inviteInfo);
  1001. MediaServerItem mediaServerItem = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId());
  1002. if (null == mediaServerItem) {
  1003. logger.warn("mediaServer 不存在!");
  1004. throw new ServiceException("mediaServer不存在");
  1005. }
  1006. // zlm 暂停RTP超时检查
  1007. JSONObject jsonObject = zlmresTfulUtils.resumeRtpCheck(mediaServerItem, streamId);
  1008. if (jsonObject == null || jsonObject.getInteger("code") != 0) {
  1009. throw new ServiceException("继续RTP接收失败");
  1010. }
  1011. Device device = storager.queryVideoDevice(inviteInfo.getDeviceId());
  1012. cmder.playResumeCmd(device, inviteInfo.getStreamInfo());
  1013. }
  1014. @Override
  1015. public void getSnap(String deviceId, String channelId, String fileName, ErrorCallback errorCallback) {
  1016. Device device = deviceService.getDevice(deviceId);
  1017. if (device == null) {
  1018. errorCallback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(), InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(), null);
  1019. return;
  1020. }
  1021. InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
  1022. if (inviteInfo != null) {
  1023. if (inviteInfo.getStreamInfo() != null) {
  1024. // 已存在线直接截图
  1025. MediaServerItem mediaServerItemInuse = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId());
  1026. String streamUrl;
  1027. if (mediaServerItemInuse.getRtspPort() != 0) {
  1028. streamUrl = String.format("rtsp://127.0.0.1:%s/%s/%s", mediaServerItemInuse.getRtspPort(), "rtp", inviteInfo.getStreamInfo().getStream());
  1029. }else {
  1030. streamUrl = String.format("http://127.0.0.1:%s/%s/%s.live.mp4", mediaServerItemInuse.getHttpPort(), "rtp", inviteInfo.getStreamInfo().getStream());
  1031. }
  1032. String path = "snap";
  1033. // 请求截图
  1034. logger.info("[请求截图]: " + fileName);
  1035. zlmresTfulUtils.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName);
  1036. File snapFile = new File(path + File.separator + fileName);
  1037. if (snapFile.exists()) {
  1038. errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), snapFile.getAbsoluteFile());
  1039. }else {
  1040. errorCallback.run(InviteErrorCode.FAIL.getCode(), InviteErrorCode.FAIL.getMsg(), null);
  1041. }
  1042. return;
  1043. }
  1044. }
  1045. MediaServerItem newMediaServerItem = getNewMediaServerItem(device);
  1046. play(newMediaServerItem, deviceId, channelId, null, (code, msg, data)->{
  1047. if (code == InviteErrorCode.SUCCESS.getCode()) {
  1048. InviteInfo inviteInfoForPlay = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
  1049. if (inviteInfoForPlay != null && inviteInfoForPlay.getStreamInfo() != null) {
  1050. getSnap(deviceId, channelId, fileName, errorCallback);
  1051. }else {
  1052. errorCallback.run(InviteErrorCode.FAIL.getCode(), InviteErrorCode.FAIL.getMsg(), null);
  1053. }
  1054. }else {
  1055. errorCallback.run(InviteErrorCode.FAIL.getCode(), InviteErrorCode.FAIL.getMsg(), null);
  1056. }
  1057. });
  1058. }
  1059. }