PlayServiceImpl.java 68 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162
  1. package com.genersoft.iot.vmp.service.impl;
  2. import com.alibaba.fastjson2.JSONArray;
  3. import com.alibaba.fastjson2.JSONObject;
  4. import com.genersoft.iot.vmp.common.InviteInfo;
  5. import com.genersoft.iot.vmp.common.InviteSessionStatus;
  6. import com.genersoft.iot.vmp.common.InviteSessionType;
  7. import com.genersoft.iot.vmp.common.StreamInfo;
  8. import com.genersoft.iot.vmp.conf.DynamicTask;
  9. import com.genersoft.iot.vmp.conf.UserSetting;
  10. import com.genersoft.iot.vmp.conf.exception.ControllerException;
  11. import com.genersoft.iot.vmp.conf.exception.ServiceException;
  12. import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
  13. import com.genersoft.iot.vmp.gb28181.bean.*;
  14. import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
  15. import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
  16. import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
  17. import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
  18. import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
  19. import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
  20. import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
  21. import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
  22. import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
  23. import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
  24. import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
  25. import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
  26. import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
  27. import com.genersoft.iot.vmp.service.*;
  28. import com.genersoft.iot.vmp.service.bean.ErrorCallback;
  29. import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
  30. import com.genersoft.iot.vmp.service.bean.SSRCInfo;
  31. import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
  32. import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
  33. import com.genersoft.iot.vmp.utils.DateUtil;
  34. import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
  35. import org.slf4j.Logger;
  36. import org.slf4j.LoggerFactory;
  37. import org.springframework.beans.factory.annotation.Autowired;
  38. import org.springframework.data.redis.core.RedisTemplate;
  39. import org.springframework.stereotype.Service;
  40. import org.springframework.util.ObjectUtils;
  41. import javax.sdp.*;
  42. import javax.sip.InvalidArgumentException;
  43. import javax.sip.ResponseEvent;
  44. import javax.sip.SipException;
  45. import java.io.File;
  46. import java.math.BigDecimal;
  47. import java.math.RoundingMode;
  48. import java.text.ParseException;
  49. import java.util.List;
  50. import java.util.UUID;
  51. import java.util.Vector;
  52. @SuppressWarnings(value = {"rawtypes", "unchecked"})
  53. @Service
  54. public class PlayServiceImpl implements IPlayService {
  55. private final static Logger logger = LoggerFactory.getLogger(PlayServiceImpl.class);
  56. @Autowired
  57. private IVideoManagerStorage storager;
  58. @Autowired
  59. private SIPCommander cmder;
  60. @Autowired
  61. private SIPCommanderFroPlatform sipCommanderFroPlatform;
  62. @Autowired
  63. private IRedisCatchStorage redisCatchStorage;
  64. @Autowired
  65. private IInviteStreamService inviteStreamService;
  66. @Autowired
  67. private DeferredResultHolder resultHolder;
  68. @Autowired
  69. private ZLMRESTfulUtils zlmresTfulUtils;
  70. @Autowired
  71. private ZLMRTPServerFactory zlmrtpServerFactory;
  72. @Autowired
  73. private AssistRESTfulUtils assistRESTfulUtils;
  74. @Autowired
  75. private IMediaService mediaService;
  76. @Autowired
  77. private IMediaServerService mediaServerService;
  78. @Autowired
  79. private VideoStreamSessionManager streamSession;
  80. @Autowired
  81. private IDeviceService deviceService;
  82. @Autowired
  83. private UserSetting userSetting;
  84. @Autowired
  85. private DynamicTask dynamicTask;
  86. @Autowired
  87. private ZlmHttpHookSubscribe subscribe;
  88. @Autowired
  89. private SSRCFactory ssrcFactory;
  90. @Autowired
  91. private RedisTemplate<Object, Object> redisTemplate;
  92. @Override
  93. public SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, ErrorCallback<Object> callback) {
  94. if (mediaServerItem == null) {
  95. throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm");
  96. }
  97. Device device = redisCatchStorage.getDevice(deviceId);
  98. InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
  99. if (inviteInfo != null ) {
  100. if (inviteInfo.getStreamInfo() == null) {
  101. // 点播发起了但是尚未成功, 仅注册回调等待结果即可
  102. inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId, null, callback);
  103. return inviteInfo.getSsrcInfo();
  104. }else {
  105. StreamInfo streamInfo = inviteInfo.getStreamInfo();
  106. String streamId = streamInfo.getStream();
  107. if (streamId == null) {
  108. callback.run(InviteErrorCode.ERROR_FOR_CATCH_DATA.getCode(), "点播失败, redis缓存streamId等于null", null);
  109. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  110. InviteErrorCode.ERROR_FOR_CATCH_DATA.getCode(),
  111. "点播失败, redis缓存streamId等于null",
  112. null);
  113. return inviteInfo.getSsrcInfo();
  114. }
  115. String mediaServerId = streamInfo.getMediaServerId();
  116. MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
  117. Boolean ready = zlmrtpServerFactory.isStreamReady(mediaInfo, "rtp", streamId);
  118. if (ready != null && ready) {
  119. callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
  120. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  121. InviteErrorCode.SUCCESS.getCode(),
  122. InviteErrorCode.SUCCESS.getMsg(),
  123. streamInfo);
  124. return inviteInfo.getSsrcInfo();
  125. }else {
  126. // 点播发起了但是尚未成功, 仅注册回调等待结果即可
  127. inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId, null, callback);
  128. storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
  129. inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
  130. }
  131. }
  132. }
  133. String streamId = null;
  134. if (mediaServerItem.isRtpEnable()) {
  135. streamId = String.format("%s_%s", device.getDeviceId(), channelId);
  136. }
  137. SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, device.isSsrcCheck(), false, 0, false, device.getStreamModeForParam());
  138. if (ssrcInfo == null) {
  139. callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(), null);
  140. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  141. InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(),
  142. InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(),
  143. null);
  144. return null;
  145. }
  146. // TODO 记录点播的状态
  147. play(mediaServerItem, ssrcInfo, device, channelId, callback);
  148. return ssrcInfo;
  149. }
  150. @Override
  151. public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
  152. ErrorCallback<Object> callback) {
  153. if (mediaServerItem == null || ssrcInfo == null) {
  154. callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
  155. InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(),
  156. null);
  157. return;
  158. }
  159. logger.info("\r\n" +
  160. " [点播开始] \r\n" +
  161. "deviceId : {}, \r\n" +
  162. "channelId : {},\r\n" +
  163. "收流端口 : {}, \r\n" +
  164. "收流模式 : {}, \r\n" +
  165. "SSRC : {}, \r\n" +
  166. "SSRC校验 :{}",
  167. device.getDeviceId(),
  168. channelId,
  169. ssrcInfo.getPort(),
  170. device.getStreamMode(),
  171. ssrcInfo.getSsrc(),
  172. device.isSsrcCheck());
  173. //端口获取失败的ssrcInfo 没有必要发送点播指令
  174. if (ssrcInfo.getPort() <= 0) {
  175. logger.info("[点播端口分配异常],deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channelId, ssrcInfo);
  176. // 释放ssrc
  177. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  178. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  179. callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "点播端口分配异常", null);
  180. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  181. InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "点播端口分配异常", null);
  182. return;
  183. }
  184. // 初始化redis中的invite消息状态
  185. InviteInfo inviteInfo = InviteInfo.getinviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo,
  186. mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAY,
  187. InviteSessionStatus.ready);
  188. inviteStreamService.updateInviteInfo(inviteInfo);
  189. // 超时处理
  190. String timeOutTaskKey = UUID.randomUUID().toString();
  191. dynamicTask.startDelay(timeOutTaskKey, () -> {
  192. // 执行超时任务时查询是否已经成功,成功了则不执行超时任务,防止超时任务取消失败的情况
  193. InviteInfo inviteInfoForTimeOut = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
  194. if (inviteInfoForTimeOut == null || inviteInfoForTimeOut.getStreamInfo() == null) {
  195. logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", device.getDeviceId(), channelId, ssrcInfo.getPort(), ssrcInfo.getSsrc());
  196. // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  197. // InviteInfo inviteInfoForTimeout = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.play, device.getDeviceId(), channelId);
  198. // if (inviteInfoForTimeout == null) {
  199. // return;
  200. // }
  201. // if (InviteSessionStatus.ok == inviteInfoForTimeout.getStatus() ) {
  202. // // TODO 发送bye
  203. // }else {
  204. // // TODO 发送cancel
  205. // }
  206. callback.run(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null);
  207. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  208. InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null);
  209. inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
  210. try {
  211. cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null);
  212. } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
  213. logger.error("[点播超时], 发送BYE失败 {}", e.getMessage());
  214. } finally {
  215. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  216. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  217. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  218. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  219. // 取消订阅消息监听
  220. HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
  221. subscribe.removeSubscribe(hookSubscribe);
  222. }
  223. }
  224. }, userSetting.getPlayTimeout());
  225. try {
  226. cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
  227. logger.info("收到订阅消息: " + response.toJSONString());
  228. dynamicTask.stop(timeOutTaskKey);
  229. // hook响应
  230. StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId);
  231. if (streamInfo == null){
  232. callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
  233. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
  234. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  235. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
  236. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
  237. return;
  238. }
  239. callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
  240. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  241. InviteErrorCode.SUCCESS.getCode(),
  242. InviteErrorCode.SUCCESS.getMsg(),
  243. streamInfo);
  244. logger.info("[点播成功] deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
  245. String streamUrl;
  246. if (mediaServerItemInuse.getRtspPort() != 0) {
  247. streamUrl = String.format("rtsp://127.0.0.1:%s/%s/%s", mediaServerItemInuse.getRtspPort(), "rtp", ssrcInfo.getStream());
  248. }else {
  249. streamUrl = String.format("http://127.0.0.1:%s/%s/%s.live.mp4", mediaServerItemInuse.getHttpPort(), "rtp", ssrcInfo.getStream());
  250. }
  251. String path = "snap";
  252. String fileName = device.getDeviceId() + "_" + channelId + ".jpg";
  253. // 请求截图
  254. logger.info("[请求截图]: " + fileName);
  255. zlmresTfulUtils.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName);
  256. }, (event) -> {
  257. inviteInfo.setStatus(InviteSessionStatus.ok);
  258. ResponseEvent responseEvent = (ResponseEvent) event.event;
  259. String contentString = new String(responseEvent.getResponse().getRawContent());
  260. // 获取ssrc
  261. int ssrcIndex = contentString.indexOf("y=");
  262. // 检查是否有y字段
  263. if (ssrcIndex >= 0) {
  264. //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容
  265. String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12).trim();
  266. // 查询到ssrc不一致且开启了ssrc校验则需要针对处理
  267. if (ssrcInfo.getSsrc().equals(ssrcInResponse)) {
  268. if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) {
  269. String substring = contentString.substring(0, contentString.indexOf("y="));
  270. try {
  271. SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring);
  272. int port = -1;
  273. Vector mediaDescriptions = sdp.getMediaDescriptions(true);
  274. for (Object description : mediaDescriptions) {
  275. MediaDescription mediaDescription = (MediaDescription) description;
  276. Media media = mediaDescription.getMedia();
  277. Vector mediaFormats = media.getMediaFormats(false);
  278. if (mediaFormats.contains("96")) {
  279. port = media.getMediaPort();
  280. break;
  281. }
  282. }
  283. logger.info("[点播-TCP主动连接对方] deviceId: {}, channelId: {}, 连接对方的地址:{}:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, sdp.getConnection().getAddress(), port, device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
  284. JSONObject jsonObject = zlmresTfulUtils.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getStream());
  285. logger.info("[点播-TCP主动连接对方] 结果: {}", jsonObject);
  286. } catch (SdpException e) {
  287. logger.error("[点播-TCP主动连接对方] deviceId: {}, channelId: {}, 解析200OK的SDP信息失败", device.getDeviceId(), channelId, e);
  288. dynamicTask.stop(timeOutTaskKey);
  289. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  290. // 释放ssrc
  291. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  292. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  293. callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
  294. InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
  295. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  296. InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
  297. InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
  298. }
  299. }
  300. return;
  301. }
  302. logger.info("[点播消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
  303. if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {
  304. logger.info("[点播消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
  305. // 释放ssrc
  306. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  307. // 单端口模式streamId也有变化,重新设置监听即可
  308. if (!mediaServerItem.isRtpEnable()) {
  309. // 添加订阅
  310. HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
  311. subscribe.removeSubscribe(hookSubscribe);
  312. String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase();
  313. hookSubscribe.getContent().put("stream", stream);
  314. inviteInfo.setStream(stream);
  315. subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
  316. logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString());
  317. dynamicTask.stop(timeOutTaskKey);
  318. // hook响应
  319. StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId);
  320. if (streamInfo == null){
  321. callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
  322. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
  323. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  324. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
  325. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
  326. return;
  327. }
  328. callback.run(InviteErrorCode.SUCCESS.getCode(),
  329. InviteErrorCode.SUCCESS.getMsg(), streamInfo);
  330. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  331. InviteErrorCode.SUCCESS.getCode(),
  332. InviteErrorCode.SUCCESS.getMsg(),
  333. streamInfo);
  334. });
  335. return;
  336. }
  337. // 更新ssrc
  338. Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse);
  339. if (!result) {
  340. try {
  341. logger.warn("[点播] 更新ssrc失败,停止点播 {}/{}", device.getDeviceId(), channelId);
  342. cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null, null);
  343. } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
  344. logger.error("[命令发送失败] 停止点播, 发送BYE: {}", e.getMessage());
  345. }
  346. dynamicTask.stop(timeOutTaskKey);
  347. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  348. callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
  349. "下级自定义了ssrc,重新设置收流信息失败", null);
  350. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  351. InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
  352. "下级自定义了ssrc,重新设置收流信息失败", null);
  353. }else {
  354. ssrcInfo.setSsrc(ssrcInResponse);
  355. inviteInfo.setSsrcInfo(ssrcInfo);
  356. inviteInfo.setStream(ssrcInfo.getStream());
  357. }
  358. }else {
  359. logger.info("[点播消息] 收到invite 200, 下级自定义了ssrc, 但是当前模式无需修正");
  360. }
  361. }
  362. inviteStreamService.updateInviteInfo(inviteInfo);
  363. }, (event) -> {
  364. dynamicTask.stop(timeOutTaskKey);
  365. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  366. // 释放ssrc
  367. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  368. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  369. callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_ERROR.getCode(),
  370. String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
  371. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  372. InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
  373. String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
  374. inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
  375. });
  376. } catch (InvalidArgumentException | SipException | ParseException e) {
  377. logger.error("[命令发送失败] 点播消息: {}", e.getMessage());
  378. dynamicTask.stop(timeOutTaskKey);
  379. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  380. // 释放ssrc
  381. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  382. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  383. callback.run(InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(),
  384. InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null);
  385. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  386. InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(),
  387. InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null);
  388. inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
  389. }
  390. }
  391. private StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId) {
  392. StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
  393. if (streamInfo != null) {
  394. DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
  395. if (deviceChannel != null) {
  396. deviceChannel.setStreamId(streamInfo.getStream());
  397. storager.startPlay(deviceId, channelId, streamInfo.getStream());
  398. }
  399. InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
  400. if (inviteInfo != null) {
  401. inviteInfo.setStatus(InviteSessionStatus.ok);
  402. inviteInfo.setStreamInfo(streamInfo);
  403. inviteStreamService.updateInviteInfo(inviteInfo);
  404. }
  405. }
  406. return streamInfo;
  407. }
  408. private StreamInfo onPublishHandlerForPlayback(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String startTime, String endTime) {
  409. StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
  410. if (streamInfo != null) {
  411. streamInfo.setStartTime(startTime);
  412. streamInfo.setEndTime(endTime);
  413. DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
  414. if (deviceChannel != null) {
  415. deviceChannel.setStreamId(streamInfo.getStream());
  416. storager.startPlay(deviceId, channelId, streamInfo.getStream());
  417. }
  418. InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAYBACK, deviceId, channelId);
  419. if (inviteInfo != null) {
  420. inviteInfo.setStatus(InviteSessionStatus.ok);
  421. inviteInfo.setStreamInfo(streamInfo);
  422. inviteStreamService.updateInviteInfo(inviteInfo);
  423. }
  424. }
  425. return streamInfo;
  426. }
  427. @Override
  428. public MediaServerItem getNewMediaServerItem(Device device) {
  429. if (device == null) {
  430. return null;
  431. }
  432. MediaServerItem mediaServerItem;
  433. if (ObjectUtils.isEmpty(device.getMediaServerId()) || "auto".equals(device.getMediaServerId())) {
  434. mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null);
  435. } else {
  436. mediaServerItem = mediaServerService.getOne(device.getMediaServerId());
  437. }
  438. if (mediaServerItem == null) {
  439. logger.warn("点播时未找到可使用的ZLM...");
  440. }
  441. return mediaServerItem;
  442. }
  443. @Override
  444. public MediaServerItem getNewMediaServerItemHasAssist(Device device) {
  445. if (device == null) {
  446. return null;
  447. }
  448. MediaServerItem mediaServerItem;
  449. if (ObjectUtils.isEmpty(device.getMediaServerId()) || "auto".equals(device.getMediaServerId())) {
  450. mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(true);
  451. } else {
  452. mediaServerItem = mediaServerService.getOne(device.getMediaServerId());
  453. }
  454. if (mediaServerItem == null) {
  455. logger.warn("[获取可用的ZLM节点]未找到可使用的ZLM...");
  456. }
  457. return mediaServerItem;
  458. }
  459. @Override
  460. public void playBack(String deviceId, String channelId, String startTime,
  461. String endTime, ErrorCallback<Object> callback) {
  462. Device device = storager.queryVideoDevice(deviceId);
  463. if (device == null) {
  464. return;
  465. }
  466. MediaServerItem newMediaServerItem = getNewMediaServerItem(device);
  467. SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam());
  468. playBack(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, callback);
  469. }
  470. @Override
  471. public void playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,
  472. String deviceId, String channelId, String startTime,
  473. String endTime, ErrorCallback<Object> callback) {
  474. if (mediaServerItem == null || ssrcInfo == null) {
  475. callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
  476. InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(),
  477. null);
  478. return;
  479. }
  480. Device device = storager.queryVideoDevice(deviceId);
  481. if (device == null) {
  482. throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备: " + deviceId + "不存在");
  483. }
  484. logger.info("[录像回放] deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}, 收流端口:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}",
  485. device.getDeviceId(), channelId, startTime, endTime, ssrcInfo.getPort(), device.getStreamMode(),
  486. ssrcInfo.getSsrc(), device.isSsrcCheck());
  487. // 初始化redis中的invite消息状态
  488. InviteInfo inviteInfo = InviteInfo.getinviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo,
  489. mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAYBACK,
  490. InviteSessionStatus.ready);
  491. inviteStreamService.updateInviteInfo(inviteInfo);
  492. String playBackTimeOutTaskKey = UUID.randomUUID().toString();
  493. dynamicTask.startDelay(playBackTimeOutTaskKey, () -> {
  494. logger.warn("[录像回放] 超时,deviceId:{} ,channelId:{}", deviceId, channelId);
  495. inviteStreamService.removeInviteInfo(inviteInfo);
  496. callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null);
  497. try {
  498. cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null);
  499. } catch (InvalidArgumentException | ParseException | SipException e) {
  500. logger.error("[录像回放] 超时 发送BYE失败 {}", e.getMessage());
  501. } catch (SsrcTransactionNotFoundException e) {
  502. // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  503. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  504. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  505. streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
  506. }
  507. }, userSetting.getPlayTimeout());
  508. SipSubscribe.Event errorEvent = event -> {
  509. logger.info("[录像回放] 失败,{} {}", event.statusCode, event.msg);
  510. dynamicTask.stop(playBackTimeOutTaskKey);
  511. callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_ERROR.getCode(),
  512. String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg), null);
  513. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  514. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  515. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  516. inviteStreamService.removeInviteInfo(inviteInfo);
  517. };
  518. ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, jsonObject) -> {
  519. logger.info("收到回放订阅消息: " + jsonObject);
  520. dynamicTask.stop(playBackTimeOutTaskKey);
  521. StreamInfo streamInfo = onPublishHandlerForPlayback(mediaServerItemInuse, jsonObject, deviceId, channelId, startTime, endTime);
  522. if (streamInfo == null) {
  523. logger.warn("设备回放API调用失败!");
  524. callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
  525. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
  526. return;
  527. }
  528. callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
  529. logger.info("[录像回放] 成功 deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}", device.getDeviceId(), channelId, startTime, endTime);
  530. };
  531. try {
  532. cmder.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime,
  533. hookEvent, eventResult -> {
  534. inviteInfo.setStatus(InviteSessionStatus.ok);
  535. ResponseEvent responseEvent = (ResponseEvent) eventResult.event;
  536. String contentString = new String(responseEvent.getResponse().getRawContent());
  537. // 获取ssrc
  538. int ssrcIndex = contentString.indexOf("y=");
  539. // 检查是否有y字段
  540. if (ssrcIndex >= 0) {
  541. //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容
  542. String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
  543. // 查询到ssrc不一致且开启了ssrc校验则需要针对处理
  544. if (ssrcInfo.getSsrc().equals(ssrcInResponse)) {
  545. if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) {
  546. String substring = contentString.substring(0, contentString.indexOf("y="));
  547. try {
  548. SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring);
  549. int port = -1;
  550. Vector mediaDescriptions = sdp.getMediaDescriptions(true);
  551. for (Object description : mediaDescriptions) {
  552. MediaDescription mediaDescription = (MediaDescription) description;
  553. Media media = mediaDescription.getMedia();
  554. Vector mediaFormats = media.getMediaFormats(false);
  555. if (mediaFormats.contains("96")) {
  556. port = media.getMediaPort();
  557. break;
  558. }
  559. }
  560. logger.info("[录像回放-TCP主动连接对方] deviceId: {}, channelId: {}, 连接对方的地址:{}:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, sdp.getConnection().getAddress(), port, device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
  561. JSONObject jsonObject = zlmresTfulUtils.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getStream());
  562. logger.info("[录像回放-TCP主动连接对方] 结果: {}", jsonObject);
  563. } catch (SdpException e) {
  564. logger.error("[录像回放-TCP主动连接对方] deviceId: {}, channelId: {}, 解析200OK的SDP信息失败", device.getDeviceId(), channelId, e);
  565. dynamicTask.stop(playBackTimeOutTaskKey);
  566. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  567. // 释放ssrc
  568. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  569. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  570. callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
  571. InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
  572. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  573. InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
  574. InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
  575. }
  576. }
  577. return;
  578. }
  579. logger.info("[录像回放] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
  580. if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {
  581. logger.info("[录像回放] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
  582. // 释放ssrc
  583. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  584. // 单端口模式streamId也有变化,需要重新设置监听
  585. if (!mediaServerItem.isRtpEnable()) {
  586. // 添加订阅
  587. HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
  588. subscribe.removeSubscribe(hookSubscribe);
  589. String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase();
  590. hookSubscribe.getContent().put("stream", stream);
  591. inviteInfo.setStream(stream);
  592. subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
  593. logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString());
  594. dynamicTask.stop(playBackTimeOutTaskKey);
  595. // hook响应
  596. hookEvent.response(mediaServerItemInUse, response);
  597. });
  598. }
  599. // 更新ssrc
  600. Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse);
  601. if (!result) {
  602. try {
  603. logger.warn("[录像回放] 更新ssrc失败,停止录像回放 {}/{}", device.getDeviceId(), channelId);
  604. cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null, null);
  605. } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
  606. logger.error("[命令发送失败] 停止点播, 发送BYE: {}", e.getMessage());
  607. }
  608. dynamicTask.stop(playBackTimeOutTaskKey);
  609. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  610. callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
  611. "下级自定义了ssrc,重新设置收流信息失败", null);
  612. }else {
  613. ssrcInfo.setSsrc(ssrcInResponse);
  614. inviteInfo.setSsrcInfo(ssrcInfo);
  615. inviteInfo.setStream(ssrcInfo.getStream());
  616. }
  617. }else {
  618. logger.info("[点播消息] 收到invite 200, 下级自定义了ssrc, 但是当前模式无需修正");
  619. }
  620. }
  621. inviteStreamService.updateInviteInfo(inviteInfo);
  622. }, errorEvent);
  623. } catch (InvalidArgumentException | SipException | ParseException e) {
  624. logger.error("[命令发送失败] 回放: {}", e.getMessage());
  625. SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult();
  626. eventResult.type = SipSubscribe.EventResultType.cmdSendFailEvent;
  627. eventResult.statusCode = -1;
  628. eventResult.msg = "命令发送失败";
  629. errorEvent.response(eventResult);
  630. }
  631. }
  632. @Override
  633. public void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback<Object> callback) {
  634. Device device = storager.queryVideoDevice(deviceId);
  635. if (device == null) {
  636. return;
  637. }
  638. MediaServerItem newMediaServerItem = getNewMediaServerItemHasAssist(device);
  639. if (newMediaServerItem == null) {
  640. callback.run(InviteErrorCode.ERROR_FOR_ASSIST_NOT_READY.getCode(),
  641. InviteErrorCode.ERROR_FOR_ASSIST_NOT_READY.getMsg(),
  642. null);
  643. return;
  644. }
  645. SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam());
  646. download(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, downloadSpeed, callback);
  647. }
  648. @Override
  649. public void download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback<Object> callback) {
  650. if (mediaServerItem == null || ssrcInfo == null) {
  651. callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
  652. InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(),
  653. null);
  654. return;
  655. }
  656. Device device = storager.queryVideoDevice(deviceId);
  657. if (device == null) {
  658. callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
  659. "设备:" + deviceId + "不存在",
  660. null);
  661. return;
  662. }
  663. logger.info("[录像下载] deviceId: {}, channelId: {}, 下载速度:{}, 收流端口:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, downloadSpeed, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
  664. // 初始化redis中的invite消息状态
  665. InviteInfo inviteInfo = InviteInfo.getinviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo,
  666. mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.DOWNLOAD,
  667. InviteSessionStatus.ready);
  668. inviteStreamService.updateInviteInfo(inviteInfo);
  669. String downLoadTimeOutTaskKey = UUID.randomUUID().toString();
  670. dynamicTask.startDelay(downLoadTimeOutTaskKey, () -> {
  671. logger.warn(String.format("录像下载请求超时,deviceId:%s ,channelId:%s", deviceId, channelId));
  672. inviteStreamService.removeInviteInfo(inviteInfo);
  673. callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(),
  674. InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null);
  675. // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  676. try {
  677. cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null);
  678. } catch (InvalidArgumentException | ParseException | SipException e) {
  679. logger.error("[录像流]录像下载请求超时, 发送BYE失败 {}", e.getMessage());
  680. } catch (SsrcTransactionNotFoundException e) {
  681. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  682. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  683. streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
  684. }
  685. }, userSetting.getPlayTimeout());
  686. SipSubscribe.Event errorEvent = event -> {
  687. dynamicTask.stop(downLoadTimeOutTaskKey);
  688. callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(),
  689. String.format("录像下载失败, 错误码: %s, %s", event.statusCode, event.msg), null);
  690. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  691. inviteStreamService.removeInviteInfo(inviteInfo);
  692. };
  693. ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, jsonObject) -> {
  694. logger.info("[录像下载]收到订阅消息: " + jsonObject);
  695. dynamicTask.stop(downLoadTimeOutTaskKey);
  696. StreamInfo streamInfo = onPublishHandlerForDownload(mediaServerItemInuse, jsonObject, deviceId, channelId, startTime, endTime);
  697. if (streamInfo == null) {
  698. logger.warn("[录像下载] 获取流地址信息失败");
  699. callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
  700. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
  701. return;
  702. }
  703. callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
  704. logger.info("[录像下载] 调用成功 deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}", device.getDeviceId(), channelId, startTime, endTime);
  705. };
  706. try {
  707. cmder.downloadStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, downloadSpeed,
  708. hookEvent, errorEvent, eventResult ->{
  709. inviteInfo.setStatus(InviteSessionStatus.ok);
  710. ResponseEvent responseEvent = (ResponseEvent) eventResult.event;
  711. String contentString = new String(responseEvent.getResponse().getRawContent());
  712. // 获取ssrc
  713. int ssrcIndex = contentString.indexOf("y=");
  714. // 检查是否有y字段
  715. if (ssrcIndex >= 0) {
  716. //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容
  717. String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
  718. // 查询到ssrc不一致且开启了ssrc校验则需要针对处理
  719. if (ssrcInfo.getSsrc().equals(ssrcInResponse)) {
  720. if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) {
  721. String substring = contentString.substring(0, contentString.indexOf("y="));
  722. try {
  723. SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring);
  724. int port = -1;
  725. Vector mediaDescriptions = sdp.getMediaDescriptions(true);
  726. for (Object description : mediaDescriptions) {
  727. MediaDescription mediaDescription = (MediaDescription) description;
  728. Media media = mediaDescription.getMedia();
  729. Vector mediaFormats = media.getMediaFormats(false);
  730. if (mediaFormats.contains("96")) {
  731. port = media.getMediaPort();
  732. break;
  733. }
  734. }
  735. logger.info("[录像下载-TCP主动连接对方] deviceId: {}, channelId: {}, 连接对方的地址:{}:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, sdp.getConnection().getAddress(), port, device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
  736. JSONObject jsonObject = zlmresTfulUtils.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getStream());
  737. logger.info("[录像下载-TCP主动连接对方] 结果: {}", jsonObject);
  738. } catch (SdpException e) {
  739. logger.error("[录像下载-TCP主动连接对方] deviceId: {}, channelId: {}, 解析200OK的SDP信息失败", device.getDeviceId(), channelId, e);
  740. dynamicTask.stop(downLoadTimeOutTaskKey);
  741. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  742. // 释放ssrc
  743. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  744. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  745. callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
  746. InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
  747. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  748. InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
  749. InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
  750. }
  751. }
  752. return;
  753. }
  754. logger.info("[录像下载] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
  755. if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {
  756. logger.info("[录像下载] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
  757. // 释放ssrc
  758. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  759. // 单端口模式streamId也有变化,需要重新设置监听
  760. if (!mediaServerItem.isRtpEnable()) {
  761. // 添加订阅
  762. HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
  763. subscribe.removeSubscribe(hookSubscribe);
  764. hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
  765. subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
  766. logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString());
  767. dynamicTask.stop(downLoadTimeOutTaskKey);
  768. hookEvent.response(mediaServerItemInUse, response);
  769. });
  770. }
  771. // 更新ssrc
  772. Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse);
  773. if (!result) {
  774. try {
  775. logger.warn("[录像下载] 更新ssrc失败,停止录像回放 {}/{}", device.getDeviceId(), channelId);
  776. cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null, null);
  777. } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
  778. logger.error("[命令发送失败] 停止点播, 发送BYE: {}", e.getMessage());
  779. }
  780. dynamicTask.stop(downLoadTimeOutTaskKey);
  781. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  782. callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
  783. "下级自定义了ssrc,重新设置收流信息失败", null);
  784. }else {
  785. ssrcInfo.setSsrc(ssrcInResponse);
  786. inviteInfo.setSsrcInfo(ssrcInfo);
  787. inviteInfo.setStream(ssrcInfo.getStream());
  788. }
  789. }else {
  790. logger.info("[录像下载] 收到invite 200, 下级自定义了ssrc, 但是当前模式无需修正");
  791. }
  792. }
  793. });
  794. } catch (InvalidArgumentException | SipException | ParseException e) {
  795. logger.error("[命令发送失败] 录像下载: {}", e.getMessage());
  796. SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult();
  797. eventResult.type = SipSubscribe.EventResultType.cmdSendFailEvent;
  798. eventResult.statusCode = -1;
  799. eventResult.msg = "命令发送失败";
  800. errorEvent.response(eventResult);
  801. }
  802. }
  803. @Override
  804. public StreamInfo getDownLoadInfo(String deviceId, String channelId, String stream) {
  805. InviteInfo inviteInfo = inviteStreamService.getInviteInfo(InviteSessionType.DOWNLOAD, deviceId, channelId, stream);
  806. if (inviteInfo != null && inviteInfo.getStreamInfo() != null) {
  807. if (inviteInfo.getStreamInfo().getProgress() == 1) {
  808. return inviteInfo.getStreamInfo();
  809. }
  810. // 获取当前已下载时长
  811. String mediaServerId = inviteInfo.getStreamInfo().getMediaServerId();
  812. MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
  813. if (mediaServerItem == null) {
  814. logger.warn("查询录像信息时发现节点已离线");
  815. return null;
  816. }
  817. if (mediaServerItem.getRecordAssistPort() > 0) {
  818. JSONObject jsonObject = assistRESTfulUtils.fileDuration(mediaServerItem, inviteInfo.getStreamInfo().getApp(), inviteInfo.getStreamInfo().getStream(), null);
  819. if (jsonObject == null) {
  820. throw new ControllerException(ErrorCode.ERROR100.getCode(), "连接Assist服务失败");
  821. }
  822. if (jsonObject.getInteger("code") == 0) {
  823. long duration = jsonObject.getLong("data");
  824. if (duration == 0) {
  825. inviteInfo.getStreamInfo().setProgress(0);
  826. } else {
  827. String startTime = inviteInfo.getStreamInfo().getStartTime();
  828. String endTime = inviteInfo.getStreamInfo().getEndTime();
  829. long start = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime);
  830. long end = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime);
  831. BigDecimal currentCount = new BigDecimal(duration / 1000);
  832. BigDecimal totalCount = new BigDecimal(end - start);
  833. BigDecimal divide = currentCount.divide(totalCount, 2, RoundingMode.HALF_UP);
  834. double process = divide.doubleValue();
  835. inviteInfo.getStreamInfo().setProgress(process);
  836. }
  837. inviteStreamService.updateInviteInfo(inviteInfo);
  838. }
  839. }
  840. return inviteInfo.getStreamInfo();
  841. }
  842. return null;
  843. }
  844. private StreamInfo onPublishHandlerForDownload(MediaServerItem mediaServerItemInuse, JSONObject response, String deviceId, String channelId, String startTime, String endTime) {
  845. StreamInfo streamInfo = onPublishHandler(mediaServerItemInuse, response, deviceId, channelId);
  846. if (streamInfo != null) {
  847. streamInfo.setProgress(0);
  848. streamInfo.setStartTime(startTime);
  849. streamInfo.setEndTime(endTime);
  850. InviteInfo inviteInfo = inviteStreamService.getInviteInfo(InviteSessionType.DOWNLOAD, deviceId, channelId, streamInfo.getStream());
  851. if (inviteInfo != null) {
  852. logger.info("[录像下载] 更新invite消息中的stream信息");
  853. inviteInfo.setStatus(InviteSessionStatus.ok);
  854. inviteInfo.setStreamInfo(streamInfo);
  855. inviteStreamService.updateInviteInfo(inviteInfo);
  856. }
  857. }
  858. return streamInfo;
  859. }
  860. public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId) {
  861. String streamId = resonse.getString("stream");
  862. JSONArray tracks = resonse.getJSONArray("tracks");
  863. StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", streamId, tracks, null);
  864. streamInfo.setDeviceID(deviceId);
  865. streamInfo.setChannelId(channelId);
  866. return streamInfo;
  867. }
  868. @Override
  869. public void zlmServerOffline(String mediaServerId) {
  870. // 处理正在向上推流的上级平台
  871. List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServer(null);
  872. if (sendRtpItems.size() > 0) {
  873. for (SendRtpItem sendRtpItem : sendRtpItems) {
  874. if (sendRtpItem.getMediaServerId().equals(mediaServerId)) {
  875. ParentPlatform platform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
  876. try {
  877. sipCommanderFroPlatform.streamByeCmd(platform, sendRtpItem.getCallId());
  878. } catch (SipException | InvalidArgumentException | ParseException e) {
  879. logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
  880. }
  881. }
  882. }
  883. }
  884. // 处理正在观看的国标设备
  885. List<SsrcTransaction> allSsrc = streamSession.getAllSsrc();
  886. if (allSsrc.size() > 0) {
  887. for (SsrcTransaction ssrcTransaction : allSsrc) {
  888. if (ssrcTransaction.getMediaServerId().equals(mediaServerId)) {
  889. Device device = deviceService.getDevice(ssrcTransaction.getDeviceId());
  890. if (device == null) {
  891. continue;
  892. }
  893. try {
  894. cmder.streamByeCmd(device, ssrcTransaction.getChannelId(),
  895. ssrcTransaction.getStream(), null);
  896. } catch (InvalidArgumentException | ParseException | SipException |
  897. SsrcTransactionNotFoundException e) {
  898. logger.error("[zlm离线]为正在使用此zlm的设备, 发送BYE失败 {}", e.getMessage());
  899. }
  900. }
  901. }
  902. }
  903. }
  904. @Override
  905. public void zlmServerOnline(String mediaServerId) {
  906. // TODO 查找之前的点播,流如果不存在则给下级发送bye
  907. // MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
  908. // zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{
  909. // Integer code = mediaList.getInteger("code");
  910. // if (code == 0) {
  911. // JSONArray data = mediaList.getJSONArray("data");
  912. // if (data == null || data.size() == 0) {
  913. // zlmServerOffline(mediaServerId);
  914. // }else {
  915. // Map<String, JSONObject> mediaListMap = new HashMap<>();
  916. // for (int i = 0; i < data.size(); i++) {
  917. // JSONObject json = data.getJSONObject(i);
  918. // String app = json.getString("app");
  919. // if ("rtp".equals(app)) {
  920. // String stream = json.getString("stream");
  921. // if (mediaListMap.get(stream) != null) {
  922. // continue;
  923. // }
  924. // mediaListMap.put(stream, json);
  925. // // 处理正在观看的国标设备
  926. // List<SsrcTransaction> ssrcTransactions = streamSession.getSsrcTransactionForAll(null, null, null, stream);
  927. // if (ssrcTransactions.size() > 0) {
  928. // for (SsrcTransaction ssrcTransaction : ssrcTransactions) {
  929. // if(ssrcTransaction.getMediaServerId().equals(mediaServerId)) {
  930. // cmder.streamByeCmd(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(),
  931. // ssrcTransaction.getStream(), null);
  932. // }
  933. // }
  934. // }
  935. // }
  936. // }
  937. // if (mediaListMap.size() > 0 ) {
  938. // // 处理正在向上推流的上级平台
  939. // List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServer(null);
  940. // if (sendRtpItems.size() > 0) {
  941. // for (SendRtpItem sendRtpItem : sendRtpItems) {
  942. // if (sendRtpItem.getMediaServerId().equals(mediaServerId)) {
  943. // if (mediaListMap.get(sendRtpItem.getStreamId()) == null) {
  944. // ParentPlatform platform = storager.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
  945. // sipCommanderFroPlatform.streamByeCmd(platform, sendRtpItem.getCallId());
  946. // }
  947. // }
  948. // }
  949. // }
  950. // }
  951. // }
  952. // }
  953. // }));
  954. }
  955. @Override
  956. public void pauseRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException {
  957. InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, streamId);
  958. if (null == inviteInfo || inviteInfo.getStreamInfo() == null) {
  959. logger.warn("streamId不存在!");
  960. throw new ServiceException("streamId不存在");
  961. }
  962. inviteInfo.getStreamInfo().setPause(true);
  963. inviteStreamService.updateInviteInfo(inviteInfo);
  964. MediaServerItem mediaServerItem = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId());
  965. if (null == mediaServerItem) {
  966. logger.warn("mediaServer 不存在!");
  967. throw new ServiceException("mediaServer不存在");
  968. }
  969. // zlm 暂停RTP超时检查
  970. JSONObject jsonObject = zlmresTfulUtils.pauseRtpCheck(mediaServerItem, streamId);
  971. if (jsonObject == null || jsonObject.getInteger("code") != 0) {
  972. throw new ServiceException("暂停RTP接收失败");
  973. }
  974. Device device = storager.queryVideoDevice(inviteInfo.getDeviceId());
  975. cmder.playPauseCmd(device, inviteInfo.getStreamInfo());
  976. }
  977. @Override
  978. public void resumeRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException {
  979. InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, streamId);
  980. if (null == inviteInfo || inviteInfo.getStreamInfo() == null) {
  981. logger.warn("streamId不存在!");
  982. throw new ServiceException("streamId不存在");
  983. }
  984. inviteInfo.getStreamInfo().setPause(false);
  985. inviteStreamService.updateInviteInfo(inviteInfo);
  986. MediaServerItem mediaServerItem = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId());
  987. if (null == mediaServerItem) {
  988. logger.warn("mediaServer 不存在!");
  989. throw new ServiceException("mediaServer不存在");
  990. }
  991. // zlm 暂停RTP超时检查
  992. JSONObject jsonObject = zlmresTfulUtils.resumeRtpCheck(mediaServerItem, streamId);
  993. if (jsonObject == null || jsonObject.getInteger("code") != 0) {
  994. throw new ServiceException("继续RTP接收失败");
  995. }
  996. Device device = storager.queryVideoDevice(inviteInfo.getDeviceId());
  997. cmder.playResumeCmd(device, inviteInfo.getStreamInfo());
  998. }
  999. @Override
  1000. public void getSnap(String deviceId, String channelId, String fileName, ErrorCallback errorCallback) {
  1001. Device device = deviceService.getDevice(deviceId);
  1002. if (device == null) {
  1003. errorCallback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(), InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(), null);
  1004. return;
  1005. }
  1006. InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
  1007. if (inviteInfo != null) {
  1008. if (inviteInfo.getStreamInfo() != null) {
  1009. // 已存在线直接截图
  1010. MediaServerItem mediaServerItemInuse = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId());
  1011. String streamUrl;
  1012. if (mediaServerItemInuse.getRtspPort() != 0) {
  1013. streamUrl = String.format("rtsp://127.0.0.1:%s/%s/%s", mediaServerItemInuse.getRtspPort(), "rtp", inviteInfo.getStreamInfo().getStream());
  1014. }else {
  1015. streamUrl = String.format("http://127.0.0.1:%s/%s/%s.live.mp4", mediaServerItemInuse.getHttpPort(), "rtp", inviteInfo.getStreamInfo().getStream());
  1016. }
  1017. String path = "snap";
  1018. // 请求截图
  1019. logger.info("[请求截图]: " + fileName);
  1020. zlmresTfulUtils.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName);
  1021. String filePath = path + File.separator + fileName;
  1022. File snapFile = new File(path + File.separator + fileName);
  1023. if (snapFile.exists()) {
  1024. errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), filePath);
  1025. }else {
  1026. errorCallback.run(InviteErrorCode.FAIL.getCode(), InviteErrorCode.FAIL.getMsg(), null);
  1027. }
  1028. return;
  1029. }
  1030. }
  1031. MediaServerItem newMediaServerItem = getNewMediaServerItem(device);
  1032. play(newMediaServerItem, deviceId, channelId, (code, msg, data)->{
  1033. if (code == InviteErrorCode.SUCCESS.getCode()) {
  1034. InviteInfo inviteInfoForPlay = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
  1035. if (inviteInfoForPlay != null && inviteInfoForPlay.getStreamInfo() != null) {
  1036. getSnap(deviceId, channelId, fileName, errorCallback);
  1037. }else {
  1038. errorCallback.run(InviteErrorCode.FAIL.getCode(), InviteErrorCode.FAIL.getMsg(), null);
  1039. }
  1040. }else {
  1041. errorCallback.run(InviteErrorCode.FAIL.getCode(), InviteErrorCode.FAIL.getMsg(), null);
  1042. }
  1043. });
  1044. }
  1045. }