MediaServiceImpl.java 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301
  1. package com.genersoft.iot.vmp.service.impl;
  2. import com.genersoft.iot.vmp.common.InviteInfo;
  3. import com.genersoft.iot.vmp.common.InviteSessionType;
  4. import com.genersoft.iot.vmp.common.VideoManagerConstants;
  5. import com.genersoft.iot.vmp.conf.UserSetting;
  6. import com.genersoft.iot.vmp.conf.exception.ControllerException;
  7. import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
  8. import com.genersoft.iot.vmp.gb28181.bean.*;
  9. import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
  10. import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
  11. import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
  12. import com.genersoft.iot.vmp.media.bean.MediaServer;
  13. import com.genersoft.iot.vmp.media.bean.ResultForOnPublish;
  14. import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
  15. import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
  16. import com.genersoft.iot.vmp.service.*;
  17. import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
  18. import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
  19. import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyService;
  20. import com.genersoft.iot.vmp.utils.DateUtil;
  21. import com.genersoft.iot.vmp.utils.MediaServerUtils;
  22. import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
  23. import com.genersoft.iot.vmp.vmanager.bean.OtherPsSendInfo;
  24. import com.genersoft.iot.vmp.vmanager.bean.OtherRtpSendInfo;
  25. import org.slf4j.Logger;
  26. import org.slf4j.LoggerFactory;
  27. import org.springframework.beans.factory.annotation.Autowired;
  28. import org.springframework.data.redis.core.RedisTemplate;
  29. import org.springframework.stereotype.Service;
  30. import javax.sip.InvalidArgumentException;
  31. import javax.sip.SipException;
  32. import java.text.ParseException;
  33. import java.util.List;
  34. import java.util.Map;
  35. @Service
  36. public class MediaServiceImpl implements IMediaService {
  37. private final static Logger logger = LoggerFactory.getLogger(MediaServiceImpl.class);
  38. @Autowired
  39. private IRedisCatchStorage redisCatchStorage;
  40. @Autowired
  41. private IStreamProxyService streamProxyService;
  42. @Autowired
  43. private UserSetting userSetting;
  44. @Autowired
  45. private RedisTemplate<Object, Object> redisTemplate;
  46. @Autowired
  47. private IUserService userService;
  48. @Autowired
  49. private IInviteStreamService inviteStreamService;
  50. @Autowired
  51. private VideoStreamSessionManager sessionManager;
  52. @Autowired
  53. private IVideoManagerStorage storager;
  54. @Autowired
  55. private IDeviceService deviceService;
  56. @Autowired
  57. private ISIPCommanderForPlatform commanderForPlatform;
  58. @Autowired
  59. private ISIPCommander commander;
  60. @Override
  61. public boolean authenticatePlay(String app, String stream, String callId) {
  62. if (app == null || stream == null) {
  63. return false;
  64. }
  65. if ("rtp".equals(app)) {
  66. return true;
  67. }
  68. StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(app, stream);
  69. if (streamAuthorityInfo == null || streamAuthorityInfo.getCallId() == null) {
  70. return true;
  71. }
  72. return streamAuthorityInfo.getCallId().equals(callId);
  73. }
  74. @Override
  75. public ResultForOnPublish authenticatePublish(MediaServer mediaServer, String app, String stream, String params) {
  76. // 推流鉴权的处理
  77. if (!"rtp".equals(app)) {
  78. StreamProxy streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(app, stream);
  79. if (streamProxyItem != null) {
  80. ResultForOnPublish result = new ResultForOnPublish();
  81. result.setEnable_audio(streamProxyItem.isEnableAudio());
  82. result.setEnable_mp4(streamProxyItem.isEnableMp4());
  83. return result;
  84. }
  85. if (userSetting.getPushAuthority()) {
  86. // 对于推流进行鉴权
  87. Map<String, String> paramMap = MediaServerUtils.urlParamToMap(params);
  88. // 推流鉴权
  89. if (params == null) {
  90. logger.info("推流鉴权失败: 缺少必要参数:sign=md5(user表的pushKey)");
  91. throw new ControllerException(ErrorCode.ERROR401.getCode(), "Unauthorized");
  92. }
  93. String sign = paramMap.get("sign");
  94. if (sign == null) {
  95. logger.info("推流鉴权失败: 缺少必要参数:sign=md5(user表的pushKey)");
  96. throw new ControllerException(ErrorCode.ERROR401.getCode(), "Unauthorized");
  97. }
  98. // 推流自定义播放鉴权码
  99. String callId = paramMap.get("callId");
  100. // 鉴权配置
  101. boolean hasAuthority = userService.checkPushAuthority(callId, sign);
  102. if (!hasAuthority) {
  103. logger.info("推流鉴权失败: sign 无权限: callId={}. sign={}", callId, sign);
  104. throw new ControllerException(ErrorCode.ERROR401.getCode(), "Unauthorized");
  105. }
  106. StreamAuthorityInfo streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(app, stream, mediaServer.getId());
  107. streamAuthorityInfo.setCallId(callId);
  108. streamAuthorityInfo.setSign(sign);
  109. // 鉴权通过
  110. redisCatchStorage.updateStreamAuthorityInfo(app, stream, streamAuthorityInfo);
  111. }
  112. }
  113. ResultForOnPublish result = new ResultForOnPublish();
  114. result.setEnable_audio(true);
  115. // 是否录像
  116. if ("rtp".equals(app)) {
  117. result.setEnable_mp4(userSetting.getRecordSip());
  118. } else {
  119. result.setEnable_mp4(userSetting.isRecordPushLive());
  120. }
  121. // 国标流
  122. if ("rtp".equals(app)) {
  123. InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, stream);
  124. // 单端口模式下修改流 ID
  125. if (!mediaServer.isRtpEnable() && inviteInfo == null) {
  126. String ssrc = String.format("%010d", Long.parseLong(stream, 16));
  127. inviteInfo = inviteStreamService.getInviteInfoBySSRC(ssrc);
  128. if (inviteInfo != null) {
  129. result.setStream_replace(inviteInfo.getStream());
  130. logger.info("[ZLM HOOK]推流鉴权 stream: {} 替换为 {}", stream, inviteInfo.getStream());
  131. stream = inviteInfo.getStream();
  132. }
  133. }
  134. // 设置音频信息及录制信息
  135. List<SsrcTransaction> ssrcTransactionForAll = sessionManager.getSsrcTransactionForAll(null, null, null, stream);
  136. if (ssrcTransactionForAll != null && ssrcTransactionForAll.size() == 1) {
  137. // 为录制国标模拟一个鉴权信息, 方便后续写入录像文件时使用
  138. StreamAuthorityInfo streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(app, stream, mediaServer.getId());
  139. streamAuthorityInfo.setApp(app);
  140. streamAuthorityInfo.setStream(ssrcTransactionForAll.get(0).getStream());
  141. streamAuthorityInfo.setCallId(ssrcTransactionForAll.get(0).getSipTransactionInfo().getCallId());
  142. redisCatchStorage.updateStreamAuthorityInfo(app, ssrcTransactionForAll.get(0).getStream(), streamAuthorityInfo);
  143. String deviceId = ssrcTransactionForAll.get(0).getDeviceId();
  144. String channelId = ssrcTransactionForAll.get(0).getChannelId();
  145. DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
  146. if (deviceChannel != null) {
  147. result.setEnable_audio(deviceChannel.getHasAudio());
  148. }
  149. // 如果是录像下载就设置视频间隔十秒
  150. if (ssrcTransactionForAll.get(0).getType() == InviteSessionType.DOWNLOAD) {
  151. // 获取录像的总时长,然后设置为这个视频的时长
  152. InviteInfo inviteInfoForDownload = inviteStreamService.getInviteInfo(InviteSessionType.DOWNLOAD, deviceId, channelId, stream);
  153. if (inviteInfoForDownload != null && inviteInfoForDownload.getStreamInfo() != null) {
  154. String startTime = inviteInfoForDownload.getStreamInfo().getStartTime();
  155. String endTime = inviteInfoForDownload.getStreamInfo().getEndTime();
  156. long difference = DateUtil.getDifference(startTime, endTime) / 1000;
  157. result.setMp4_max_second((int) difference);
  158. result.setEnable_mp4(true);
  159. // 设置为2保证得到的mp4的时长是正常的
  160. result.setModify_stamp(2);
  161. }
  162. }
  163. // 如果是talk对讲,则默认获取声音
  164. if (ssrcTransactionForAll.get(0).getType() == InviteSessionType.TALK) {
  165. result.setEnable_audio(true);
  166. }
  167. }
  168. } else if (app.equals("broadcast")) {
  169. result.setEnable_audio(true);
  170. } else if (app.equals("talk")) {
  171. result.setEnable_audio(true);
  172. }
  173. if (app.equalsIgnoreCase("rtp")) {
  174. String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + stream;
  175. OtherRtpSendInfo otherRtpSendInfo = (OtherRtpSendInfo) redisTemplate.opsForValue().get(receiveKey);
  176. String receiveKeyForPS = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_" + stream;
  177. OtherPsSendInfo otherPsSendInfo = (OtherPsSendInfo) redisTemplate.opsForValue().get(receiveKeyForPS);
  178. if (otherRtpSendInfo != null || otherPsSendInfo != null) {
  179. result.setEnable_mp4(true);
  180. }
  181. }
  182. return result;
  183. }
  184. @Override
  185. public boolean closeStreamOnNoneReader(String mediaServerId, String app, String stream, String schema) {
  186. boolean result = false;
  187. // 国标类型的流
  188. if ("rtp".equals(app)) {
  189. result = userSetting.getStreamOnDemand();
  190. // 国标流, 点播/录像回放/录像下载
  191. InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, stream);
  192. // 点播
  193. if (inviteInfo != null) {
  194. // 录像下载
  195. if (inviteInfo.getType() == InviteSessionType.DOWNLOAD) {
  196. return false;
  197. }
  198. // 收到无人观看说明流也没有在往上级推送
  199. if (redisCatchStorage.isChannelSendingRTP(inviteInfo.getChannelId())) {
  200. List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId(
  201. inviteInfo.getChannelId());
  202. if (!sendRtpItems.isEmpty()) {
  203. for (SendRtpItem sendRtpItem : sendRtpItems) {
  204. ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
  205. try {
  206. commanderForPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId());
  207. } catch (SipException | InvalidArgumentException | ParseException e) {
  208. logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
  209. }
  210. redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
  211. sendRtpItem.getCallId(), sendRtpItem.getStream());
  212. if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
  213. redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem,parentPlatform);
  214. }
  215. }
  216. }
  217. }
  218. Device device = deviceService.getDevice(inviteInfo.getDeviceId());
  219. if (device != null) {
  220. try {
  221. // 多查询一次防止已经被处理了
  222. InviteInfo info = inviteStreamService.getInviteInfo(inviteInfo.getType(),
  223. inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
  224. if (info != null) {
  225. commander.streamByeCmd(device, inviteInfo.getChannelId(),
  226. inviteInfo.getStream(), null);
  227. } else {
  228. logger.info("[无人观看] 未找到设备的点播信息: {}, 流:{}", inviteInfo.getDeviceId(), stream);
  229. }
  230. } catch (InvalidArgumentException | ParseException | SipException |
  231. SsrcTransactionNotFoundException e) {
  232. logger.error("[无人观看]点播, 发送BYE失败 {}", e.getMessage());
  233. }
  234. } else {
  235. logger.info("[无人观看] 未找到设备: {},流:{}", inviteInfo.getDeviceId(), stream);
  236. }
  237. inviteStreamService.removeInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(),
  238. inviteInfo.getChannelId(), inviteInfo.getStream());
  239. storager.stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId());
  240. return result;
  241. }
  242. SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, stream, null);
  243. if (sendRtpItem != null && "talk".equals(sendRtpItem.getApp())) {
  244. return false;
  245. }
  246. } else if ("talk".equals(app) || "broadcast".equals(app)) {
  247. return false;
  248. } else {
  249. // 非国标流 推流/拉流代理
  250. // 拉流代理
  251. StreamProxy streamProxy = streamProxyService.getStreamProxyByAppAndStream(app, stream);
  252. if (streamProxy != null) {
  253. if (streamProxy.isEnableRemoveNoneReader()) {
  254. // 无人观看自动移除
  255. result = true;
  256. streamProxyService.del(app, stream);
  257. logger.info("[{}/{}]<-[{}] 拉流代理无人观看已经移除", app, stream, streamProxy.getSrcUrl());
  258. } else if (streamProxy.isEnableDisableNoneReader()) {
  259. // 无人观看停用
  260. result = true;
  261. // 修改数据
  262. streamProxyService.stop(app, stream);
  263. } else {
  264. // 无人观看不做处理
  265. result = false;
  266. }
  267. }
  268. }
  269. return result;
  270. }
  271. }