|  | @@ -1,15 +1,25 @@
 | 
	
		
			
				|  |  |  package com.genersoft.iot.vmp.service.impl;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +import com.alibaba.fastjson2.JSONObject;
 | 
	
		
			
				|  |  | +import com.genersoft.iot.vmp.common.StreamInfo;
 | 
	
		
			
				|  |  |  import com.genersoft.iot.vmp.conf.DynamicTask;
 | 
	
		
			
				|  |  |  import com.genersoft.iot.vmp.conf.UserSetting;
 | 
	
		
			
				|  |  | +import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
 | 
	
		
			
				|  |  |  import com.genersoft.iot.vmp.gb28181.bean.*;
 | 
	
		
			
				|  |  |  import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
 | 
	
		
			
				|  |  | +import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
 | 
	
		
			
				|  |  |  import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
 | 
	
		
			
				|  |  |  import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
 | 
	
		
			
				|  |  | +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
 | 
	
		
			
				|  |  | +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
 | 
	
		
			
				|  |  | +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
 | 
	
		
			
				|  |  |  import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 | 
	
		
			
				|  |  |  import com.genersoft.iot.vmp.service.IMediaServerService;
 | 
	
		
			
				|  |  |  import com.genersoft.iot.vmp.service.IPlatformService;
 | 
	
		
			
				|  |  | +import com.genersoft.iot.vmp.service.IPlayService;
 | 
	
		
			
				|  |  |  import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
 | 
	
		
			
				|  |  | +import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback;
 | 
	
		
			
				|  |  | +import com.genersoft.iot.vmp.service.bean.SSRCInfo;
 | 
	
		
			
				|  |  |  import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 | 
	
		
			
				|  |  |  import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
 | 
	
		
			
				|  |  |  import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper;
 | 
	
	
		
			
				|  | @@ -21,11 +31,13 @@ import org.springframework.beans.factory.annotation.Autowired;
 | 
	
		
			
				|  |  |  import org.springframework.stereotype.Service;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  import javax.sip.InvalidArgumentException;
 | 
	
		
			
				|  |  | +import javax.sip.ResponseEvent;
 | 
	
		
			
				|  |  |  import javax.sip.SipException;
 | 
	
		
			
				|  |  |  import java.text.ParseException;
 | 
	
		
			
				|  |  |  import java.util.HashMap;
 | 
	
		
			
				|  |  |  import java.util.List;
 | 
	
		
			
				|  |  |  import java.util.Map;
 | 
	
		
			
				|  |  | +import java.util.UUID;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  /**
 | 
	
		
			
				|  |  |   * @author lin
 | 
	
	
		
			
				|  | @@ -65,6 +77,16 @@ public class PlatformServiceImpl implements IPlatformService {
 | 
	
		
			
				|  |  |      @Autowired
 | 
	
		
			
				|  |  |      private UserSetting userSetting;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +    @Autowired
 | 
	
		
			
				|  |  | +    private ZlmHttpHookSubscribe subscribe;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    @Autowired
 | 
	
		
			
				|  |  | +    private VideoStreamSessionManager streamSession;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    @Autowired
 | 
	
		
			
				|  |  | +    private IPlayService playService;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      @Override
 | 
	
	
		
			
				|  | @@ -295,4 +317,137 @@ public class PlatformServiceImpl implements IPlatformService {
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    @Override
 | 
	
		
			
				|  |  | +    public void broadcastInvite(ParentPlatform platform, String channelId, MediaServerItem mediaServerItem, ZlmHttpHookSubscribe.Event hookEvent,
 | 
	
		
			
				|  |  | +                                SipSubscribe.Event errorEvent, InviteTimeOutCallback timeoutCallback) throws InvalidArgumentException, ParseException, SipException {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        if (mediaServerItem == null) {
 | 
	
		
			
				|  |  | +            logger.info("[国标级联] 语音喊话未找到可用的zlm. platform: {}", platform.getServerGBId());
 | 
	
		
			
				|  |  | +            return;
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(platform.getServerGBId(), channelId);
 | 
	
		
			
				|  |  | +        if (streamInfo != null) {
 | 
	
		
			
				|  |  | +            // 如果zlm不存在这个流,则删除数据即可
 | 
	
		
			
				|  |  | +            MediaServerItem mediaServerItemForStreamInfo = mediaServerService.getOne(streamInfo.getMediaServerId());
 | 
	
		
			
				|  |  | +            if (mediaServerItemForStreamInfo != null) {
 | 
	
		
			
				|  |  | +                Boolean ready = zlmrtpServerFactory.isStreamReady(mediaServerItemForStreamInfo, streamInfo.getApp(), streamInfo.getStream());
 | 
	
		
			
				|  |  | +                if (!ready) {
 | 
	
		
			
				|  |  | +                    // 错误存在于redis中的数据
 | 
	
		
			
				|  |  | +                    redisCatchStorage.stopPlay(streamInfo);
 | 
	
		
			
				|  |  | +                }else {
 | 
	
		
			
				|  |  | +                    // 流确实尚在推流,直接回调结果
 | 
	
		
			
				|  |  | +                    JSONObject json = new JSONObject();
 | 
	
		
			
				|  |  | +                    json.put("app", streamInfo.getApp());
 | 
	
		
			
				|  |  | +                    json.put("stream", streamInfo.getStream());
 | 
	
		
			
				|  |  | +                    hookEvent.response(mediaServerItemForStreamInfo, json);
 | 
	
		
			
				|  |  | +                    return;
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        String streamId = null;
 | 
	
		
			
				|  |  | +        if (mediaServerItem.isRtpEnable()) {
 | 
	
		
			
				|  |  | +            streamId = String.format("%s_%s", platform.getServerGBId(), channelId);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        // 默认不进行SSRC校验, TODO 后续可改为配置
 | 
	
		
			
				|  |  | +        boolean ssrcCheck = false;
 | 
	
		
			
				|  |  | +        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, ssrcCheck, false);
 | 
	
		
			
				|  |  | +        if (ssrcInfo == null || ssrcInfo.getPort() < 0) {
 | 
	
		
			
				|  |  | +            logger.info("[国标级联] 发起语音喊话 开启端口监听失败, platform: {}, channel: {}", platform.getServerGBId(), channelId);
 | 
	
		
			
				|  |  | +            errorEvent.response(new SipSubscribe.EventResult(-1, "端口监听失败"));
 | 
	
		
			
				|  |  | +            return;
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        logger.info("[国标级联] 发起语音喊话 deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}",
 | 
	
		
			
				|  |  | +                platform.getServerGBId(), channelId, ssrcInfo.getPort(), userSetting.getBroadcastForPlatform(), ssrcInfo.getSsrc(), ssrcCheck);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        String timeOutTaskKey = UUID.randomUUID().toString();
 | 
	
		
			
				|  |  | +        dynamicTask.startDelay(timeOutTaskKey, () -> {
 | 
	
		
			
				|  |  | +            // 执行超时任务时查询是否已经成功,成功了则不执行超时任务,防止超时任务取消失败的情况
 | 
	
		
			
				|  |  | +            if (redisCatchStorage.queryPlayByDevice(platform.getServerGBId(), channelId) == null) {
 | 
	
		
			
				|  |  | +                logger.info("[国标级联] 发起语音喊话 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", platform.getServerGBId(), channelId, ssrcInfo.getPort(), ssrcInfo.getSsrc());
 | 
	
		
			
				|  |  | +                // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
 | 
	
		
			
				|  |  | +                try {
 | 
	
		
			
				|  |  | +                    commanderForPlatform.streamByeCmd(platform, channelId, ssrcInfo.getStream(), null, null);
 | 
	
		
			
				|  |  | +                } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
 | 
	
		
			
				|  |  | +                    logger.error("[点播超时], 发送BYE失败 {}", e.getMessage());
 | 
	
		
			
				|  |  | +                } finally {
 | 
	
		
			
				|  |  | +                    timeoutCallback.run(1, "收流超时");
 | 
	
		
			
				|  |  | +                    mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
 | 
	
		
			
				|  |  | +                    mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
 | 
	
		
			
				|  |  | +                    streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream());
 | 
	
		
			
				|  |  | +                    mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        }, userSetting.getPlayTimeout());
 | 
	
		
			
				|  |  | +        commanderForPlatform.broadcastInviteCmd(platform, channelId, mediaServerItem, ssrcInfo, (mediaServerItemForInvite, response)->{
 | 
	
		
			
				|  |  | +            dynamicTask.stop(timeOutTaskKey);
 | 
	
		
			
				|  |  | +            // hook响应
 | 
	
		
			
				|  |  | +            playService.onPublishHandlerForPlay(mediaServerItemForInvite, response, platform.getServerGBId(), channelId);
 | 
	
		
			
				|  |  | +            // 收到流
 | 
	
		
			
				|  |  | +            if (hookEvent != null) {
 | 
	
		
			
				|  |  | +                hookEvent.response(mediaServerItem, response);
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        }, event -> {
 | 
	
		
			
				|  |  | +            // 收到200OK 检测ssrc是否有变化,防止上级自定义了ssrc
 | 
	
		
			
				|  |  | +            ResponseEvent responseEvent = (ResponseEvent) event.event;
 | 
	
		
			
				|  |  | +            String contentString = new String(responseEvent.getResponse().getRawContent());
 | 
	
		
			
				|  |  | +            // 获取ssrc
 | 
	
		
			
				|  |  | +            int ssrcIndex = contentString.indexOf("y=");
 | 
	
		
			
				|  |  | +            // 检查是否有y字段
 | 
	
		
			
				|  |  | +            if (ssrcIndex >= 0) {
 | 
	
		
			
				|  |  | +                //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容
 | 
	
		
			
				|  |  | +                String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
 | 
	
		
			
				|  |  | +                // 查询到ssrc不一致且开启了ssrc校验则需要针对处理
 | 
	
		
			
				|  |  | +                if (ssrcInfo.getSsrc().equals(ssrcInResponse) || ssrcCheck) {
 | 
	
		
			
				|  |  | +                    return;
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +                logger.info("[点播消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
 | 
	
		
			
				|  |  | +                if (!mediaServerItem.isRtpEnable()) {
 | 
	
		
			
				|  |  | +                    logger.info("[点播消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                    if (!mediaServerItem.getSsrcConfig().checkSsrc(ssrcInResponse)) {
 | 
	
		
			
				|  |  | +                        // ssrc 不可用
 | 
	
		
			
				|  |  | +                        // 释放ssrc
 | 
	
		
			
				|  |  | +                        mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
 | 
	
		
			
				|  |  | +                        streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream());
 | 
	
		
			
				|  |  | +                        event.msg = "下级自定义了ssrc,但是此ssrc不可用";
 | 
	
		
			
				|  |  | +                        event.statusCode = 400;
 | 
	
		
			
				|  |  | +                        errorEvent.response(event);
 | 
	
		
			
				|  |  | +                        return;
 | 
	
		
			
				|  |  | +                    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                    // 单端口模式streamId也有变化,需要重新设置监听
 | 
	
		
			
				|  |  | +                    if (!mediaServerItem.isRtpEnable()) {
 | 
	
		
			
				|  |  | +                        // 添加订阅
 | 
	
		
			
				|  |  | +                        HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
 | 
	
		
			
				|  |  | +                        subscribe.removeSubscribe(hookSubscribe);
 | 
	
		
			
				|  |  | +                        hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
 | 
	
		
			
				|  |  | +                        subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
 | 
	
		
			
				|  |  | +                            logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString());
 | 
	
		
			
				|  |  | +                            dynamicTask.stop(timeOutTaskKey);
 | 
	
		
			
				|  |  | +                            // hook响应
 | 
	
		
			
				|  |  | +                            playService.onPublishHandlerForPlay(mediaServerItemInUse, response, platform.getServerGBId(), channelId);
 | 
	
		
			
				|  |  | +                            hookEvent.response(mediaServerItemInUse, response);
 | 
	
		
			
				|  |  | +                        });
 | 
	
		
			
				|  |  | +                    }
 | 
	
		
			
				|  |  | +                    // 关闭rtp server
 | 
	
		
			
				|  |  | +                    mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
 | 
	
		
			
				|  |  | +                    // 重新开启ssrc server
 | 
	
		
			
				|  |  | +                    mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, false, false, ssrcInfo.getPort());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        }, eventResult -> {
 | 
	
		
			
				|  |  | +            // 收到错误回复
 | 
	
		
			
				|  |  | +            if (errorEvent != null) {
 | 
	
		
			
				|  |  | +                errorEvent.response(eventResult);
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        });
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    @Override
 | 
	
		
			
				|  |  | +    public void stopBroadcast(ParentPlatform platform, String channelId, String stream) throws InvalidArgumentException, ParseException, SsrcTransactionNotFoundException, SipException {
 | 
	
		
			
				|  |  | +        commanderForPlatform.streamByeCmd(platform, channelId, stream, null, null);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  |  }
 |