| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301 |
- package com.genersoft.iot.vmp.service.impl;
- import com.genersoft.iot.vmp.common.InviteInfo;
- import com.genersoft.iot.vmp.common.InviteSessionType;
- import com.genersoft.iot.vmp.common.VideoManagerConstants;
- import com.genersoft.iot.vmp.conf.UserSetting;
- import com.genersoft.iot.vmp.conf.exception.ControllerException;
- import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
- import com.genersoft.iot.vmp.gb28181.bean.*;
- import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
- import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
- import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
- import com.genersoft.iot.vmp.media.bean.MediaServer;
- import com.genersoft.iot.vmp.media.bean.ResultForOnPublish;
- import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
- import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
- import com.genersoft.iot.vmp.service.*;
- import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
- import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
- import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyService;
- import com.genersoft.iot.vmp.utils.DateUtil;
- import com.genersoft.iot.vmp.utils.MediaServerUtils;
- import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
- import com.genersoft.iot.vmp.vmanager.bean.OtherPsSendInfo;
- import com.genersoft.iot.vmp.vmanager.bean.OtherRtpSendInfo;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.data.redis.core.RedisTemplate;
- import org.springframework.stereotype.Service;
- import javax.sip.InvalidArgumentException;
- import javax.sip.SipException;
- import java.text.ParseException;
- import java.util.List;
- import java.util.Map;
- @Service
- public class MediaServiceImpl implements IMediaService {
- private final static Logger logger = LoggerFactory.getLogger(MediaServiceImpl.class);
- @Autowired
- private IRedisCatchStorage redisCatchStorage;
- @Autowired
- private IStreamProxyService streamProxyService;
- @Autowired
- private UserSetting userSetting;
- @Autowired
- private RedisTemplate<Object, Object> redisTemplate;
- @Autowired
- private IUserService userService;
- @Autowired
- private IInviteStreamService inviteStreamService;
- @Autowired
- private VideoStreamSessionManager sessionManager;
- @Autowired
- private IVideoManagerStorage storager;
- @Autowired
- private IDeviceService deviceService;
- @Autowired
- private ISIPCommanderForPlatform commanderForPlatform;
- @Autowired
- private ISIPCommander commander;
- @Override
- public boolean authenticatePlay(String app, String stream, String callId) {
- if (app == null || stream == null) {
- return false;
- }
- if ("rtp".equals(app)) {
- return true;
- }
- StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(app, stream);
- if (streamAuthorityInfo == null || streamAuthorityInfo.getCallId() == null) {
- return true;
- }
- return streamAuthorityInfo.getCallId().equals(callId);
- }
- @Override
- public ResultForOnPublish authenticatePublish(MediaServer mediaServer, String app, String stream, String params) {
- // 推流鉴权的处理
- if (!"rtp".equals(app)) {
- StreamProxy streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(app, stream);
- if (streamProxyItem != null) {
- ResultForOnPublish result = new ResultForOnPublish();
- result.setEnable_audio(streamProxyItem.isEnableAudio());
- result.setEnable_mp4(streamProxyItem.isEnableMp4());
- return result;
- }
- if (userSetting.getPushAuthority()) {
- // 对于推流进行鉴权
- Map<String, String> paramMap = MediaServerUtils.urlParamToMap(params);
- // 推流鉴权
- if (params == null) {
- logger.info("推流鉴权失败: 缺少必要参数:sign=md5(user表的pushKey)");
- throw new ControllerException(ErrorCode.ERROR401.getCode(), "Unauthorized");
- }
- String sign = paramMap.get("sign");
- if (sign == null) {
- logger.info("推流鉴权失败: 缺少必要参数:sign=md5(user表的pushKey)");
- throw new ControllerException(ErrorCode.ERROR401.getCode(), "Unauthorized");
- }
- // 推流自定义播放鉴权码
- String callId = paramMap.get("callId");
- // 鉴权配置
- boolean hasAuthority = userService.checkPushAuthority(callId, sign);
- if (!hasAuthority) {
- logger.info("推流鉴权失败: sign 无权限: callId={}. sign={}", callId, sign);
- throw new ControllerException(ErrorCode.ERROR401.getCode(), "Unauthorized");
- }
- StreamAuthorityInfo streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(app, stream, mediaServer.getId());
- streamAuthorityInfo.setCallId(callId);
- streamAuthorityInfo.setSign(sign);
- // 鉴权通过
- redisCatchStorage.updateStreamAuthorityInfo(app, stream, streamAuthorityInfo);
- }
- }
- ResultForOnPublish result = new ResultForOnPublish();
- result.setEnable_audio(true);
- // 是否录像
- if ("rtp".equals(app)) {
- result.setEnable_mp4(userSetting.getRecordSip());
- } else {
- result.setEnable_mp4(userSetting.isRecordPushLive());
- }
- // 国标流
- if ("rtp".equals(app)) {
- InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, stream);
- // 单端口模式下修改流 ID
- if (!mediaServer.isRtpEnable() && inviteInfo == null) {
- String ssrc = String.format("%010d", Long.parseLong(stream, 16));
- inviteInfo = inviteStreamService.getInviteInfoBySSRC(ssrc);
- if (inviteInfo != null) {
- result.setStream_replace(inviteInfo.getStream());
- logger.info("[ZLM HOOK]推流鉴权 stream: {} 替换为 {}", stream, inviteInfo.getStream());
- stream = inviteInfo.getStream();
- }
- }
- // 设置音频信息及录制信息
- List<SsrcTransaction> ssrcTransactionForAll = sessionManager.getSsrcTransactionForAll(null, null, null, stream);
- if (ssrcTransactionForAll != null && ssrcTransactionForAll.size() == 1) {
- // 为录制国标模拟一个鉴权信息, 方便后续写入录像文件时使用
- StreamAuthorityInfo streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(app, stream, mediaServer.getId());
- streamAuthorityInfo.setApp(app);
- streamAuthorityInfo.setStream(ssrcTransactionForAll.get(0).getStream());
- streamAuthorityInfo.setCallId(ssrcTransactionForAll.get(0).getSipTransactionInfo().getCallId());
- redisCatchStorage.updateStreamAuthorityInfo(app, ssrcTransactionForAll.get(0).getStream(), streamAuthorityInfo);
- String deviceId = ssrcTransactionForAll.get(0).getDeviceId();
- String channelId = ssrcTransactionForAll.get(0).getChannelId();
- DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
- if (deviceChannel != null) {
- result.setEnable_audio(deviceChannel.getHasAudio());
- }
- // 如果是录像下载就设置视频间隔十秒
- if (ssrcTransactionForAll.get(0).getType() == InviteSessionType.DOWNLOAD) {
- // 获取录像的总时长,然后设置为这个视频的时长
- InviteInfo inviteInfoForDownload = inviteStreamService.getInviteInfo(InviteSessionType.DOWNLOAD, deviceId, channelId, stream);
- if (inviteInfoForDownload != null && inviteInfoForDownload.getStreamInfo() != null) {
- String startTime = inviteInfoForDownload.getStreamInfo().getStartTime();
- String endTime = inviteInfoForDownload.getStreamInfo().getEndTime();
- long difference = DateUtil.getDifference(startTime, endTime) / 1000;
- result.setMp4_max_second((int) difference);
- result.setEnable_mp4(true);
- // 设置为2保证得到的mp4的时长是正常的
- result.setModify_stamp(2);
- }
- }
- // 如果是talk对讲,则默认获取声音
- if (ssrcTransactionForAll.get(0).getType() == InviteSessionType.TALK) {
- result.setEnable_audio(true);
- }
- }
- } else if (app.equals("broadcast")) {
- result.setEnable_audio(true);
- } else if (app.equals("talk")) {
- result.setEnable_audio(true);
- }
- if (app.equalsIgnoreCase("rtp")) {
- String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + stream;
- OtherRtpSendInfo otherRtpSendInfo = (OtherRtpSendInfo) redisTemplate.opsForValue().get(receiveKey);
- String receiveKeyForPS = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_" + stream;
- OtherPsSendInfo otherPsSendInfo = (OtherPsSendInfo) redisTemplate.opsForValue().get(receiveKeyForPS);
- if (otherRtpSendInfo != null || otherPsSendInfo != null) {
- result.setEnable_mp4(true);
- }
- }
- return result;
- }
- @Override
- public boolean closeStreamOnNoneReader(String mediaServerId, String app, String stream, String schema) {
- boolean result = false;
- // 国标类型的流
- if ("rtp".equals(app)) {
- result = userSetting.getStreamOnDemand();
- // 国标流, 点播/录像回放/录像下载
- InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, stream);
- // 点播
- if (inviteInfo != null) {
- // 录像下载
- if (inviteInfo.getType() == InviteSessionType.DOWNLOAD) {
- return false;
- }
- // 收到无人观看说明流也没有在往上级推送
- if (redisCatchStorage.isChannelSendingRTP(inviteInfo.getChannelId())) {
- List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId(
- inviteInfo.getChannelId());
- if (!sendRtpItems.isEmpty()) {
- for (SendRtpItem sendRtpItem : sendRtpItems) {
- ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
- try {
- commanderForPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId());
- } catch (SipException | InvalidArgumentException | ParseException e) {
- logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
- }
- redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
- sendRtpItem.getCallId(), sendRtpItem.getStream());
- if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
- redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem,parentPlatform);
- }
- }
- }
- }
- Device device = deviceService.getDevice(inviteInfo.getDeviceId());
- if (device != null) {
- try {
- // 多查询一次防止已经被处理了
- InviteInfo info = inviteStreamService.getInviteInfo(inviteInfo.getType(),
- inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
- if (info != null) {
- commander.streamByeCmd(device, inviteInfo.getChannelId(),
- inviteInfo.getStream(), null);
- } else {
- logger.info("[无人观看] 未找到设备的点播信息: {}, 流:{}", inviteInfo.getDeviceId(), stream);
- }
- } catch (InvalidArgumentException | ParseException | SipException |
- SsrcTransactionNotFoundException e) {
- logger.error("[无人观看]点播, 发送BYE失败 {}", e.getMessage());
- }
- } else {
- logger.info("[无人观看] 未找到设备: {},流:{}", inviteInfo.getDeviceId(), stream);
- }
- inviteStreamService.removeInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(),
- inviteInfo.getChannelId(), inviteInfo.getStream());
- storager.stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId());
- return result;
- }
- SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, stream, null);
- if (sendRtpItem != null && "talk".equals(sendRtpItem.getApp())) {
- return false;
- }
- } else if ("talk".equals(app) || "broadcast".equals(app)) {
- return false;
- } else {
- // 非国标流 推流/拉流代理
- // 拉流代理
- StreamProxy streamProxy = streamProxyService.getStreamProxyByAppAndStream(app, stream);
- if (streamProxy != null) {
- if (streamProxy.isEnableRemoveNoneReader()) {
- // 无人观看自动移除
- result = true;
- streamProxyService.del(app, stream);
- logger.info("[{}/{}]<-[{}] 拉流代理无人观看已经移除", app, stream, streamProxy.getSrcUrl());
- } else if (streamProxy.isEnableDisableNoneReader()) {
- // 无人观看停用
- result = true;
- // 修改数据
- streamProxyService.stop(app, stream);
- } else {
- // 无人观看不做处理
- result = false;
- }
- }
- }
- return result;
- }
- }
|