PlayServiceImpl.java 66 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149
  1. package com.genersoft.iot.vmp.service.impl;
  2. import com.alibaba.fastjson2.JSONObject;
  3. import com.genersoft.iot.vmp.common.InviteInfo;
  4. import com.genersoft.iot.vmp.common.InviteSessionStatus;
  5. import com.genersoft.iot.vmp.common.InviteSessionType;
  6. import com.genersoft.iot.vmp.common.StreamInfo;
  7. import com.genersoft.iot.vmp.conf.DynamicTask;
  8. import com.genersoft.iot.vmp.conf.UserSetting;
  9. import com.genersoft.iot.vmp.conf.exception.ControllerException;
  10. import com.genersoft.iot.vmp.conf.exception.ServiceException;
  11. import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
  12. import com.genersoft.iot.vmp.gb28181.bean.*;
  13. import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
  14. import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
  15. import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
  16. import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
  17. import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
  18. import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
  19. import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
  20. import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
  21. import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
  22. import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
  23. import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
  24. import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
  25. import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
  26. import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
  27. import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
  28. import com.genersoft.iot.vmp.service.*;
  29. import com.genersoft.iot.vmp.service.bean.ErrorCallback;
  30. import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
  31. import com.genersoft.iot.vmp.service.bean.SSRCInfo;
  32. import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
  33. import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
  34. import com.genersoft.iot.vmp.utils.DateUtil;
  35. import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
  36. import org.slf4j.Logger;
  37. import org.slf4j.LoggerFactory;
  38. import org.springframework.beans.factory.annotation.Autowired;
  39. import org.springframework.data.redis.core.RedisTemplate;
  40. import org.springframework.stereotype.Service;
  41. import org.springframework.util.ObjectUtils;
  42. import javax.sdp.*;
  43. import javax.sip.InvalidArgumentException;
  44. import javax.sip.ResponseEvent;
  45. import javax.sip.SipException;
  46. import java.io.File;
  47. import java.math.BigDecimal;
  48. import java.math.RoundingMode;
  49. import java.text.ParseException;
  50. import java.util.List;
  51. import java.util.UUID;
  52. import java.util.Vector;
  53. @SuppressWarnings(value = {"rawtypes", "unchecked"})
  54. @Service
  55. public class PlayServiceImpl implements IPlayService {
  56. private final static Logger logger = LoggerFactory.getLogger(PlayServiceImpl.class);
  57. @Autowired
  58. private IVideoManagerStorage storager;
  59. @Autowired
  60. private SIPCommander cmder;
  61. @Autowired
  62. private SIPCommanderFroPlatform sipCommanderFroPlatform;
  63. @Autowired
  64. private IRedisCatchStorage redisCatchStorage;
  65. @Autowired
  66. private IInviteStreamService inviteStreamService;
  67. @Autowired
  68. private DeferredResultHolder resultHolder;
  69. @Autowired
  70. private ZLMRESTfulUtils zlmresTfulUtils;
  71. @Autowired
  72. private ZLMServerFactory zlmServerFactory;
  73. @Autowired
  74. private AssistRESTfulUtils assistRESTfulUtils;
  75. @Autowired
  76. private IMediaService mediaService;
  77. @Autowired
  78. private IMediaServerService mediaServerService;
  79. @Autowired
  80. private VideoStreamSessionManager streamSession;
  81. @Autowired
  82. private IDeviceService deviceService;
  83. @Autowired
  84. private UserSetting userSetting;
  85. @Autowired
  86. private DynamicTask dynamicTask;
  87. @Autowired
  88. private ZlmHttpHookSubscribe subscribe;
  89. @Autowired
  90. private SSRCFactory ssrcFactory;
  91. @Autowired
  92. private RedisTemplate<Object, Object> redisTemplate;
  93. @Override
  94. public SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback<Object> callback) {
  95. if (mediaServerItem == null) {
  96. throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm");
  97. }
  98. Device device = redisCatchStorage.getDevice(deviceId);
  99. InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
  100. if (inviteInfo != null ) {
  101. if (inviteInfo.getStreamInfo() == null) {
  102. // 点播发起了但是尚未成功, 仅注册回调等待结果即可
  103. inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId, null, callback);
  104. return inviteInfo.getSsrcInfo();
  105. }else {
  106. StreamInfo streamInfo = inviteInfo.getStreamInfo();
  107. String streamId = streamInfo.getStream();
  108. if (streamId == null) {
  109. callback.run(InviteErrorCode.ERROR_FOR_CATCH_DATA.getCode(), "点播失败, redis缓存streamId等于null", null);
  110. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  111. InviteErrorCode.ERROR_FOR_CATCH_DATA.getCode(),
  112. "点播失败, redis缓存streamId等于null",
  113. null);
  114. return inviteInfo.getSsrcInfo();
  115. }
  116. String mediaServerId = streamInfo.getMediaServerId();
  117. MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
  118. Boolean ready = zlmServerFactory.isStreamReady(mediaInfo, "rtp", streamId);
  119. if (ready != null && ready) {
  120. callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
  121. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  122. InviteErrorCode.SUCCESS.getCode(),
  123. InviteErrorCode.SUCCESS.getMsg(),
  124. streamInfo);
  125. return inviteInfo.getSsrcInfo();
  126. }else {
  127. // 点播发起了但是尚未成功, 仅注册回调等待结果即可
  128. inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId, null, callback);
  129. storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
  130. inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
  131. }
  132. }
  133. }
  134. String streamId = null;
  135. if (mediaServerItem.isRtpEnable()) {
  136. streamId = String.format("%s_%s", device.getDeviceId(), channelId);
  137. }
  138. SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, ssrc, device.isSsrcCheck(), false, 0, false, device.getStreamModeForParam());
  139. if (ssrcInfo == null) {
  140. callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(), null);
  141. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  142. InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(),
  143. InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(),
  144. null);
  145. return null;
  146. }
  147. // TODO 记录点播的状态
  148. play(mediaServerItem, ssrcInfo, device, channelId, callback);
  149. return ssrcInfo;
  150. }
  151. @Override
  152. public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
  153. ErrorCallback<Object> callback) {
  154. if (mediaServerItem == null || ssrcInfo == null) {
  155. callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
  156. InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(),
  157. null);
  158. return;
  159. }
  160. logger.info("[点播开始] deviceId: {}, channelId: {},码流类型:{}, 收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}",
  161. device.getDeviceId(), channelId, device.isSwitchPrimarySubStream() ? "辅码流" : "主码流", ssrcInfo.getPort(),
  162. device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
  163. //端口获取失败的ssrcInfo 没有必要发送点播指令
  164. if (ssrcInfo.getPort() <= 0) {
  165. logger.info("[点播端口分配异常],deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channelId, ssrcInfo);
  166. // 释放ssrc
  167. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  168. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  169. callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "点播端口分配异常", null);
  170. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  171. InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "点播端口分配异常", null);
  172. return;
  173. }
  174. // 初始化redis中的invite消息状态
  175. InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo,
  176. mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAY,
  177. InviteSessionStatus.ready);
  178. inviteInfo.setSubStream(device.isSwitchPrimarySubStream());
  179. inviteStreamService.updateInviteInfo(inviteInfo);
  180. // 超时处理
  181. String timeOutTaskKey = UUID.randomUUID().toString();
  182. dynamicTask.startDelay(timeOutTaskKey, () -> {
  183. // 执行超时任务时查询是否已经成功,成功了则不执行超时任务,防止超时任务取消失败的情况
  184. InviteInfo inviteInfoForTimeOut = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
  185. if (inviteInfoForTimeOut == null || inviteInfoForTimeOut.getStreamInfo() == null) {
  186. logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},码流类型:{},端口:{}, SSRC: {}",
  187. device.getDeviceId(), channelId, device.isSwitchPrimarySubStream() ? "辅码流" : "主码流",
  188. ssrcInfo.getPort(), ssrcInfo.getSsrc());
  189. // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  190. // InviteInfo inviteInfoForTimeout = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.play, device.getDeviceId(), channelId);
  191. // if (inviteInfoForTimeout == null) {
  192. // return;
  193. // }
  194. // if (InviteSessionStatus.ok == inviteInfoForTimeout.getStatus() ) {
  195. // // TODO 发送bye
  196. // }else {
  197. // // TODO 发送cancel
  198. // }
  199. callback.run(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null);
  200. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  201. InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null);
  202. inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
  203. try {
  204. cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null);
  205. } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
  206. logger.error("[点播超时], 发送BYE失败 {}", e.getMessage());
  207. } finally {
  208. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  209. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  210. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  211. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  212. // 取消订阅消息监听
  213. HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
  214. subscribe.removeSubscribe(hookSubscribe);
  215. }
  216. }
  217. }, userSetting.getPlayTimeout());
  218. try {
  219. cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInuse, hookParam ) -> {
  220. logger.info("收到订阅消息: " + hookParam);
  221. dynamicTask.stop(timeOutTaskKey);
  222. // hook响应
  223. StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInuse, hookParam, device.getDeviceId(), channelId);
  224. if (streamInfo == null){
  225. callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
  226. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
  227. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  228. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
  229. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
  230. return;
  231. }
  232. callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
  233. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  234. InviteErrorCode.SUCCESS.getCode(),
  235. InviteErrorCode.SUCCESS.getMsg(),
  236. streamInfo);
  237. logger.info("[点播成功] deviceId: {}, channelId:{}, 码流类型:{}", device.getDeviceId(), channelId,
  238. device.isSwitchPrimarySubStream() ? "辅码流" : "主码流");
  239. snapOnPlay(mediaServerItemInuse, device.getDeviceId(), channelId, ssrcInfo.getStream());
  240. }, (event) -> {
  241. inviteInfo.setStatus(InviteSessionStatus.ok);
  242. ResponseEvent responseEvent = (ResponseEvent) event.event;
  243. String contentString = new String(responseEvent.getResponse().getRawContent());
  244. // 获取ssrc
  245. int ssrcIndex = contentString.indexOf("y=");
  246. // 检查是否有y字段
  247. if (ssrcIndex >= 0) {
  248. //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容
  249. String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12).trim();
  250. // 查询到ssrc不一致且开启了ssrc校验则需要针对处理
  251. if (ssrcInfo.getSsrc().equals(ssrcInResponse)) {
  252. if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) {
  253. tcpActiveHandler(device, channelId, contentString, mediaServerItem, timeOutTaskKey, ssrcInfo, callback);
  254. }
  255. return;
  256. }
  257. logger.info("[点播消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
  258. if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {
  259. logger.info("[点播消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
  260. // 释放ssrc
  261. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  262. // 单端口模式streamId也有变化,重新设置监听即可
  263. if (!mediaServerItem.isRtpEnable()) {
  264. // 添加订阅
  265. HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
  266. subscribe.removeSubscribe(hookSubscribe);
  267. String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase();
  268. hookSubscribe.getContent().put("stream", stream);
  269. inviteStreamService.updateInviteInfoForStream(inviteInfo, stream);
  270. subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> {
  271. logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + hookParam);
  272. dynamicTask.stop(timeOutTaskKey);
  273. // hook响应
  274. StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInUse, hookParam, device.getDeviceId(), channelId);
  275. if (streamInfo == null){
  276. callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
  277. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
  278. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  279. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
  280. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
  281. return;
  282. }
  283. callback.run(InviteErrorCode.SUCCESS.getCode(),
  284. InviteErrorCode.SUCCESS.getMsg(), streamInfo);
  285. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  286. InviteErrorCode.SUCCESS.getCode(),
  287. InviteErrorCode.SUCCESS.getMsg(),
  288. streamInfo);
  289. snapOnPlay(mediaServerItemInUse, device.getDeviceId(), channelId, stream);
  290. });
  291. return;
  292. }
  293. // 更新ssrc
  294. Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse);
  295. if (!result) {
  296. try {
  297. logger.warn("[点播] 更新ssrc失败,停止点播 {}/{}", device.getDeviceId(), channelId);
  298. cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null, null);
  299. } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
  300. logger.error("[命令发送失败] 停止点播, 发送BYE: {}", e.getMessage());
  301. }
  302. dynamicTask.stop(timeOutTaskKey);
  303. // 释放ssrc
  304. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  305. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  306. callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
  307. "下级自定义了ssrc,重新设置收流信息失败", null);
  308. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  309. InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
  310. "下级自定义了ssrc,重新设置收流信息失败", null);
  311. }else {
  312. if (ssrcInfo.getStream()!= null && !ssrcInfo.getStream().equals(inviteInfo.getStream())) {
  313. inviteStreamService.removeInviteInfo(inviteInfo);
  314. }
  315. ssrcInfo.setSsrc(ssrcInResponse);
  316. inviteInfo.setSsrcInfo(ssrcInfo);
  317. inviteInfo.setStream(ssrcInfo.getStream());
  318. if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) {
  319. tcpActiveHandler(device, channelId, contentString, mediaServerItem, timeOutTaskKey, ssrcInfo, callback);
  320. }
  321. }
  322. }else {
  323. logger.info("[点播消息] 收到invite 200, 下级自定义了ssrc, 但是当前模式无需修正");
  324. }
  325. }
  326. inviteStreamService.updateInviteInfo(inviteInfo);
  327. }, (event) -> {
  328. dynamicTask.stop(timeOutTaskKey);
  329. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  330. // 释放ssrc
  331. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  332. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  333. callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_ERROR.getCode(),
  334. String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
  335. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  336. InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
  337. String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
  338. inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
  339. });
  340. } catch (InvalidArgumentException | SipException | ParseException e) {
  341. logger.error("[命令发送失败] 点播消息: {}", e.getMessage());
  342. dynamicTask.stop(timeOutTaskKey);
  343. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  344. // 释放ssrc
  345. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  346. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  347. callback.run(InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(),
  348. InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null);
  349. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  350. InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(),
  351. InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null);
  352. inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
  353. }
  354. }
  355. private void tcpActiveHandler(Device device, String channelId, String contentString,
  356. MediaServerItem mediaServerItem,
  357. String timeOutTaskKey, SSRCInfo ssrcInfo, ErrorCallback<Object> callback){
  358. if (!device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) {
  359. return;
  360. }
  361. String substring = contentString.substring(0, contentString.indexOf("y="));
  362. try {
  363. SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring);
  364. int port = -1;
  365. Vector mediaDescriptions = sdp.getMediaDescriptions(true);
  366. for (Object description : mediaDescriptions) {
  367. MediaDescription mediaDescription = (MediaDescription) description;
  368. Media media = mediaDescription.getMedia();
  369. Vector mediaFormats = media.getMediaFormats(false);
  370. if (mediaFormats.contains("96")) {
  371. port = media.getMediaPort();
  372. break;
  373. }
  374. }
  375. logger.info("[TCP主动连接对方] deviceId: {}, channelId: {}, 连接对方的地址:{}:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, sdp.getConnection().getAddress(), port, device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
  376. JSONObject jsonObject = zlmresTfulUtils.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getStream());
  377. logger.info("[TCP主动连接对方] 结果: {}", jsonObject);
  378. } catch (SdpException e) {
  379. logger.error("[TCP主动连接对方] deviceId: {}, channelId: {}, 解析200OK的SDP信息失败", device.getDeviceId(), channelId, e);
  380. dynamicTask.stop(timeOutTaskKey);
  381. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  382. // 释放ssrc
  383. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  384. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  385. callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
  386. InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
  387. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  388. InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
  389. InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
  390. }
  391. }
  392. /**
  393. * 点播成功时调用截图.
  394. *
  395. * @param mediaServerItemInuse media
  396. * @param deviceId 设备 ID
  397. * @param channelId 通道 ID
  398. * @param stream ssrc
  399. */
  400. private void snapOnPlay(MediaServerItem mediaServerItemInuse, String deviceId, String channelId, String stream) {
  401. String streamUrl;
  402. if (mediaServerItemInuse.getRtspPort() != 0) {
  403. streamUrl = String.format("rtsp://127.0.0.1:%s/%s/%s", mediaServerItemInuse.getRtspPort(), "rtp", stream);
  404. } else {
  405. streamUrl = String.format("http://127.0.0.1:%s/%s/%s.live.mp4", mediaServerItemInuse.getHttpPort(), "rtp", stream);
  406. }
  407. String path = "snap";
  408. String fileName = deviceId + "_" + channelId + ".jpg";
  409. // 请求截图
  410. logger.info("[请求截图]: " + fileName);
  411. zlmresTfulUtils.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName);
  412. }
  413. private StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, HookParam hookParam, String deviceId, String channelId) {
  414. StreamInfo streamInfo = null;
  415. Device device = redisCatchStorage.getDevice(deviceId);
  416. OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam;
  417. streamInfo = onPublishHandler(mediaServerItem, streamChangedHookParam, deviceId, channelId);
  418. if (streamInfo != null) {
  419. DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
  420. if (deviceChannel != null) {
  421. deviceChannel.setStreamId(streamInfo.getStream());
  422. storager.startPlay(deviceId, channelId, streamInfo.getStream());
  423. }
  424. InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
  425. if (inviteInfo != null) {
  426. inviteInfo.setStatus(InviteSessionStatus.ok);
  427. inviteInfo.setStreamInfo(streamInfo);
  428. inviteStreamService.updateInviteInfo(inviteInfo);
  429. }
  430. }
  431. return streamInfo;
  432. }
  433. private StreamInfo onPublishHandlerForPlayback(MediaServerItem mediaServerItem, HookParam param, String deviceId, String channelId, String startTime, String endTime) {
  434. OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) param;
  435. StreamInfo streamInfo = onPublishHandler(mediaServerItem, streamChangedHookParam, deviceId, channelId);
  436. if (streamInfo != null) {
  437. streamInfo.setStartTime(startTime);
  438. streamInfo.setEndTime(endTime);
  439. DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
  440. if (deviceChannel != null) {
  441. deviceChannel.setStreamId(streamInfo.getStream());
  442. storager.startPlay(deviceId, channelId, streamInfo.getStream());
  443. }
  444. InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAYBACK, deviceId, channelId);
  445. if (inviteInfo != null) {
  446. inviteInfo.setStatus(InviteSessionStatus.ok);
  447. inviteInfo.setStreamInfo(streamInfo);
  448. inviteStreamService.updateInviteInfo(inviteInfo);
  449. }
  450. }
  451. return streamInfo;
  452. }
  453. @Override
  454. public MediaServerItem getNewMediaServerItem(Device device) {
  455. if (device == null) {
  456. return null;
  457. }
  458. MediaServerItem mediaServerItem;
  459. if (ObjectUtils.isEmpty(device.getMediaServerId()) || "auto".equals(device.getMediaServerId())) {
  460. mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null);
  461. } else {
  462. mediaServerItem = mediaServerService.getOne(device.getMediaServerId());
  463. }
  464. if (mediaServerItem == null) {
  465. logger.warn("点播时未找到可使用的ZLM...");
  466. }
  467. return mediaServerItem;
  468. }
  469. @Override
  470. public MediaServerItem getNewMediaServerItemHasAssist(Device device) {
  471. if (device == null) {
  472. return null;
  473. }
  474. MediaServerItem mediaServerItem;
  475. if (ObjectUtils.isEmpty(device.getMediaServerId()) || "auto".equals(device.getMediaServerId())) {
  476. mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(true);
  477. } else {
  478. mediaServerItem = mediaServerService.getOne(device.getMediaServerId());
  479. }
  480. if (mediaServerItem == null) {
  481. logger.warn("[获取可用的ZLM节点]未找到可使用的ZLM...");
  482. }
  483. return mediaServerItem;
  484. }
  485. @Override
  486. public void playBack(String deviceId, String channelId, String startTime,
  487. String endTime, ErrorCallback<Object> callback) {
  488. Device device = storager.queryVideoDevice(deviceId);
  489. if (device == null) {
  490. return;
  491. }
  492. MediaServerItem newMediaServerItem = getNewMediaServerItem(device);
  493. String stream = null;
  494. if (newMediaServerItem.isRtpEnable()) {
  495. String startTimeStr = startTime.replace("-", "")
  496. .replace(":", "")
  497. .replace(" ", "");
  498. System.out.println(startTimeStr);
  499. String endTimeTimeStr = endTime.replace("-", "")
  500. .replace(":", "")
  501. .replace(" ", "");
  502. System.out.println(endTimeTimeStr);
  503. stream = deviceId + "_" + channelId + "_" + startTimeStr + "_" + endTimeTimeStr;
  504. }
  505. SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, stream, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam());
  506. playBack(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, callback);
  507. }
  508. @Override
  509. public void playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,
  510. String deviceId, String channelId, String startTime,
  511. String endTime, ErrorCallback<Object> callback) {
  512. if (mediaServerItem == null || ssrcInfo == null) {
  513. callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
  514. InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(),
  515. null);
  516. return;
  517. }
  518. Device device = storager.queryVideoDevice(deviceId);
  519. if (device == null) {
  520. throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备: " + deviceId + "不存在");
  521. }
  522. logger.info("[录像回放] deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}, 收流端口:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}",
  523. device.getDeviceId(), channelId, startTime, endTime, ssrcInfo.getPort(), device.getStreamMode(),
  524. ssrcInfo.getSsrc(), device.isSsrcCheck());
  525. // 初始化redis中的invite消息状态
  526. InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo,
  527. mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAYBACK,
  528. InviteSessionStatus.ready);
  529. inviteStreamService.updateInviteInfo(inviteInfo);
  530. String playBackTimeOutTaskKey = UUID.randomUUID().toString();
  531. dynamicTask.startDelay(playBackTimeOutTaskKey, () -> {
  532. logger.warn("[录像回放] 超时,deviceId:{} ,channelId:{}", deviceId, channelId);
  533. inviteStreamService.removeInviteInfo(inviteInfo);
  534. callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null);
  535. try {
  536. cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null);
  537. } catch (InvalidArgumentException | ParseException | SipException e) {
  538. logger.error("[录像回放] 超时 发送BYE失败 {}", e.getMessage());
  539. } catch (SsrcTransactionNotFoundException e) {
  540. // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  541. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  542. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  543. streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
  544. }
  545. }, userSetting.getPlayTimeout());
  546. SipSubscribe.Event errorEvent = event -> {
  547. logger.info("[录像回放] 失败,{} {}", event.statusCode, event.msg);
  548. dynamicTask.stop(playBackTimeOutTaskKey);
  549. callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_ERROR.getCode(),
  550. String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg), null);
  551. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  552. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  553. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  554. inviteStreamService.removeInviteInfo(inviteInfo);
  555. };
  556. ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, hookParam) -> {
  557. logger.info("收到回放订阅消息: " + hookParam);
  558. dynamicTask.stop(playBackTimeOutTaskKey);
  559. StreamInfo streamInfo = onPublishHandlerForPlayback(mediaServerItemInuse, hookParam, deviceId, channelId, startTime, endTime);
  560. if (streamInfo == null) {
  561. logger.warn("设备回放API调用失败!");
  562. callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
  563. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
  564. return;
  565. }
  566. callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
  567. logger.info("[录像回放] 成功 deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}", device.getDeviceId(), channelId, startTime, endTime);
  568. };
  569. try {
  570. cmder.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime,
  571. hookEvent, eventResult -> {
  572. inviteInfo.setStatus(InviteSessionStatus.ok);
  573. ResponseEvent responseEvent = (ResponseEvent) eventResult.event;
  574. String contentString = new String(responseEvent.getResponse().getRawContent());
  575. // 获取ssrc
  576. int ssrcIndex = contentString.indexOf("y=");
  577. // 检查是否有y字段
  578. if (ssrcIndex >= 0) {
  579. //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容
  580. String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
  581. // 查询到ssrc不一致且开启了ssrc校验则需要针对处理
  582. if (ssrcInfo.getSsrc().equals(ssrcInResponse)) {
  583. if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) {
  584. tcpActiveHandler(device, channelId, contentString, mediaServerItem, playBackTimeOutTaskKey, ssrcInfo, callback);
  585. }
  586. return;
  587. }
  588. logger.info("[录像回放] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
  589. if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {
  590. logger.info("[录像回放] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
  591. // 释放ssrc
  592. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  593. // 单端口模式streamId也有变化,需要重新设置监听
  594. if (!mediaServerItem.isRtpEnable()) {
  595. // 添加订阅
  596. HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
  597. subscribe.removeSubscribe(hookSubscribe);
  598. String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase();
  599. hookSubscribe.getContent().put("stream", stream);
  600. inviteStreamService.updateInviteInfoForStream(inviteInfo, stream);
  601. subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> {
  602. logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + hookParam);
  603. dynamicTask.stop(playBackTimeOutTaskKey);
  604. // hook响应
  605. hookEvent.response(mediaServerItemInUse, hookParam);
  606. });
  607. }
  608. // 更新ssrc
  609. Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse);
  610. if (!result) {
  611. try {
  612. logger.warn("[录像回放] 更新ssrc失败,停止录像回放 {}/{}", device.getDeviceId(), channelId);
  613. cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null, null);
  614. } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
  615. logger.error("[命令发送失败] 停止点播, 发送BYE: {}", e.getMessage());
  616. }
  617. dynamicTask.stop(playBackTimeOutTaskKey);
  618. // 释放ssrc
  619. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  620. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  621. callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
  622. "下级自定义了ssrc,重新设置收流信息失败", null);
  623. }else {
  624. if (ssrcInfo.getStream()!= null && !ssrcInfo.getStream().equals(inviteInfo.getStream())) {
  625. inviteStreamService.removeInviteInfo(inviteInfo);
  626. }
  627. ssrcInfo.setSsrc(ssrcInResponse);
  628. inviteInfo.setSsrcInfo(ssrcInfo);
  629. inviteInfo.setStream(ssrcInfo.getStream());
  630. if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) {
  631. tcpActiveHandler(device, channelId, contentString, mediaServerItem, playBackTimeOutTaskKey, ssrcInfo, callback);
  632. }
  633. }
  634. }else {
  635. logger.info("[点播消息] 收到invite 200, 下级自定义了ssrc, 但是当前模式无需修正");
  636. }
  637. }
  638. inviteStreamService.updateInviteInfo(inviteInfo);
  639. }, errorEvent);
  640. } catch (InvalidArgumentException | SipException | ParseException e) {
  641. logger.error("[命令发送失败] 回放: {}", e.getMessage());
  642. SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult();
  643. eventResult.type = SipSubscribe.EventResultType.cmdSendFailEvent;
  644. eventResult.statusCode = -1;
  645. eventResult.msg = "命令发送失败";
  646. errorEvent.response(eventResult);
  647. }
  648. }
  649. @Override
  650. public void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback<Object> callback) {
  651. Device device = storager.queryVideoDevice(deviceId);
  652. if (device == null) {
  653. return;
  654. }
  655. MediaServerItem newMediaServerItem = getNewMediaServerItemHasAssist(device);
  656. if (newMediaServerItem == null) {
  657. callback.run(InviteErrorCode.ERROR_FOR_ASSIST_NOT_READY.getCode(),
  658. InviteErrorCode.ERROR_FOR_ASSIST_NOT_READY.getMsg(),
  659. null);
  660. return;
  661. }
  662. SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam());
  663. download(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, downloadSpeed, callback);
  664. }
  665. @Override
  666. public void download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback<Object> callback) {
  667. if (mediaServerItem == null || ssrcInfo == null) {
  668. callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
  669. InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(),
  670. null);
  671. return;
  672. }
  673. Device device = storager.queryVideoDevice(deviceId);
  674. if (device == null) {
  675. callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
  676. "设备:" + deviceId + "不存在",
  677. null);
  678. return;
  679. }
  680. logger.info("[录像下载] deviceId: {}, channelId: {}, 下载速度:{}, 收流端口:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, downloadSpeed, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
  681. // 初始化redis中的invite消息状态
  682. InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo,
  683. mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.DOWNLOAD,
  684. InviteSessionStatus.ready);
  685. inviteStreamService.updateInviteInfo(inviteInfo);
  686. String downLoadTimeOutTaskKey = UUID.randomUUID().toString();
  687. dynamicTask.startDelay(downLoadTimeOutTaskKey, () -> {
  688. logger.warn(String.format("录像下载请求超时,deviceId:%s ,channelId:%s", deviceId, channelId));
  689. inviteStreamService.removeInviteInfo(inviteInfo);
  690. callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(),
  691. InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null);
  692. // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  693. try {
  694. cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null);
  695. } catch (InvalidArgumentException | ParseException | SipException e) {
  696. logger.error("[录像流]录像下载请求超时, 发送BYE失败 {}", e.getMessage());
  697. } catch (SsrcTransactionNotFoundException e) {
  698. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  699. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  700. streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
  701. }
  702. }, userSetting.getPlayTimeout());
  703. SipSubscribe.Event errorEvent = event -> {
  704. dynamicTask.stop(downLoadTimeOutTaskKey);
  705. callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(),
  706. String.format("录像下载失败, 错误码: %s, %s", event.statusCode, event.msg), null);
  707. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  708. inviteStreamService.removeInviteInfo(inviteInfo);
  709. };
  710. ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, hookParam) -> {
  711. logger.info("[录像下载]收到订阅消息: " + hookParam);
  712. dynamicTask.stop(downLoadTimeOutTaskKey);
  713. StreamInfo streamInfo = onPublishHandlerForDownload(mediaServerItemInuse, hookParam, deviceId, channelId, startTime, endTime);
  714. if (streamInfo == null) {
  715. logger.warn("[录像下载] 获取流地址信息失败");
  716. callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
  717. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
  718. return;
  719. }
  720. callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
  721. logger.info("[录像下载] 调用成功 deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}", device.getDeviceId(), channelId, startTime, endTime);
  722. };
  723. try {
  724. cmder.downloadStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, downloadSpeed,
  725. hookEvent, errorEvent, eventResult ->{
  726. inviteInfo.setStatus(InviteSessionStatus.ok);
  727. ResponseEvent responseEvent = (ResponseEvent) eventResult.event;
  728. String contentString = new String(responseEvent.getResponse().getRawContent());
  729. // 获取ssrc
  730. int ssrcIndex = contentString.indexOf("y=");
  731. // 检查是否有y字段
  732. if (ssrcIndex >= 0) {
  733. //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容
  734. String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
  735. // 查询到ssrc不一致且开启了ssrc校验则需要针对处理
  736. if (ssrcInfo.getSsrc().equals(ssrcInResponse)) {
  737. if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) {
  738. tcpActiveHandler(device, channelId, contentString, mediaServerItem, downLoadTimeOutTaskKey, ssrcInfo, callback);
  739. }
  740. return;
  741. }
  742. logger.info("[录像下载] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
  743. if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {
  744. logger.info("[录像下载] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
  745. // 释放ssrc
  746. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  747. // 单端口模式streamId也有变化,需要重新设置监听
  748. if (!mediaServerItem.isRtpEnable()) {
  749. // 添加订阅
  750. HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
  751. subscribe.removeSubscribe(hookSubscribe);
  752. String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase();
  753. hookSubscribe.getContent().put("stream", stream);
  754. inviteStreamService.updateInviteInfoForStream(inviteInfo, stream);
  755. subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> {
  756. logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + hookParam);
  757. dynamicTask.stop(downLoadTimeOutTaskKey);
  758. hookEvent.response(mediaServerItemInUse, hookParam);
  759. });
  760. }
  761. // 更新ssrc
  762. Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse);
  763. if (!result) {
  764. try {
  765. logger.warn("[录像下载] 更新ssrc失败,停止录像回放 {}/{}", device.getDeviceId(), channelId);
  766. cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null, null);
  767. } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
  768. logger.error("[命令发送失败] 停止点播, 发送BYE: {}", e.getMessage());
  769. }
  770. dynamicTask.stop(downLoadTimeOutTaskKey);
  771. // 释放ssrc
  772. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  773. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  774. callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
  775. "下级自定义了ssrc,重新设置收流信息失败", null);
  776. }else {
  777. if (ssrcInfo.getStream()!= null && !ssrcInfo.getStream().equals(inviteInfo.getStream())) {
  778. inviteStreamService.removeInviteInfo(inviteInfo);
  779. }
  780. ssrcInfo.setSsrc(ssrcInResponse);
  781. inviteInfo.setSsrcInfo(ssrcInfo);
  782. inviteInfo.setStream(ssrcInfo.getStream());
  783. if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) {
  784. tcpActiveHandler(device, channelId, contentString, mediaServerItem, downLoadTimeOutTaskKey, ssrcInfo, callback);
  785. }
  786. }
  787. }else {
  788. logger.info("[录像下载] 收到invite 200, 下级自定义了ssrc, 但是当前模式无需修正");
  789. }
  790. }
  791. inviteStreamService.updateInviteInfo(inviteInfo);
  792. });
  793. } catch (InvalidArgumentException | SipException | ParseException e) {
  794. logger.error("[命令发送失败] 录像下载: {}", e.getMessage());
  795. SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult();
  796. eventResult.type = SipSubscribe.EventResultType.cmdSendFailEvent;
  797. eventResult.statusCode = -1;
  798. eventResult.msg = "命令发送失败";
  799. errorEvent.response(eventResult);
  800. }
  801. }
  802. @Override
  803. public StreamInfo getDownLoadInfo(String deviceId, String channelId, String stream) {
  804. InviteInfo inviteInfo = inviteStreamService.getInviteInfo(InviteSessionType.DOWNLOAD, deviceId, channelId, stream);
  805. if (inviteInfo != null && inviteInfo.getStreamInfo() != null) {
  806. if (inviteInfo.getStreamInfo().getProgress() == 1) {
  807. return inviteInfo.getStreamInfo();
  808. }
  809. // 获取当前已下载时长
  810. String mediaServerId = inviteInfo.getStreamInfo().getMediaServerId();
  811. MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
  812. if (mediaServerItem == null) {
  813. logger.warn("查询录像信息时发现节点已离线");
  814. return null;
  815. }
  816. if (mediaServerItem.getRecordAssistPort() > 0) {
  817. JSONObject jsonObject = assistRESTfulUtils.fileDuration(mediaServerItem, inviteInfo.getStreamInfo().getApp(), inviteInfo.getStreamInfo().getStream(), null);
  818. if (jsonObject == null) {
  819. throw new ControllerException(ErrorCode.ERROR100.getCode(), "连接Assist服务失败");
  820. }
  821. if (jsonObject.getInteger("code") == 0) {
  822. long duration = jsonObject.getLong("data");
  823. if (duration == 0) {
  824. inviteInfo.getStreamInfo().setProgress(0);
  825. } else {
  826. String startTime = inviteInfo.getStreamInfo().getStartTime();
  827. String endTime = inviteInfo.getStreamInfo().getEndTime();
  828. long start = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime);
  829. long end = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime);
  830. BigDecimal currentCount = new BigDecimal(duration / 1000);
  831. BigDecimal totalCount = new BigDecimal(end - start);
  832. BigDecimal divide = currentCount.divide(totalCount, 2, RoundingMode.HALF_UP);
  833. double process = divide.doubleValue();
  834. inviteInfo.getStreamInfo().setProgress(process);
  835. }
  836. inviteStreamService.updateInviteInfo(inviteInfo);
  837. }
  838. }
  839. return inviteInfo.getStreamInfo();
  840. }
  841. return null;
  842. }
  843. private StreamInfo onPublishHandlerForDownload(MediaServerItem mediaServerItemInuse, HookParam hookParam, String deviceId, String channelId, String startTime, String endTime) {
  844. OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) hookParam;
  845. StreamInfo streamInfo = onPublishHandler(mediaServerItemInuse, streamChangedHookParam, deviceId, channelId);
  846. if (streamInfo != null) {
  847. streamInfo.setProgress(0);
  848. streamInfo.setStartTime(startTime);
  849. streamInfo.setEndTime(endTime);
  850. InviteInfo inviteInfo = inviteStreamService.getInviteInfo(InviteSessionType.DOWNLOAD, deviceId, channelId, streamInfo.getStream());
  851. if (inviteInfo != null) {
  852. logger.info("[录像下载] 更新invite消息中的stream信息");
  853. inviteInfo.setStatus(InviteSessionStatus.ok);
  854. inviteInfo.setStreamInfo(streamInfo);
  855. inviteStreamService.updateInviteInfo(inviteInfo);
  856. }
  857. }
  858. return streamInfo;
  859. }
  860. public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, OnStreamChangedHookParam hookParam, String deviceId, String channelId) {
  861. StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", hookParam.getStream(), hookParam.getTracks(), null);
  862. streamInfo.setDeviceID(deviceId);
  863. streamInfo.setChannelId(channelId);
  864. return streamInfo;
  865. }
  866. @Override
  867. public void zlmServerOffline(String mediaServerId) {
  868. // 处理正在向上推流的上级平台
  869. List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServer(null);
  870. if (sendRtpItems.size() > 0) {
  871. for (SendRtpItem sendRtpItem : sendRtpItems) {
  872. if (sendRtpItem.getMediaServerId().equals(mediaServerId)) {
  873. ParentPlatform platform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
  874. try {
  875. sipCommanderFroPlatform.streamByeCmd(platform, sendRtpItem.getCallId());
  876. } catch (SipException | InvalidArgumentException | ParseException e) {
  877. logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
  878. }
  879. }
  880. }
  881. }
  882. // 处理正在观看的国标设备
  883. List<SsrcTransaction> allSsrc = streamSession.getAllSsrc();
  884. if (allSsrc.size() > 0) {
  885. for (SsrcTransaction ssrcTransaction : allSsrc) {
  886. if (ssrcTransaction.getMediaServerId().equals(mediaServerId)) {
  887. Device device = deviceService.getDevice(ssrcTransaction.getDeviceId());
  888. if (device == null) {
  889. continue;
  890. }
  891. try {
  892. cmder.streamByeCmd(device, ssrcTransaction.getChannelId(),
  893. ssrcTransaction.getStream(), null);
  894. } catch (InvalidArgumentException | ParseException | SipException |
  895. SsrcTransactionNotFoundException e) {
  896. logger.error("[zlm离线]为正在使用此zlm的设备, 发送BYE失败 {}", e.getMessage());
  897. }
  898. }
  899. }
  900. }
  901. }
  902. @Override
  903. public void zlmServerOnline(String mediaServerId) {
  904. // TODO 查找之前的点播,流如果不存在则给下级发送bye
  905. // MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
  906. // zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{
  907. // Integer code = mediaList.getInteger("code");
  908. // if (code == 0) {
  909. // JSONArray data = mediaList.getJSONArray("data");
  910. // if (data == null || data.size() == 0) {
  911. // zlmServerOffline(mediaServerId);
  912. // }else {
  913. // Map<String, JSONObject> mediaListMap = new HashMap<>();
  914. // for (int i = 0; i < data.size(); i++) {
  915. // JSONObject json = data.getJSONObject(i);
  916. // String app = json.getString("app");
  917. // if ("rtp".equals(app)) {
  918. // String stream = json.getString("stream");
  919. // if (mediaListMap.get(stream) != null) {
  920. // continue;
  921. // }
  922. // mediaListMap.put(stream, json);
  923. // // 处理正在观看的国标设备
  924. // List<SsrcTransaction> ssrcTransactions = streamSession.getSsrcTransactionForAll(null, null, null, stream);
  925. // if (ssrcTransactions.size() > 0) {
  926. // for (SsrcTransaction ssrcTransaction : ssrcTransactions) {
  927. // if(ssrcTransaction.getMediaServerId().equals(mediaServerId)) {
  928. // cmder.streamByeCmd(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(),
  929. // ssrcTransaction.getStream(), null);
  930. // }
  931. // }
  932. // }
  933. // }
  934. // }
  935. // if (mediaListMap.size() > 0 ) {
  936. // // 处理正在向上推流的上级平台
  937. // List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServer(null);
  938. // if (sendRtpItems.size() > 0) {
  939. // for (SendRtpItem sendRtpItem : sendRtpItems) {
  940. // if (sendRtpItem.getMediaServerId().equals(mediaServerId)) {
  941. // if (mediaListMap.get(sendRtpItem.getStreamId()) == null) {
  942. // ParentPlatform platform = storager.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
  943. // sipCommanderFroPlatform.streamByeCmd(platform, sendRtpItem.getCallId());
  944. // }
  945. // }
  946. // }
  947. // }
  948. // }
  949. // }
  950. // }
  951. // }));
  952. }
  953. @Override
  954. public void pauseRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException {
  955. InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, streamId);
  956. if (null == inviteInfo || inviteInfo.getStreamInfo() == null) {
  957. logger.warn("streamId不存在!");
  958. throw new ServiceException("streamId不存在");
  959. }
  960. inviteInfo.getStreamInfo().setPause(true);
  961. inviteStreamService.updateInviteInfo(inviteInfo);
  962. MediaServerItem mediaServerItem = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId());
  963. if (null == mediaServerItem) {
  964. logger.warn("mediaServer 不存在!");
  965. throw new ServiceException("mediaServer不存在");
  966. }
  967. // zlm 暂停RTP超时检查
  968. JSONObject jsonObject = zlmresTfulUtils.pauseRtpCheck(mediaServerItem, streamId);
  969. if (jsonObject == null || jsonObject.getInteger("code") != 0) {
  970. throw new ServiceException("暂停RTP接收失败");
  971. }
  972. Device device = storager.queryVideoDevice(inviteInfo.getDeviceId());
  973. cmder.playPauseCmd(device, inviteInfo.getStreamInfo());
  974. }
  975. @Override
  976. public void resumeRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException {
  977. InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, streamId);
  978. if (null == inviteInfo || inviteInfo.getStreamInfo() == null) {
  979. logger.warn("streamId不存在!");
  980. throw new ServiceException("streamId不存在");
  981. }
  982. inviteInfo.getStreamInfo().setPause(false);
  983. inviteStreamService.updateInviteInfo(inviteInfo);
  984. MediaServerItem mediaServerItem = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId());
  985. if (null == mediaServerItem) {
  986. logger.warn("mediaServer 不存在!");
  987. throw new ServiceException("mediaServer不存在");
  988. }
  989. // zlm 暂停RTP超时检查
  990. JSONObject jsonObject = zlmresTfulUtils.resumeRtpCheck(mediaServerItem, streamId);
  991. if (jsonObject == null || jsonObject.getInteger("code") != 0) {
  992. throw new ServiceException("继续RTP接收失败");
  993. }
  994. Device device = storager.queryVideoDevice(inviteInfo.getDeviceId());
  995. cmder.playResumeCmd(device, inviteInfo.getStreamInfo());
  996. }
  997. @Override
  998. public void getSnap(String deviceId, String channelId, String fileName, ErrorCallback errorCallback) {
  999. Device device = deviceService.getDevice(deviceId);
  1000. if (device == null) {
  1001. errorCallback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(), InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(), null);
  1002. return;
  1003. }
  1004. InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
  1005. if (inviteInfo != null) {
  1006. if (inviteInfo.getStreamInfo() != null) {
  1007. // 已存在线直接截图
  1008. MediaServerItem mediaServerItemInuse = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId());
  1009. String streamUrl;
  1010. if (mediaServerItemInuse.getRtspPort() != 0) {
  1011. streamUrl = String.format("rtsp://127.0.0.1:%s/%s/%s", mediaServerItemInuse.getRtspPort(), "rtp", inviteInfo.getStreamInfo().getStream());
  1012. }else {
  1013. streamUrl = String.format("http://127.0.0.1:%s/%s/%s.live.mp4", mediaServerItemInuse.getHttpPort(), "rtp", inviteInfo.getStreamInfo().getStream());
  1014. }
  1015. String path = "snap";
  1016. // 请求截图
  1017. logger.info("[请求截图]: " + fileName);
  1018. zlmresTfulUtils.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName);
  1019. File snapFile = new File(path + File.separator + fileName);
  1020. if (snapFile.exists()) {
  1021. errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), snapFile.getAbsoluteFile());
  1022. }else {
  1023. errorCallback.run(InviteErrorCode.FAIL.getCode(), InviteErrorCode.FAIL.getMsg(), null);
  1024. }
  1025. return;
  1026. }
  1027. }
  1028. MediaServerItem newMediaServerItem = getNewMediaServerItem(device);
  1029. play(newMediaServerItem, deviceId, channelId, null, (code, msg, data)->{
  1030. if (code == InviteErrorCode.SUCCESS.getCode()) {
  1031. InviteInfo inviteInfoForPlay = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
  1032. if (inviteInfoForPlay != null && inviteInfoForPlay.getStreamInfo() != null) {
  1033. getSnap(deviceId, channelId, fileName, errorCallback);
  1034. }else {
  1035. errorCallback.run(InviteErrorCode.FAIL.getCode(), InviteErrorCode.FAIL.getMsg(), null);
  1036. }
  1037. }else {
  1038. errorCallback.run(InviteErrorCode.FAIL.getCode(), InviteErrorCode.FAIL.getMsg(), null);
  1039. }
  1040. });
  1041. }
  1042. }