PlayServiceImpl.java 82 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356
  1. package com.genersoft.iot.vmp.service.impl;
  2. import com.alibaba.fastjson2.JSONArray;
  3. import com.alibaba.fastjson2.JSONObject;
  4. import com.genersoft.iot.vmp.common.InviteInfo;
  5. import com.genersoft.iot.vmp.common.InviteSessionStatus;
  6. import com.genersoft.iot.vmp.common.InviteSessionType;
  7. import com.genersoft.iot.vmp.common.StreamInfo;
  8. import com.genersoft.iot.vmp.conf.DynamicTask;
  9. import com.genersoft.iot.vmp.conf.UserSetting;
  10. import com.genersoft.iot.vmp.conf.exception.ControllerException;
  11. import com.genersoft.iot.vmp.conf.exception.ServiceException;
  12. import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
  13. import com.genersoft.iot.vmp.gb28181.bean.*;
  14. import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
  15. import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
  16. import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
  17. import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
  18. import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
  19. import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
  20. import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
  21. import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
  22. import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
  23. import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
  24. import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
  25. import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
  26. import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
  27. import com.genersoft.iot.vmp.service.*;
  28. import com.genersoft.iot.vmp.service.bean.ErrorCallback;
  29. import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
  30. import com.genersoft.iot.vmp.service.bean.SSRCInfo;
  31. import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
  32. import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
  33. import com.genersoft.iot.vmp.utils.DateUtil;
  34. import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
  35. import org.slf4j.Logger;
  36. import org.slf4j.LoggerFactory;
  37. import org.springframework.beans.factory.annotation.Autowired;
  38. import org.springframework.data.redis.core.RedisTemplate;
  39. import org.springframework.stereotype.Service;
  40. import org.springframework.util.ObjectUtils;
  41. import javax.sdp.*;
  42. import javax.sip.InvalidArgumentException;
  43. import javax.sip.ResponseEvent;
  44. import javax.sip.SipException;
  45. import java.io.File;
  46. import java.math.BigDecimal;
  47. import java.math.RoundingMode;
  48. import java.text.ParseException;
  49. import java.util.List;
  50. import java.util.UUID;
  51. import java.util.Vector;
  52. @SuppressWarnings(value = {"rawtypes", "unchecked"})
  53. @Service
  54. public class PlayServiceImpl implements IPlayService {
  // Class-wide SLF4J logger.
  private final static Logger logger = LoggerFactory.getLogger(PlayServiceImpl.class);

  // Channel storage; used here for queryChannel / startPlay / stopPlay.
  @Autowired
  private IVideoManagerStorage storager;

  // SIP command sender towards devices; used here for playStreamCmd and streamByeCmd.
  @Autowired
  private SIPCommander cmder;

  // NOTE(review): not referenced in the section reviewed here; usage is elsewhere in the class.
  @Autowired
  private SIPCommanderFroPlatform sipCommanderFroPlatform;

  // Redis-backed cache; used here to look up the Device by id.
  @Autowired
  private IRedisCatchStorage redisCatchStorage;

  // Invite session bookkeeping: lookup/update/removal of InviteInfo and the once()/call() listener mechanism.
  @Autowired
  private IInviteStreamService inviteStreamService;

  // NOTE(review): not referenced in the section reviewed here; usage is elsewhere in the class.
  @Autowired
  private DeferredResultHolder resultHolder;

  // ZLM REST client; used here for getSnap and connectRtpServer.
  @Autowired
  private ZLMRESTfulUtils zlmresTfulUtils;

  // Used here to ask whether a stream is ready on a media server (isStreamReady).
  @Autowired
  private ZLMRTPServerFactory zlmrtpServerFactory;

  // NOTE(review): not referenced in the section reviewed here; usage is elsewhere in the class.
  @Autowired
  private AssistRESTfulUtils assistRESTfulUtils;

  // NOTE(review): not referenced in the section reviewed here; usage is elsewhere in the class.
  @Autowired
  private IMediaService mediaService;

  // Media server operations used here: getOne, openRTPServer, closeRTPServer, releaseSsrc, updateRtpServerSSRC.
  @Autowired
  private IMediaServerService mediaServerService;

  // Stream session registry; entries are removed by (deviceId, channelId, stream) on failure paths.
  @Autowired
  private VideoStreamSessionManager streamSession;

  // NOTE(review): not referenced in the section reviewed here; usage is elsewhere in the class.
  @Autowired
  private IDeviceService deviceService;

  // Supplies the play timeout (getPlayTimeout) passed to the delayed timeout task.
  @Autowired
  private UserSetting userSetting;

  // Delayed-task scheduler; used here to start and stop the play timeout task by key.
  @Autowired
  private DynamicTask dynamicTask;

  // ZLM hook subscription registry; used here to add/remove on_stream_changed subscriptions.
  @Autowired
  private ZlmHttpHookSubscribe subscribe;

  // SSRC pool; used here for checkSsrc and releaseSsrc when a device answers with its own SSRC.
  @Autowired
  private SSRCFactory ssrcFactory;

  // NOTE(review): not referenced in the section reviewed here; usage is elsewhere in the class.
  @Autowired
  private RedisTemplate<Object, Object> redisTemplate;
  92. @Override
  93. public SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId,boolean isSubStream, ErrorCallback<Object> callback) {
  94. if (mediaServerItem == null) {
  95. throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm");
  96. }
  97. Device device = redisCatchStorage.getDevice(deviceId);
  98. InviteInfo inviteInfo;
  99. if(device.isSwitchPrimarySubStream()){
  100. inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId,isSubStream);
  101. }else {
  102. inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
  103. }
  104. if (inviteInfo != null ) {
  105. if (inviteInfo.getStreamInfo() == null) {
  106. // 点播发起了但是尚未成功, 仅注册回调等待结果即可
  107. if(device.isSwitchPrimarySubStream()){
  108. inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId,isSubStream, null, callback);
  109. }else {
  110. inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId, null, callback);
  111. }
  112. return inviteInfo.getSsrcInfo();
  113. }else {
  114. StreamInfo streamInfo = inviteInfo.getStreamInfo();
  115. String streamId = streamInfo.getStream();
  116. if (streamId == null) {
  117. callback.run(InviteErrorCode.ERROR_FOR_CATCH_DATA.getCode(), "点播失败, redis缓存streamId等于null", null);
  118. if(device.isSwitchPrimarySubStream()){
  119. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream, null,
  120. InviteErrorCode.ERROR_FOR_CATCH_DATA.getCode(),
  121. "点播失败, redis缓存streamId等于null",
  122. null);
  123. }else {
  124. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  125. InviteErrorCode.ERROR_FOR_CATCH_DATA.getCode(),
  126. "点播失败, redis缓存streamId等于null",
  127. null);
  128. }
  129. return inviteInfo.getSsrcInfo();
  130. }
  131. String mediaServerId = streamInfo.getMediaServerId();
  132. MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
  133. Boolean ready = zlmrtpServerFactory.isStreamReady(mediaInfo, "rtp", streamId);
  134. if (ready != null && ready) {
  135. callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
  136. if(device.isSwitchPrimarySubStream()){
  137. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream, null,
  138. InviteErrorCode.SUCCESS.getCode(),
  139. InviteErrorCode.SUCCESS.getMsg(),
  140. streamInfo);
  141. }else {
  142. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  143. InviteErrorCode.SUCCESS.getCode(),
  144. InviteErrorCode.SUCCESS.getMsg(),
  145. streamInfo);
  146. }
  147. return inviteInfo.getSsrcInfo();
  148. }else {
  149. // 点播发起了但是尚未成功, 仅注册回调等待结果即可
  150. if(device.isSwitchPrimarySubStream()) {
  151. inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId, null, callback);
  152. storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
  153. inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
  154. }else {
  155. inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId,isSubStream, null, callback);
  156. inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId,isSubStream);
  157. }
  158. }
  159. }
  160. }
  161. String streamId = null;
  162. if (mediaServerItem.isRtpEnable()) {
  163. if(device.isSwitchPrimarySubStream()){
  164. streamId = StreamInfo.getPlayStream(deviceId, channelId, isSubStream);
  165. }else {
  166. streamId = String.format("%s_%s", device.getDeviceId(), channelId);
  167. }
  168. }
  169. SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, device.isSsrcCheck(), false, 0, false, device.getStreamModeForParam());
  170. if (ssrcInfo == null) {
  171. callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(), null);
  172. if(device.isSwitchPrimarySubStream()){
  173. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream, null,
  174. InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(),
  175. InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(),
  176. null);
  177. }else {
  178. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  179. InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(),
  180. InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(),
  181. null);
  182. }
  183. return null;
  184. }
  185. // TODO 记录点播的状态
  186. play(mediaServerItem, ssrcInfo, device, channelId,isSubStream, callback);
  187. return ssrcInfo;
  188. }
  189. @Override
  190. public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,boolean isSubStream,
  191. ErrorCallback<Object> callback) {
  192. if (mediaServerItem == null || ssrcInfo == null) {
  193. callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
  194. InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(),
  195. null);
  196. return;
  197. }
  198. if( device.isSwitchPrimarySubStream() ){
  199. logger.info("[点播开始] deviceId: {}, channelId: {},码流类型:{},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId,isSubStream ? "辅码流" : "主码流", ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
  200. }else {
  201. logger.info("[点播开始] deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
  202. }
  203. //端口获取失败的ssrcInfo 没有必要发送点播指令
  204. if (ssrcInfo.getPort() <= 0) {
  205. logger.info("[点播端口分配异常],deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channelId, ssrcInfo);
  206. // 释放ssrc
  207. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  208. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  209. callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "点播端口分配异常", null);
  210. if(device.isSwitchPrimarySubStream()){
  211. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream, null,
  212. InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "点播端口分配异常", null);
  213. }else {
  214. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  215. InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "点播端口分配异常", null);
  216. }
  217. return;
  218. }
  219. // 初始化redis中的invite消息状态
  220. InviteInfo inviteInfo;
  221. if(device.isSwitchPrimarySubStream()){
  222. // 初始化redis中的invite消息状态
  223. inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channelId,isSubStream, ssrcInfo.getStream(), ssrcInfo,
  224. mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAY,
  225. InviteSessionStatus.ready);
  226. inviteStreamService.updateInviteInfoSub(inviteInfo);
  227. }else {
  228. // 初始化redis中的invite消息状态
  229. inviteInfo = InviteInfo.getinviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo,
  230. mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAY,
  231. InviteSessionStatus.ready);
  232. inviteStreamService.updateInviteInfo(inviteInfo);
  233. }
  234. // 超时处理
  235. String timeOutTaskKey = UUID.randomUUID().toString();
  236. dynamicTask.startDelay(timeOutTaskKey, () -> {
  237. // 执行超时任务时查询是否已经成功,成功了则不执行超时任务,防止超时任务取消失败的情况
  238. InviteInfo inviteInfoForTimeOut;
  239. if(device.isSwitchPrimarySubStream()){
  240. // 初始化redis中的invite消息状态
  241. inviteInfoForTimeOut = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream);
  242. }else {
  243. // 初始化redis中的invite消息状态
  244. inviteInfoForTimeOut = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
  245. }
  246. if (inviteInfoForTimeOut == null || inviteInfoForTimeOut.getStreamInfo() == null) {
  247. if( device.isSwitchPrimarySubStream()){
  248. logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},码流类型:{},端口:{}, SSRC: {}", device.getDeviceId(), channelId,isSubStream ? "辅码流" : "主码流", ssrcInfo.getPort(), ssrcInfo.getSsrc());
  249. }else {
  250. logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", device.getDeviceId(), channelId, ssrcInfo.getPort(), ssrcInfo.getSsrc());
  251. }
  252. // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  253. // InviteInfo inviteInfoForTimeout = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.play, device.getDeviceId(), channelId);
  254. // if (inviteInfoForTimeout == null) {
  255. // return;
  256. // }
  257. // if (InviteSessionStatus.ok == inviteInfoForTimeout.getStatus() ) {
  258. // // TODO 发送bye
  259. // }else {
  260. // // TODO 发送cancel
  261. // }
  262. callback.run(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null);
  263. if( device.isSwitchPrimarySubStream()){
  264. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream, null,
  265. InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null);
  266. inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream);
  267. }else {
  268. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  269. InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null);
  270. inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
  271. }
  272. try {
  273. cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null);
  274. } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
  275. logger.error("[点播超时], 发送BYE失败 {}", e.getMessage());
  276. } finally {
  277. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  278. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  279. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  280. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  281. // 取消订阅消息监听
  282. HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
  283. subscribe.removeSubscribe(hookSubscribe);
  284. }
  285. }
  286. }, userSetting.getPlayTimeout());
  287. try {
  288. cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId,isSubStream, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
  289. logger.info("收到订阅消息: " + response.toJSONString());
  290. dynamicTask.stop(timeOutTaskKey);
  291. // hook响应
  292. StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId,isSubStream);
  293. if (streamInfo == null){
  294. callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
  295. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
  296. if( device.isSwitchPrimarySubStream()){
  297. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream, null,
  298. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
  299. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
  300. }else {
  301. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  302. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
  303. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
  304. }
  305. return;
  306. }
  307. callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
  308. if( device.isSwitchPrimarySubStream()){
  309. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream, null,
  310. InviteErrorCode.SUCCESS.getCode(),
  311. InviteErrorCode.SUCCESS.getMsg(),
  312. streamInfo);
  313. }else {
  314. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  315. InviteErrorCode.SUCCESS.getCode(),
  316. InviteErrorCode.SUCCESS.getMsg(),
  317. streamInfo);
  318. }
  319. if( device.isSwitchPrimarySubStream() ){
  320. logger.info("[点播成功] deviceId: {}, channelId: {},码流类型:{}", device.getDeviceId(), channelId,isSubStream ? "辅码流" : "主码流");
  321. }else {
  322. logger.info("[点播成功] deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
  323. }
  324. String streamUrl;
  325. if (mediaServerItemInuse.getRtspPort() != 0) {
  326. streamUrl = String.format("rtsp://127.0.0.1:%s/%s/%s", mediaServerItemInuse.getRtspPort(), "rtp", ssrcInfo.getStream());
  327. }else {
  328. streamUrl = String.format("http://127.0.0.1:%s/%s/%s.live.mp4", mediaServerItemInuse.getHttpPort(), "rtp", ssrcInfo.getStream());
  329. }
  330. String path = "snap";
  331. String fileName = device.getDeviceId() + "_" + channelId + ".jpg";
  332. // 请求截图
  333. logger.info("[请求截图]: " + fileName);
  334. zlmresTfulUtils.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName);
  335. }, (event) -> {
  336. inviteInfo.setStatus(InviteSessionStatus.ok);
  337. ResponseEvent responseEvent = (ResponseEvent) event.event;
  338. String contentString = new String(responseEvent.getResponse().getRawContent());
  339. // 获取ssrc
  340. int ssrcIndex = contentString.indexOf("y=");
  341. // 检查是否有y字段
  342. if (ssrcIndex >= 0) {
  343. //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容
  344. String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12).trim();
  345. // 查询到ssrc不一致且开启了ssrc校验则需要针对处理
  346. if (ssrcInfo.getSsrc().equals(ssrcInResponse)) {
  347. if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) {
  348. String substring = contentString.substring(0, contentString.indexOf("y="));
  349. try {
  350. SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring);
  351. int port = -1;
  352. Vector mediaDescriptions = sdp.getMediaDescriptions(true);
  353. for (Object description : mediaDescriptions) {
  354. MediaDescription mediaDescription = (MediaDescription) description;
  355. Media media = mediaDescription.getMedia();
  356. Vector mediaFormats = media.getMediaFormats(false);
  357. if (mediaFormats.contains("96")) {
  358. port = media.getMediaPort();
  359. break;
  360. }
  361. }
  362. logger.info("[点播-TCP主动连接对方] deviceId: {}, channelId: {}, 连接对方的地址:{}:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, sdp.getConnection().getAddress(), port, device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
  363. JSONObject jsonObject = zlmresTfulUtils.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getStream());
  364. logger.info("[点播-TCP主动连接对方] 结果: {}", jsonObject);
  365. } catch (SdpException e) {
  366. logger.error("[点播-TCP主动连接对方] deviceId: {}, channelId: {}, 解析200OK的SDP信息失败", device.getDeviceId(), channelId, e);
  367. dynamicTask.stop(timeOutTaskKey);
  368. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  369. // 释放ssrc
  370. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  371. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  372. callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
  373. InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
  374. if(device.isSwitchPrimarySubStream()){
  375. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream, null,
  376. InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
  377. InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
  378. }else {
  379. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  380. InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
  381. InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
  382. }
  383. }
  384. }
  385. return;
  386. }
  387. logger.info("[点播消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
  388. if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {
  389. logger.info("[点播消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
  390. if (!ssrcFactory.checkSsrc(mediaServerItem.getId(),ssrcInResponse)) {
  391. // ssrc 不可用
  392. logger.info("[点播消息] SSRC修正时发现ssrc不可使用 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
  393. // 释放ssrc
  394. ssrcFactory.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  395. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  396. callback.run(InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getCode(),
  397. InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getMsg(), null);
  398. if( device.isSwitchPrimarySubStream()){
  399. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream, null,
  400. InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getCode(),
  401. InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getMsg(), null);
  402. }else {
  403. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  404. InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getCode(),
  405. InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getMsg(), null);
  406. }
  407. return;
  408. }
  409. // 单端口模式streamId也有变化,重新设置监听即可
  410. if (!mediaServerItem.isRtpEnable()) {
  411. // 添加订阅
  412. HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
  413. subscribe.removeSubscribe(hookSubscribe);
  414. String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase();
  415. hookSubscribe.getContent().put("stream", stream);
  416. inviteInfo.setStream(stream);
  417. subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
  418. logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString());
  419. dynamicTask.stop(timeOutTaskKey);
  420. // hook响应
  421. StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId,isSubStream);
  422. if (streamInfo == null){
  423. callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
  424. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
  425. if( device.isSwitchPrimarySubStream()){
  426. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream, null,
  427. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
  428. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
  429. }else {
  430. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  431. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
  432. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
  433. }
  434. return;
  435. }
  436. callback.run(InviteErrorCode.SUCCESS.getCode(),
  437. InviteErrorCode.SUCCESS.getMsg(), streamInfo);
  438. if( device.isSwitchPrimarySubStream()){
  439. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream, null,
  440. InviteErrorCode.SUCCESS.getCode(),
  441. InviteErrorCode.SUCCESS.getMsg(),
  442. streamInfo);
  443. }else {
  444. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  445. InviteErrorCode.SUCCESS.getCode(),
  446. InviteErrorCode.SUCCESS.getMsg(),
  447. streamInfo);
  448. }
  449. });
  450. return;
  451. }
  452. // 更新ssrc
  453. Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse);
  454. if (!result) {
  455. try {
  456. logger.warn("[点播] 更新ssrc失败,停止点播 {}/{}", device.getDeviceId(), channelId);
  457. cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null, null);
  458. } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
  459. logger.error("[命令发送失败] 停止点播, 发送BYE: {}", e.getMessage());
  460. }
  461. dynamicTask.stop(timeOutTaskKey);
  462. // 释放ssrc
  463. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  464. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  465. callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
  466. "下级自定义了ssrc,重新设置收流信息失败", null);
  467. if( device.isSwitchPrimarySubStream()){
  468. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream, null,
  469. InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
  470. "下级自定义了ssrc,重新设置收流信息失败", null);
  471. }else {
  472. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  473. InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
  474. "下级自定义了ssrc,重新设置收流信息失败", null);
  475. }
  476. }else {
  477. ssrcInfo.setSsrc(ssrcInResponse);
  478. inviteInfo.setSsrcInfo(ssrcInfo);
  479. inviteInfo.setStream(ssrcInfo.getStream());
  480. }
  481. }else {
  482. logger.info("[点播消息] 收到invite 200, 下级自定义了ssrc, 但是当前模式无需修正");
  483. }
  484. }
  485. if(device.isSwitchPrimarySubStream()){
  486. inviteStreamService.updateInviteInfoSub(inviteInfo);
  487. }else {
  488. inviteStreamService.updateInviteInfo(inviteInfo);
  489. }
  490. }, (event) -> {
  491. dynamicTask.stop(timeOutTaskKey);
  492. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  493. // 释放ssrc
  494. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  495. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  496. callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_ERROR.getCode(),
  497. String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
  498. if( device.isSwitchPrimarySubStream()){
  499. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream, null,
  500. InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
  501. String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
  502. inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream);
  503. }else {
  504. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  505. InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
  506. String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
  507. inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
  508. }
  509. });
  510. } catch (InvalidArgumentException | SipException | ParseException e) {
  511. logger.error("[命令发送失败] 点播消息: {}", e.getMessage());
  512. dynamicTask.stop(timeOutTaskKey);
  513. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  514. // 释放ssrc
  515. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  516. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  517. callback.run(InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(),
  518. InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null);
  519. if( device.isSwitchPrimarySubStream()){
  520. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream, null,
  521. InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(),
  522. InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null);
  523. inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream);
  524. }else {
  525. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  526. InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(),
  527. InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null);
  528. inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
  529. }
  530. }
  531. }
  532. private StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId,boolean isSubStream) {
  533. StreamInfo streamInfo = null;
  534. Device device = redisCatchStorage.getDevice(deviceId);
  535. if( device.isSwitchPrimarySubStream() ){
  536. streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId,isSubStream);
  537. }else {
  538. streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
  539. }
  540. if (streamInfo != null) {
  541. InviteInfo inviteInfo;
  542. if(device.isSwitchPrimarySubStream()){
  543. inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId,isSubStream);
  544. }else {
  545. DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
  546. if (deviceChannel != null) {
  547. deviceChannel.setStreamId(streamInfo.getStream());
  548. storager.startPlay(deviceId, channelId, streamInfo.getStream());
  549. }
  550. inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
  551. }
  552. if (inviteInfo != null) {
  553. inviteInfo.setStatus(InviteSessionStatus.ok);
  554. inviteInfo.setStreamInfo(streamInfo);
  555. if(device.isSwitchPrimarySubStream()){
  556. inviteStreamService.updateInviteInfoSub(inviteInfo);
  557. }else {
  558. inviteStreamService.updateInviteInfo(inviteInfo);
  559. }
  560. }
  561. }
  562. return streamInfo;
  563. }
  564. private StreamInfo onPublishHandlerForPlayback(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String startTime, String endTime) {
  565. StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
  566. if (streamInfo != null) {
  567. streamInfo.setStartTime(startTime);
  568. streamInfo.setEndTime(endTime);
  569. DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
  570. if (deviceChannel != null) {
  571. deviceChannel.setStreamId(streamInfo.getStream());
  572. storager.startPlay(deviceId, channelId, streamInfo.getStream());
  573. }
  574. InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAYBACK, deviceId, channelId);
  575. if (inviteInfo != null) {
  576. inviteInfo.setStatus(InviteSessionStatus.ok);
  577. inviteInfo.setStreamInfo(streamInfo);
  578. inviteStreamService.updateInviteInfo(inviteInfo);
  579. }
  580. }
  581. return streamInfo;
  582. }
  583. @Override
  584. public MediaServerItem getNewMediaServerItem(Device device) {
  585. if (device == null) {
  586. return null;
  587. }
  588. MediaServerItem mediaServerItem;
  589. if (ObjectUtils.isEmpty(device.getMediaServerId()) || "auto".equals(device.getMediaServerId())) {
  590. mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null);
  591. } else {
  592. mediaServerItem = mediaServerService.getOne(device.getMediaServerId());
  593. }
  594. if (mediaServerItem == null) {
  595. logger.warn("点播时未找到可使用的ZLM...");
  596. }
  597. return mediaServerItem;
  598. }
  599. @Override
  600. public MediaServerItem getNewMediaServerItemHasAssist(Device device) {
  601. if (device == null) {
  602. return null;
  603. }
  604. MediaServerItem mediaServerItem;
  605. if (ObjectUtils.isEmpty(device.getMediaServerId()) || "auto".equals(device.getMediaServerId())) {
  606. mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(true);
  607. } else {
  608. mediaServerItem = mediaServerService.getOne(device.getMediaServerId());
  609. }
  610. if (mediaServerItem == null) {
  611. logger.warn("[获取可用的ZLM节点]未找到可使用的ZLM...");
  612. }
  613. return mediaServerItem;
  614. }
  615. @Override
  616. public void playBack(String deviceId, String channelId, String startTime,
  617. String endTime, ErrorCallback<Object> callback) {
  618. Device device = storager.queryVideoDevice(deviceId);
  619. if (device == null) {
  620. return;
  621. }
  622. MediaServerItem newMediaServerItem = getNewMediaServerItem(device);
  623. SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam());
  624. playBack(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, callback);
  625. }
  626. @Override
  627. public void playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,
  628. String deviceId, String channelId, String startTime,
  629. String endTime, ErrorCallback<Object> callback) {
  630. if (mediaServerItem == null || ssrcInfo == null) {
  631. callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
  632. InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(),
  633. null);
  634. return;
  635. }
  636. Device device = storager.queryVideoDevice(deviceId);
  637. if (device == null) {
  638. throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备: " + deviceId + "不存在");
  639. }
  640. logger.info("[录像回放] deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}, 收流端口:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}",
  641. device.getDeviceId(), channelId, startTime, endTime, ssrcInfo.getPort(), device.getStreamMode(),
  642. ssrcInfo.getSsrc(), device.isSsrcCheck());
  643. // 初始化redis中的invite消息状态
  644. InviteInfo inviteInfo = InviteInfo.getinviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo,
  645. mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAYBACK,
  646. InviteSessionStatus.ready);
  647. inviteStreamService.updateInviteInfo(inviteInfo);
  648. String playBackTimeOutTaskKey = UUID.randomUUID().toString();
  649. dynamicTask.startDelay(playBackTimeOutTaskKey, () -> {
  650. logger.warn("[录像回放] 超时,deviceId:{} ,channelId:{}", deviceId, channelId);
  651. inviteStreamService.removeInviteInfo(inviteInfo);
  652. callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null);
  653. try {
  654. cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null);
  655. } catch (InvalidArgumentException | ParseException | SipException e) {
  656. logger.error("[录像回放] 超时 发送BYE失败 {}", e.getMessage());
  657. } catch (SsrcTransactionNotFoundException e) {
  658. // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  659. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  660. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  661. streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
  662. }
  663. }, userSetting.getPlayTimeout());
  664. SipSubscribe.Event errorEvent = event -> {
  665. logger.info("[录像回放] 失败,{} {}", event.statusCode, event.msg);
  666. dynamicTask.stop(playBackTimeOutTaskKey);
  667. callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_ERROR.getCode(),
  668. String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg), null);
  669. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  670. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  671. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  672. inviteStreamService.removeInviteInfo(inviteInfo);
  673. };
  674. ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, jsonObject) -> {
  675. logger.info("收到回放订阅消息: " + jsonObject);
  676. dynamicTask.stop(playBackTimeOutTaskKey);
  677. StreamInfo streamInfo = onPublishHandlerForPlayback(mediaServerItemInuse, jsonObject, deviceId, channelId, startTime, endTime);
  678. if (streamInfo == null) {
  679. logger.warn("设备回放API调用失败!");
  680. callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
  681. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
  682. return;
  683. }
  684. callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
  685. logger.info("[录像回放] 成功 deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}", device.getDeviceId(), channelId, startTime, endTime);
  686. };
  687. try {
  688. cmder.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime,
  689. hookEvent, eventResult -> {
  690. inviteInfo.setStatus(InviteSessionStatus.ok);
  691. ResponseEvent responseEvent = (ResponseEvent) eventResult.event;
  692. String contentString = new String(responseEvent.getResponse().getRawContent());
  693. // 获取ssrc
  694. int ssrcIndex = contentString.indexOf("y=");
  695. // 检查是否有y字段
  696. if (ssrcIndex >= 0) {
  697. //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容
  698. String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
  699. // 查询到ssrc不一致且开启了ssrc校验则需要针对处理
  700. if (ssrcInfo.getSsrc().equals(ssrcInResponse)) {
  701. if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) {
  702. String substring = contentString.substring(0, contentString.indexOf("y="));
  703. try {
  704. SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring);
  705. int port = -1;
  706. Vector mediaDescriptions = sdp.getMediaDescriptions(true);
  707. for (Object description : mediaDescriptions) {
  708. MediaDescription mediaDescription = (MediaDescription) description;
  709. Media media = mediaDescription.getMedia();
  710. Vector mediaFormats = media.getMediaFormats(false);
  711. if (mediaFormats.contains("96")) {
  712. port = media.getMediaPort();
  713. break;
  714. }
  715. }
  716. logger.info("[录像回放-TCP主动连接对方] deviceId: {}, channelId: {}, 连接对方的地址:{}:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, sdp.getConnection().getAddress(), port, device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
  717. JSONObject jsonObject = zlmresTfulUtils.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getStream());
  718. logger.info("[录像回放-TCP主动连接对方] 结果: {}", jsonObject);
  719. } catch (SdpException e) {
  720. logger.error("[录像回放-TCP主动连接对方] deviceId: {}, channelId: {}, 解析200OK的SDP信息失败", device.getDeviceId(), channelId, e);
  721. dynamicTask.stop(playBackTimeOutTaskKey);
  722. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  723. // 释放ssrc
  724. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  725. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  726. callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
  727. InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
  728. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  729. InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
  730. InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
  731. }
  732. }
  733. return;
  734. }
  735. logger.info("[录像回放] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
  736. if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {
  737. logger.info("[录像回放] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
  738. if (!ssrcFactory.checkSsrc(mediaServerItem.getId(),ssrcInResponse)) {
  739. // ssrc 不可用
  740. logger.info("[录像回放] SSRC修正时发现ssrc不可使用 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
  741. // 释放ssrc
  742. dynamicTask.stop(playBackTimeOutTaskKey);
  743. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  744. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  745. callback.run(InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getCode(),
  746. InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getMsg(), null);
  747. return;
  748. }
  749. // 单端口模式streamId也有变化,需要重新设置监听
  750. if (!mediaServerItem.isRtpEnable()) {
  751. // 添加订阅
  752. HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
  753. subscribe.removeSubscribe(hookSubscribe);
  754. String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase();
  755. hookSubscribe.getContent().put("stream", stream);
  756. inviteInfo.setStream(stream);
  757. subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
  758. logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString());
  759. dynamicTask.stop(playBackTimeOutTaskKey);
  760. // hook响应
  761. hookEvent.response(mediaServerItemInUse, response);
  762. });
  763. }
  764. // 更新ssrc
  765. Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse);
  766. if (!result) {
  767. try {
  768. logger.warn("[录像回放] 更新ssrc失败,停止录像回放 {}/{}", device.getDeviceId(), channelId);
  769. cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null, null);
  770. } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
  771. logger.error("[命令发送失败] 停止点播, 发送BYE: {}", e.getMessage());
  772. }
  773. dynamicTask.stop(playBackTimeOutTaskKey);
  774. // 释放ssrc
  775. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  776. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  777. callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
  778. "下级自定义了ssrc,重新设置收流信息失败", null);
  779. }else {
  780. ssrcInfo.setSsrc(ssrcInResponse);
  781. inviteInfo.setSsrcInfo(ssrcInfo);
  782. inviteInfo.setStream(ssrcInfo.getStream());
  783. }
  784. }else {
  785. logger.info("[点播消息] 收到invite 200, 下级自定义了ssrc, 但是当前模式无需修正");
  786. }
  787. }
  788. inviteStreamService.updateInviteInfo(inviteInfo);
  789. }, errorEvent);
  790. } catch (InvalidArgumentException | SipException | ParseException e) {
  791. logger.error("[命令发送失败] 回放: {}", e.getMessage());
  792. SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult();
  793. eventResult.type = SipSubscribe.EventResultType.cmdSendFailEvent;
  794. eventResult.statusCode = -1;
  795. eventResult.msg = "命令发送失败";
  796. errorEvent.response(eventResult);
  797. }
  798. }
  799. @Override
  800. public void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback<Object> callback) {
  801. Device device = storager.queryVideoDevice(deviceId);
  802. if (device == null) {
  803. return;
  804. }
  805. MediaServerItem newMediaServerItem = getNewMediaServerItemHasAssist(device);
  806. if (newMediaServerItem == null) {
  807. callback.run(InviteErrorCode.ERROR_FOR_ASSIST_NOT_READY.getCode(),
  808. InviteErrorCode.ERROR_FOR_ASSIST_NOT_READY.getMsg(),
  809. null);
  810. return;
  811. }
  812. SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam());
  813. download(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, downloadSpeed, callback);
  814. }
  815. @Override
  816. public void download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback<Object> callback) {
  817. if (mediaServerItem == null || ssrcInfo == null) {
  818. callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
  819. InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(),
  820. null);
  821. return;
  822. }
  823. Device device = storager.queryVideoDevice(deviceId);
  824. if (device == null) {
  825. callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
  826. "设备:" + deviceId + "不存在",
  827. null);
  828. return;
  829. }
  830. logger.info("[录像下载] deviceId: {}, channelId: {}, 下载速度:{}, 收流端口:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, downloadSpeed, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
  831. // 初始化redis中的invite消息状态
  832. InviteInfo inviteInfo = InviteInfo.getinviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo,
  833. mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.DOWNLOAD,
  834. InviteSessionStatus.ready);
  835. inviteStreamService.updateInviteInfo(inviteInfo);
  836. String downLoadTimeOutTaskKey = UUID.randomUUID().toString();
  837. dynamicTask.startDelay(downLoadTimeOutTaskKey, () -> {
  838. logger.warn(String.format("录像下载请求超时,deviceId:%s ,channelId:%s", deviceId, channelId));
  839. inviteStreamService.removeInviteInfo(inviteInfo);
  840. callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(),
  841. InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null);
  842. // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  843. try {
  844. cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null);
  845. } catch (InvalidArgumentException | ParseException | SipException e) {
  846. logger.error("[录像流]录像下载请求超时, 发送BYE失败 {}", e.getMessage());
  847. } catch (SsrcTransactionNotFoundException e) {
  848. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  849. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  850. streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
  851. }
  852. }, userSetting.getPlayTimeout());
  853. SipSubscribe.Event errorEvent = event -> {
  854. dynamicTask.stop(downLoadTimeOutTaskKey);
  855. callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(),
  856. String.format("录像下载失败, 错误码: %s, %s", event.statusCode, event.msg), null);
  857. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  858. inviteStreamService.removeInviteInfo(inviteInfo);
  859. };
  860. ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, jsonObject) -> {
  861. logger.info("[录像下载]收到订阅消息: " + jsonObject);
  862. dynamicTask.stop(downLoadTimeOutTaskKey);
  863. StreamInfo streamInfo = onPublishHandlerForDownload(mediaServerItemInuse, jsonObject, deviceId, channelId, startTime, endTime);
  864. if (streamInfo == null) {
  865. logger.warn("[录像下载] 获取流地址信息失败");
  866. callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
  867. InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
  868. return;
  869. }
  870. callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
  871. logger.info("[录像下载] 调用成功 deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}", device.getDeviceId(), channelId, startTime, endTime);
  872. };
  873. try {
  874. cmder.downloadStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, downloadSpeed,
  875. hookEvent, errorEvent, eventResult ->{
  876. inviteInfo.setStatus(InviteSessionStatus.ok);
  877. ResponseEvent responseEvent = (ResponseEvent) eventResult.event;
  878. String contentString = new String(responseEvent.getResponse().getRawContent());
  879. // 获取ssrc
  880. int ssrcIndex = contentString.indexOf("y=");
  881. // 检查是否有y字段
  882. if (ssrcIndex >= 0) {
  883. //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容
  884. String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
  885. // 查询到ssrc不一致且开启了ssrc校验则需要针对处理
  886. if (ssrcInfo.getSsrc().equals(ssrcInResponse)) {
  887. if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) {
  888. String substring = contentString.substring(0, contentString.indexOf("y="));
  889. try {
  890. SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring);
  891. int port = -1;
  892. Vector mediaDescriptions = sdp.getMediaDescriptions(true);
  893. for (Object description : mediaDescriptions) {
  894. MediaDescription mediaDescription = (MediaDescription) description;
  895. Media media = mediaDescription.getMedia();
  896. Vector mediaFormats = media.getMediaFormats(false);
  897. if (mediaFormats.contains("96")) {
  898. port = media.getMediaPort();
  899. break;
  900. }
  901. }
  902. logger.info("[录像下载-TCP主动连接对方] deviceId: {}, channelId: {}, 连接对方的地址:{}:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, sdp.getConnection().getAddress(), port, device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
  903. JSONObject jsonObject = zlmresTfulUtils.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getStream());
  904. logger.info("[录像下载-TCP主动连接对方] 结果: {}", jsonObject);
  905. } catch (SdpException e) {
  906. logger.error("[录像下载-TCP主动连接对方] deviceId: {}, channelId: {}, 解析200OK的SDP信息失败", device.getDeviceId(), channelId, e);
  907. dynamicTask.stop(downLoadTimeOutTaskKey);
  908. mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  909. // 释放ssrc
  910. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  911. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  912. callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
  913. InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
  914. inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  915. InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
  916. InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
  917. }
  918. }
  919. return;
  920. }
  921. logger.info("[录像下载] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
  922. if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {
  923. logger.info("[录像下载] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
  924. if (!ssrcFactory.checkSsrc(mediaServerItem.getId(),ssrcInResponse)) {
  925. // ssrc 不可用
  926. // 释放ssrc
  927. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  928. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  929. callback.run(InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getCode(),
  930. InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getMsg(), null);
  931. return;
  932. }
  933. // 单端口模式streamId也有变化,需要重新设置监听
  934. if (!mediaServerItem.isRtpEnable()) {
  935. // 添加订阅
  936. HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
  937. subscribe.removeSubscribe(hookSubscribe);
  938. hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
  939. subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
  940. logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString());
  941. dynamicTask.stop(downLoadTimeOutTaskKey);
  942. hookEvent.response(mediaServerItemInUse, response);
  943. });
  944. }
  945. // 更新ssrc
  946. Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse);
  947. if (!result) {
  948. try {
  949. logger.warn("[录像下载] 更新ssrc失败,停止录像回放 {}/{}", device.getDeviceId(), channelId);
  950. cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null, null);
  951. } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
  952. logger.error("[命令发送失败] 停止点播, 发送BYE: {}", e.getMessage());
  953. }
  954. dynamicTask.stop(downLoadTimeOutTaskKey);
  955. // 释放ssrc
  956. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  957. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  958. callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
  959. "下级自定义了ssrc,重新设置收流信息失败", null);
  960. }else {
  961. ssrcInfo.setSsrc(ssrcInResponse);
  962. inviteInfo.setSsrcInfo(ssrcInfo);
  963. inviteInfo.setStream(ssrcInfo.getStream());
  964. }
  965. }else {
  966. logger.info("[录像下载] 收到invite 200, 下级自定义了ssrc, 但是当前模式无需修正");
  967. }
  968. }
  969. });
  970. } catch (InvalidArgumentException | SipException | ParseException e) {
  971. logger.error("[命令发送失败] 录像下载: {}", e.getMessage());
  972. SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult();
  973. eventResult.type = SipSubscribe.EventResultType.cmdSendFailEvent;
  974. eventResult.statusCode = -1;
  975. eventResult.msg = "命令发送失败";
  976. errorEvent.response(eventResult);
  977. }
  978. }
  979. @Override
  980. public StreamInfo getDownLoadInfo(String deviceId, String channelId, String stream) {
  981. InviteInfo inviteInfo = inviteStreamService.getInviteInfo(InviteSessionType.DOWNLOAD, deviceId, channelId, stream);
  982. if (inviteInfo != null && inviteInfo.getStreamInfo() != null) {
  983. if (inviteInfo.getStreamInfo().getProgress() == 1) {
  984. return inviteInfo.getStreamInfo();
  985. }
  986. // 获取当前已下载时长
  987. String mediaServerId = inviteInfo.getStreamInfo().getMediaServerId();
  988. MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
  989. if (mediaServerItem == null) {
  990. logger.warn("查询录像信息时发现节点已离线");
  991. return null;
  992. }
  993. if (mediaServerItem.getRecordAssistPort() > 0) {
  994. JSONObject jsonObject = assistRESTfulUtils.fileDuration(mediaServerItem, inviteInfo.getStreamInfo().getApp(), inviteInfo.getStreamInfo().getStream(), null);
  995. if (jsonObject == null) {
  996. throw new ControllerException(ErrorCode.ERROR100.getCode(), "连接Assist服务失败");
  997. }
  998. if (jsonObject.getInteger("code") == 0) {
  999. long duration = jsonObject.getLong("data");
  1000. if (duration == 0) {
  1001. inviteInfo.getStreamInfo().setProgress(0);
  1002. } else {
  1003. String startTime = inviteInfo.getStreamInfo().getStartTime();
  1004. String endTime = inviteInfo.getStreamInfo().getEndTime();
  1005. long start = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime);
  1006. long end = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime);
  1007. BigDecimal currentCount = new BigDecimal(duration / 1000);
  1008. BigDecimal totalCount = new BigDecimal(end - start);
  1009. BigDecimal divide = currentCount.divide(totalCount, 2, RoundingMode.HALF_UP);
  1010. double process = divide.doubleValue();
  1011. inviteInfo.getStreamInfo().setProgress(process);
  1012. }
  1013. inviteStreamService.updateInviteInfo(inviteInfo);
  1014. }
  1015. }
  1016. return inviteInfo.getStreamInfo();
  1017. }
  1018. return null;
  1019. }
  1020. private StreamInfo onPublishHandlerForDownload(MediaServerItem mediaServerItemInuse, JSONObject response, String deviceId, String channelId, String startTime, String endTime) {
  1021. StreamInfo streamInfo = onPublishHandler(mediaServerItemInuse, response, deviceId, channelId);
  1022. if (streamInfo != null) {
  1023. streamInfo.setProgress(0);
  1024. streamInfo.setStartTime(startTime);
  1025. streamInfo.setEndTime(endTime);
  1026. InviteInfo inviteInfo = inviteStreamService.getInviteInfo(InviteSessionType.DOWNLOAD, deviceId, channelId, streamInfo.getStream());
  1027. if (inviteInfo != null) {
  1028. logger.info("[录像下载] 更新invite消息中的stream信息");
  1029. inviteInfo.setStatus(InviteSessionStatus.ok);
  1030. inviteInfo.setStreamInfo(streamInfo);
  1031. inviteStreamService.updateInviteInfo(inviteInfo);
  1032. }
  1033. }
  1034. return streamInfo;
  1035. }
  1036. public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId) {
  1037. String streamId = resonse.getString("stream");
  1038. JSONArray tracks = resonse.getJSONArray("tracks");
  1039. StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", streamId, tracks, null);
  1040. streamInfo.setDeviceID(deviceId);
  1041. streamInfo.setChannelId(channelId);
  1042. return streamInfo;
  1043. }
  1044. @Override
  1045. public void zlmServerOffline(String mediaServerId) {
  1046. // 处理正在向上推流的上级平台
  1047. List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServer(null);
  1048. if (sendRtpItems.size() > 0) {
  1049. for (SendRtpItem sendRtpItem : sendRtpItems) {
  1050. if (sendRtpItem.getMediaServerId().equals(mediaServerId)) {
  1051. ParentPlatform platform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
  1052. try {
  1053. sipCommanderFroPlatform.streamByeCmd(platform, sendRtpItem.getCallId());
  1054. } catch (SipException | InvalidArgumentException | ParseException e) {
  1055. logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
  1056. }
  1057. }
  1058. }
  1059. }
  1060. // 处理正在观看的国标设备
  1061. List<SsrcTransaction> allSsrc = streamSession.getAllSsrc();
  1062. if (allSsrc.size() > 0) {
  1063. for (SsrcTransaction ssrcTransaction : allSsrc) {
  1064. if (ssrcTransaction.getMediaServerId().equals(mediaServerId)) {
  1065. Device device = deviceService.getDevice(ssrcTransaction.getDeviceId());
  1066. if (device == null) {
  1067. continue;
  1068. }
  1069. try {
  1070. cmder.streamByeCmd(device, ssrcTransaction.getChannelId(),
  1071. ssrcTransaction.getStream(), null);
  1072. } catch (InvalidArgumentException | ParseException | SipException |
  1073. SsrcTransactionNotFoundException e) {
  1074. logger.error("[zlm离线]为正在使用此zlm的设备, 发送BYE失败 {}", e.getMessage());
  1075. }
  1076. }
  1077. }
  1078. }
  1079. }
  /**
   * Callback for a media server coming online.
   *
   * <p>Currently a no-op: the body contains only a TODO and a commented-out draft
   * implementation, so {@code mediaServerId} is not used yet.
   *
   * @param mediaServerId identifier of the media server; currently unused
   */
  @Override
  public void zlmServerOnline(String mediaServerId) {
      // TODO look up the earlier play requests; if a stream no longer exists, send BYE downstream
      // Draft implementation, kept for reference (not compiled):
      // MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
      // zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{
      //     Integer code = mediaList.getInteger("code");
      //     if (code == 0) {
      //         JSONArray data = mediaList.getJSONArray("data");
      //         if (data == null || data.size() == 0) {
      //             zlmServerOffline(mediaServerId);
      //         }else {
      //             Map<String, JSONObject> mediaListMap = new HashMap<>();
      //             for (int i = 0; i < data.size(); i++) {
      //                 JSONObject json = data.getJSONObject(i);
      //                 String app = json.getString("app");
      //                 if ("rtp".equals(app)) {
      //                     String stream = json.getString("stream");
      //                     if (mediaListMap.get(stream) != null) {
      //                         continue;
      //                     }
      //                     mediaListMap.put(stream, json);
      //                     // handle the GB devices that are currently being watched
      //                     List<SsrcTransaction> ssrcTransactions = streamSession.getSsrcTransactionForAll(null, null, null, stream);
      //                     if (ssrcTransactions.size() > 0) {
      //                         for (SsrcTransaction ssrcTransaction : ssrcTransactions) {
      //                             if(ssrcTransaction.getMediaServerId().equals(mediaServerId)) {
      //                                 cmder.streamByeCmd(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(),
      //                                         ssrcTransaction.getStream(), null);
      //                             }
      //                         }
      //                     }
      //                 }
      //             }
      //             if (mediaListMap.size() > 0 ) {
      //                 // handle the upstream platforms that streams are currently being pushed to
      //                 List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServer(null);
      //                 if (sendRtpItems.size() > 0) {
      //                     for (SendRtpItem sendRtpItem : sendRtpItems) {
      //                         if (sendRtpItem.getMediaServerId().equals(mediaServerId)) {
      //                             if (mediaListMap.get(sendRtpItem.getStreamId()) == null) {
      //                                 ParentPlatform platform = storager.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
      //                                 sipCommanderFroPlatform.streamByeCmd(platform, sendRtpItem.getCallId());
      //                             }
      //                         }
      //                     }
      //                 }
      //             }
      //         }
      //     }
      // }));
  }
  1131. @Override
  1132. public void pauseRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException {
  1133. InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, streamId);
  1134. if (null == inviteInfo || inviteInfo.getStreamInfo() == null) {
  1135. logger.warn("streamId不存在!");
  1136. throw new ServiceException("streamId不存在");
  1137. }
  1138. inviteInfo.getStreamInfo().setPause(true);
  1139. inviteStreamService.updateInviteInfo(inviteInfo);
  1140. MediaServerItem mediaServerItem = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId());
  1141. if (null == mediaServerItem) {
  1142. logger.warn("mediaServer 不存在!");
  1143. throw new ServiceException("mediaServer不存在");
  1144. }
  1145. // zlm 暂停RTP超时检查
  1146. JSONObject jsonObject = zlmresTfulUtils.pauseRtpCheck(mediaServerItem, streamId);
  1147. if (jsonObject == null || jsonObject.getInteger("code") != 0) {
  1148. throw new ServiceException("暂停RTP接收失败");
  1149. }
  1150. Device device = storager.queryVideoDevice(inviteInfo.getDeviceId());
  1151. cmder.playPauseCmd(device, inviteInfo.getStreamInfo());
  1152. }
  1153. @Override
  1154. public void resumeRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException {
  1155. InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, streamId);
  1156. if (null == inviteInfo || inviteInfo.getStreamInfo() == null) {
  1157. logger.warn("streamId不存在!");
  1158. throw new ServiceException("streamId不存在");
  1159. }
  1160. inviteInfo.getStreamInfo().setPause(false);
  1161. inviteStreamService.updateInviteInfo(inviteInfo);
  1162. MediaServerItem mediaServerItem = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId());
  1163. if (null == mediaServerItem) {
  1164. logger.warn("mediaServer 不存在!");
  1165. throw new ServiceException("mediaServer不存在");
  1166. }
  1167. // zlm 暂停RTP超时检查
  1168. JSONObject jsonObject = zlmresTfulUtils.resumeRtpCheck(mediaServerItem, streamId);
  1169. if (jsonObject == null || jsonObject.getInteger("code") != 0) {
  1170. throw new ServiceException("继续RTP接收失败");
  1171. }
  1172. Device device = storager.queryVideoDevice(inviteInfo.getDeviceId());
  1173. cmder.playResumeCmd(device, inviteInfo.getStreamInfo());
  1174. }
  1175. @Override
  1176. public void getSnap(String deviceId, String channelId, String fileName,boolean isSubStream, ErrorCallback errorCallback) {
  1177. Device device = deviceService.getDevice(deviceId);
  1178. if (device == null) {
  1179. errorCallback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(), InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(), null);
  1180. return;
  1181. }
  1182. InviteInfo inviteInfo;
  1183. if(device.isSwitchPrimarySubStream()){
  1184. inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId,isSubStream);
  1185. }else {
  1186. inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
  1187. }
  1188. if (inviteInfo != null) {
  1189. if (inviteInfo.getStreamInfo() != null) {
  1190. // 已存在线直接截图
  1191. MediaServerItem mediaServerItemInuse = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId());
  1192. String streamUrl;
  1193. if (mediaServerItemInuse.getRtspPort() != 0) {
  1194. streamUrl = String.format("rtsp://127.0.0.1:%s/%s/%s", mediaServerItemInuse.getRtspPort(), "rtp", inviteInfo.getStreamInfo().getStream());
  1195. }else {
  1196. streamUrl = String.format("http://127.0.0.1:%s/%s/%s.live.mp4", mediaServerItemInuse.getHttpPort(), "rtp", inviteInfo.getStreamInfo().getStream());
  1197. }
  1198. String path = "snap";
  1199. // 请求截图
  1200. logger.info("[请求截图]: " + fileName);
  1201. zlmresTfulUtils.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName);
  1202. File snapFile = new File(path + File.separator + fileName);
  1203. if (snapFile.exists()) {
  1204. errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), snapFile.getAbsoluteFile());
  1205. }else {
  1206. errorCallback.run(InviteErrorCode.FAIL.getCode(), InviteErrorCode.FAIL.getMsg(), null);
  1207. }
  1208. return;
  1209. }
  1210. }
  1211. MediaServerItem newMediaServerItem = getNewMediaServerItem(device);
  1212. play(newMediaServerItem, deviceId, channelId,isSubStream, (code, msg, data)->{
  1213. if (code == InviteErrorCode.SUCCESS.getCode()) {
  1214. InviteInfo inviteInfoForPlay = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
  1215. if (inviteInfoForPlay != null && inviteInfoForPlay.getStreamInfo() != null) {
  1216. getSnap(deviceId, channelId, fileName,isSubStream, errorCallback);
  1217. }else {
  1218. errorCallback.run(InviteErrorCode.FAIL.getCode(), InviteErrorCode.FAIL.getMsg(), null);
  1219. }
  1220. }else {
  1221. errorCallback.run(InviteErrorCode.FAIL.getCode(), InviteErrorCode.FAIL.getMsg(), null);
  1222. }
  1223. });
  1224. }
  1225. /*======================设备主子码流逻辑START=========================*/
  1226. public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId,boolean isSubStream) {
  1227. String streamId = resonse.getString("stream");
  1228. JSONArray tracks = resonse.getJSONArray("tracks");
  1229. StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", streamId, tracks, null);
  1230. streamInfo.setDeviceID(deviceId);
  1231. streamInfo.setChannelId(channelId);
  1232. streamInfo.setSubStream(isSubStream);
  1233. return streamInfo;
  1234. }
  1235. /*======================设备主子码流逻辑END=========================*/
  1236. }