| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922 | package com.genersoft.iot.vmp.service.impl;import com.alibaba.fastjson2.JSON;import com.alibaba.fastjson2.JSONArray;import com.alibaba.fastjson2.JSONObject;import com.genersoft.iot.vmp.common.StreamInfo;import com.genersoft.iot.vmp.conf.DynamicTask;import com.genersoft.iot.vmp.conf.UserSetting;import com.genersoft.iot.vmp.conf.exception.ControllerException;import com.genersoft.iot.vmp.conf.exception.ServiceException;import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;import com.genersoft.iot.vmp.gb28181.bean.*;import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;import com.genersoft.iot.vmp.service.IDeviceService;import com.genersoft.iot.vmp.service.IMediaServerService;import com.genersoft.iot.vmp.service.IMediaService;import com.genersoft.iot.vmp.service.IPlayService;import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback;import com.genersoft.iot.vmp.service.bean.PlayBackCallback;import com.genersoft.iot.vmp.service.bean.PlayBackResult;import com.genersoft.iot.vmp.service.bean.SSRCInfo;import com.genersoft.iot.vmp.storager.IRedisCatchStorage;import com.genersoft.iot.vmp.storager.IVideoManagerStorage;import com.genersoft.iot.vmp.utils.DateUtil;import com.genersoft.iot.vmp.utils.redis.RedisUtil;import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;import com.genersoft.iot.vmp.vmanager.bean.WVPResult;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import org.springframework.util.ObjectUtils;import javax.sip.InvalidArgumentException;import javax.sip.ResponseEvent;import javax.sip.SipException;import java.math.BigDecimal;import java.math.RoundingMode;import java.text.ParseException;import java.util.List;import java.util.UUID;@SuppressWarnings(value = {"rawtypes", "unchecked"})@Servicepublic class PlayServiceImpl implements IPlayService {    private final static Logger logger = LoggerFactory.getLogger(PlayServiceImpl.class);    @Autowired    private IVideoManagerStorage storager;    @Autowired    private SIPCommander cmder;    @Autowired    private SIPCommanderFroPlatform sipCommanderFroPlatform;    @Autowired    private IRedisCatchStorage redisCatchStorage;    @Autowired    private DeferredResultHolder resultHolder;    @Autowired    private ZLMRESTfulUtils zlmresTfulUtils;    @Autowired    private AssistRESTfulUtils assistRESTfulUtils;    @Autowired    private IMediaService mediaService;    @Autowired    private IMediaServerService mediaServerService;    @Autowired    private VideoStreamSessionManager streamSession;    @Autowired    private IDeviceService deviceService;    @Autowired    private UserSetting userSetting;    @Autowired    private DynamicTask dynamicTask;    @Autowired    private ZlmHttpHookSubscribe subscribe;    @Override    public void play(MediaServerItem mediaServerItem, String deviceId, String channelId,                                 ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,                                 Runnable timeoutCallback) {        if (mediaServerItem == null) {            throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm");        }        String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId;        RequestMessage msg = new RequestMessage();        msg.setKey(key);        Device device = redisCatchStorage.getDevice(deviceId);        StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);        if (streamInfo != null) {            String streamId = streamInfo.getStream();            if (streamId == null) {                WVPResult wvpResult = new WVPResult();                wvpResult.setCode(ErrorCode.ERROR100.getCode());                wvpResult.setMsg("点播失败, redis缓存streamId等于null");                msg.setData(wvpResult);                resultHolder.invokeAllResult(msg);                return;            }            String mediaServerId = streamInfo.getMediaServerId();            MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);            JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaInfo, streamId);            if (rtpInfo.getInteger("code") == 0) {                if (rtpInfo.getBoolean("exist")) {                    int localPort = rtpInfo.getInteger("local_port");                    if (localPort == 0) {                        logger.warn("[点播],点播时发现rtpServerC存在,但是尚未开始推流");                        // 此时说明rtpServer已经创建但是流还没有推上来                        WVPResult wvpResult = new WVPResult();                        wvpResult.setCode(ErrorCode.ERROR100.getCode());                        wvpResult.setMsg("点播已经在进行中,请稍候重试");                        msg.setData(wvpResult);                        resultHolder.invokeAllResult(msg);                        return;                    } else {                        WVPResult wvpResult = new WVPResult();                        wvpResult.setCode(ErrorCode.SUCCESS.getCode());                        wvpResult.setMsg(ErrorCode.SUCCESS.getMsg());                        wvpResult.setData(streamInfo);                        msg.setData(wvpResult);                        resultHolder.invokeAllResult(msg);                        if (hookEvent != null) {                            hookEvent.response(mediaServerItem, JSON.parseObject(JSON.toJSONString(streamInfo)));                        }                    }                } else {                    redisCatchStorage.stopPlay(streamInfo);                    storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());                    streamInfo = null;                }            } else {                //zlm连接失败                redisCatchStorage.stopPlay(streamInfo);                storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());                streamInfo = null;            }        }        if (streamInfo == null) {            String streamId = null;            if (mediaServerItem.isRtpEnable()) {                streamId = String.format("%s_%s", device.getDeviceId(), channelId);            }            SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false);            if (ssrcInfo == null) {                WVPResult wvpResult = new WVPResult();                wvpResult.setCode(ErrorCode.ERROR100.getCode());                wvpResult.setMsg("开启收流失败");                msg.setData(wvpResult);                resultHolder.invokeAllResult(msg);                return;            }            play(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInUse, response) -> {                if (hookEvent != null) {                    hookEvent.response(mediaServerItem, response);                }            }, event -> {                // sip error错误                WVPResult wvpResult = new WVPResult();                wvpResult.setCode(ErrorCode.ERROR100.getCode());                wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg));                msg.setData(wvpResult);                resultHolder.invokeAllResult(msg);                if (errorEvent != null) {                    errorEvent.response(event);                }            }, (code, msgStr) -> {                // invite点播超时                WVPResult wvpResult = new WVPResult();                wvpResult.setCode(ErrorCode.ERROR100.getCode());                if (code == 0) {                    wvpResult.setMsg("点播超时,请稍候重试");                } else if (code == 1) {                    wvpResult.setMsg("收流超时,请稍候重试");                }                msg.setData(wvpResult);                // 回复之前所有的点播请求                resultHolder.invokeAllResult(msg);            });        }    }    @Override    public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,                     ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,                     InviteTimeOutCallback timeoutCallback) {        logger.info("[点播开始] deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());        // 超时处理        String timeOutTaskKey = UUID.randomUUID().toString();        dynamicTask.startDelay(timeOutTaskKey, () -> {            // 执行超时任务时查询是否已经成功,成功了则不执行超时任务,防止超时任务取消失败的情况            if (redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId) == null) {                logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", device.getDeviceId(), channelId, ssrcInfo.getPort(), ssrcInfo.getSsrc());                // 点播超时回复BYE 同时释放ssrc以及此次点播的资源                try {                    cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null);                } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {                    logger.error("[点播超时], 发送BYE失败 {}", e.getMessage());                } finally {                    timeoutCallback.run(1, "收流超时");                    mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());                    mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());                    streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());                    mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());                    // 取消订阅消息监听                    HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());                    subscribe.removeSubscribe(hookSubscribe);                }            }        }, userSetting.getPlayTimeout());        //端口获取失败的ssrcInfo 没有必要发送点播指令        if (ssrcInfo.getPort() <= 0) {            logger.info("[点播端口分配异常],deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channelId, ssrcInfo);            dynamicTask.stop(timeOutTaskKey);            // 释放ssrc            mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());            streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());            RequestMessage msg = new RequestMessage();            msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + device.getDeviceId() + channelId);            msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "点播端口分配异常"));            resultHolder.invokeAllResult(msg);            return;        }        try {            cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {                logger.info("收到订阅消息: " + response.toJSONString());                dynamicTask.stop(timeOutTaskKey);                // hook响应                onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId);                hookEvent.response(mediaServerItemInuse, response);                logger.info("[点播成功] deviceId: {}, channelId: {}", device.getDeviceId(), channelId);                String streamUrl = String.format("rtsp://127.0.0.1:%s/%s/%s", mediaServerItemInuse.getRtspPort(), "rtp",  ssrcInfo.getStream());                String path = "snap";                String fileName = device.getDeviceId() + "_" + channelId + ".jpg";                // 请求截图                logger.info("[请求截图]: " + fileName);                zlmresTfulUtils.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName);            }, (event) -> {                ResponseEvent responseEvent = (ResponseEvent) event.event;                String contentString = new String(responseEvent.getResponse().getRawContent());                // 获取ssrc                int ssrcIndex = contentString.indexOf("y=");                // 检查是否有y字段                if (ssrcIndex >= 0) {                    //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容                    String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);                    // 查询到ssrc不一致且开启了ssrc校验则需要针对处理                    if (ssrcInfo.getSsrc().equals(ssrcInResponse)) {                        return;                    }                    logger.info("[点播消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);                    if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {                        logger.info("[点播消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);                        if (!mediaServerItem.getSsrcConfig().checkSsrc(ssrcInResponse)) {                            // ssrc 不可用                            // 释放ssrc                            mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());                            streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());                            event.msg = "下级自定义了ssrc,但是此ssrc不可用";                            event.statusCode = 400;                            errorEvent.response(event);                            return;                        }                        // 单端口模式streamId也有变化,需要重新设置监听                        if (!mediaServerItem.isRtpEnable()) {                            // 添加订阅                            HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());                            subscribe.removeSubscribe(hookSubscribe);                            hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());                            subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {                                logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString());                                dynamicTask.stop(timeOutTaskKey);                                // hook响应                                onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId);                                hookEvent.response(mediaServerItemInUse, response);                            });                        }                        // 关闭rtp server                        mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());                        // 重新开启ssrc server                        mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), false, ssrcInfo.getPort());                    }                }            }, (event) -> {                dynamicTask.stop(timeOutTaskKey);                mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());                // 释放ssrc                mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());                streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());                errorEvent.response(event);            });        } catch (InvalidArgumentException | SipException | ParseException e) {            logger.error("[命令发送失败] 点播消息: {}", e.getMessage());            dynamicTask.stop(timeOutTaskKey);            mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());            // 释放ssrc            mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());            streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());            SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(new CmdSendFailEvent(null));            eventResult.msg = "命令发送失败";            errorEvent.response(eventResult);        }    }    @Override    public void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId) {        StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);        RequestMessage msg = new RequestMessage();        msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId);        if (streamInfo != null) {            DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);            if (deviceChannel != null) {                deviceChannel.setStreamId(streamInfo.getStream());                storager.startPlay(deviceId, channelId, streamInfo.getStream());            }            redisCatchStorage.startPlay(streamInfo);            WVPResult wvpResult = new WVPResult();            wvpResult.setCode(ErrorCode.SUCCESS.getCode());            wvpResult.setMsg(ErrorCode.SUCCESS.getMsg());            wvpResult.setData(streamInfo);            msg.setData(wvpResult);            resultHolder.invokeAllResult(msg);        } else {            logger.warn("设备预览API调用失败!");            msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "设备预览API调用失败!"));            resultHolder.invokeAllResult(msg);        }    }    private void onPublishHandlerForPlayback(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, PlayBackCallback playBackCallback) {        StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);        PlayBackResult<StreamInfo> playBackResult = new PlayBackResult<>();        if (streamInfo != null) {            DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);            if (deviceChannel != null) {                deviceChannel.setStreamId(streamInfo.getStream());                storager.startPlay(deviceId, channelId, streamInfo.getStream());            }            redisCatchStorage.startPlay(streamInfo);            playBackResult.setCode(ErrorCode.SUCCESS.getCode());            playBackResult.setMsg(ErrorCode.SUCCESS.getMsg());            playBackResult.setData(streamInfo);            playBackCallback.call(playBackResult);        } else {            logger.warn("录像回放调用失败!");            playBackResult.setCode(ErrorCode.ERROR100.getCode());            playBackResult.setMsg("录像回放调用失败!");            playBackCallback.call(playBackResult);        }    }    @Override    public MediaServerItem getNewMediaServerItem(Device device) {        if (device == null) {            return null;        }        MediaServerItem mediaServerItem;        if (ObjectUtils.isEmpty(device.getMediaServerId()) || "auto".equals(device.getMediaServerId())) {            mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null);        } else {            mediaServerItem = mediaServerService.getOne(device.getMediaServerId());        }        if (mediaServerItem == null) {            logger.warn("点播时未找到可使用的ZLM...");        }        return mediaServerItem;    }    @Override    public MediaServerItem getNewMediaServerItemHasAssist(Device device) {        if (device == null) {            return null;        }        MediaServerItem mediaServerItem;        if (ObjectUtils.isEmpty(device.getMediaServerId()) || "auto".equals(device.getMediaServerId())) {            mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(true);        } else {            mediaServerItem = mediaServerService.getOne(device.getMediaServerId());        }        if (mediaServerItem == null) {            logger.warn("[获取可用的ZLM节点]未找到可使用的ZLM...");        }        return mediaServerItem;    }    @Override    public void playBack(String deviceId, String channelId, String startTime,                                                          String endTime, InviteStreamCallback inviteStreamCallback,                                                          PlayBackCallback callback) {        Device device = storager.queryVideoDevice(deviceId);        if (device == null) {            return;        }        MediaServerItem newMediaServerItem = getNewMediaServerItem(device);        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, device.isSsrcCheck(), true);        playBack(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, inviteStreamCallback, callback);    }    @Override    public void playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,                                                          String deviceId, String channelId, String startTime,                                                          String endTime, InviteStreamCallback infoCallBack,                                                          PlayBackCallback playBackCallback) {        if (mediaServerItem == null || ssrcInfo == null) {            return;        }        Device device = storager.queryVideoDevice(deviceId);        if (device == null) {            throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备: " + deviceId + "不存在");        }        PlayBackResult<StreamInfo> playBackResult = new PlayBackResult<>();        String playBackTimeOutTaskKey = UUID.randomUUID().toString();        dynamicTask.startDelay(playBackTimeOutTaskKey, () -> {            logger.warn(String.format("设备回放超时,deviceId:%s ,channelId:%s", deviceId, channelId));            playBackResult.setCode(ErrorCode.ERROR100.getCode());            playBackResult.setMsg("回放超时");            try {                cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null);            } catch (InvalidArgumentException | ParseException | SipException e) {                logger.error("[录像流]回放超时 发送BYE失败 {}", e.getMessage());            } catch (SsrcTransactionNotFoundException e) {                // 点播超时回复BYE 同时释放ssrc以及此次点播的资源                mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());                mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());                streamSession.remove(deviceId, channelId, ssrcInfo.getStream());            }            // 回复之前所有的点播请求            playBackCallback.call(playBackResult);        }, userSetting.getPlayTimeout());        SipSubscribe.Event errorEvent = event -> {            dynamicTask.stop(playBackTimeOutTaskKey);            playBackResult.setCode(ErrorCode.ERROR100.getCode());            playBackResult.setMsg(String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg));            playBackResult.setEvent(event);            playBackCallback.call(playBackResult);            streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());        };        InviteStreamCallback hookEvent = (InviteStreamInfo inviteStreamInfo) -> {            logger.info("收到回放订阅消息: " + inviteStreamInfo.getResponse().toJSONString());            dynamicTask.stop(playBackTimeOutTaskKey);            StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId);            if (streamInfo == null) {                logger.warn("设备回放API调用失败!");                playBackResult.setCode(ErrorCode.ERROR100.getCode());                playBackResult.setMsg("设备回放API调用失败!");                playBackCallback.call(playBackResult);                return;            }            redisCatchStorage.startPlayback(streamInfo, inviteStreamInfo.getCallId());            playBackResult.setCode(ErrorCode.SUCCESS.getCode());            playBackResult.setMsg(ErrorCode.SUCCESS.getMsg());            playBackResult.setData(streamInfo);            playBackResult.setMediaServerItem(inviteStreamInfo.getMediaServerItem());            playBackResult.setResponse(inviteStreamInfo.getResponse());            playBackCallback.call(playBackResult);        };        try {            cmder.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, infoCallBack,                    hookEvent, eventResult -> {                        if (eventResult.type == SipSubscribe.EventResultType.response) {                            ResponseEvent responseEvent = (ResponseEvent) eventResult.event;                            String contentString = new String(responseEvent.getResponse().getRawContent());                            // 获取ssrc                            int ssrcIndex = contentString.indexOf("y=");                            // 检查是否有y字段                            if (ssrcIndex >= 0) {                                //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容                                String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);                                // 查询到ssrc不一致且开启了ssrc校验则需要针对处理                                if (ssrcInfo.getSsrc().equals(ssrcInResponse)) {                                    return;                                }                                logger.info("[回放消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);                                if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {                                    logger.info("[回放消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);                                    if (!mediaServerItem.getSsrcConfig().checkSsrc(ssrcInResponse)) {                                        // ssrc 不可用                                        // 释放ssrc                                        mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());                                        streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());                                        eventResult.msg = "下级自定义了ssrc,但是此ssrc不可用";                                        eventResult.statusCode = 400;                                        errorEvent.response(eventResult);                                        return;                                    }                                    // 单端口模式streamId也有变化,需要重新设置监听                                    if (!mediaServerItem.isRtpEnable()) {                                        // 添加订阅                                        HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());                                        subscribe.removeSubscribe(hookSubscribe);                                        hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());                                        subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {                                            logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString());                                            dynamicTask.stop(playBackTimeOutTaskKey);                                            // hook响应                                            onPublishHandlerForPlayback(mediaServerItemInUse, response, device.getDeviceId(), channelId, playBackCallback);                                            hookEvent.call(new InviteStreamInfo(mediaServerItem, null, eventResult.callId, "rtp", ssrcInfo.getStream()));                                        });                                    }                                    // 关闭rtp server                                    mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());                                    // 重新开启ssrc server                                    mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), true, ssrcInfo.getPort());                                }                            }                        }                    }, errorEvent);        } catch (InvalidArgumentException | SipException | ParseException e) {            logger.error("[命令发送失败] 回放: {}", e.getMessage());            SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(new CmdSendFailEvent(null));            eventResult.msg = "命令发送失败";            errorEvent.response(eventResult);        }    }    @Override    public void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback playBackCallback) {        Device device = storager.queryVideoDevice(deviceId);        if (device == null) {            return;        }        MediaServerItem newMediaServerItem = getNewMediaServerItemHasAssist(device);        if (newMediaServerItem == null) {            PlayBackResult<StreamInfo> downloadResult = new PlayBackResult<>();            downloadResult.setCode(ErrorCode.ERROR100.getCode());            downloadResult.setMsg("未找到assist服务");            playBackCallback.call(downloadResult);            return;        }        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, device.isSsrcCheck(), true);        download(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, downloadSpeed, infoCallBack, playBackCallback);    }    @Override    public void download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack) {        if (mediaServerItem == null || ssrcInfo == null) {            return;        }        Device device = storager.queryVideoDevice(deviceId);        if (device == null) {            throw new ControllerException(ErrorCode.ERROR400.getCode(), "设备:" + deviceId + "不存在");        }        PlayBackResult<StreamInfo> downloadResult = new PlayBackResult<>();        String downLoadTimeOutTaskKey = UUID.randomUUID().toString();        dynamicTask.startDelay(downLoadTimeOutTaskKey, () -> {            logger.warn(String.format("录像下载请求超时,deviceId:%s ,channelId:%s", deviceId, channelId));            downloadResult.setCode(ErrorCode.ERROR100.getCode());            downloadResult.setMsg("录像下载请求超时");            hookCallBack.call(downloadResult);            // 点播超时回复BYE 同时释放ssrc以及此次点播的资源            try {                cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null);            } catch (InvalidArgumentException | ParseException | SipException e) {                logger.error("[录像流]录像下载请求超时, 发送BYE失败 {}", e.getMessage());            } catch (SsrcTransactionNotFoundException e) {                mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());                mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());                streamSession.remove(deviceId, channelId, ssrcInfo.getStream());            }        }, userSetting.getPlayTimeout());        SipSubscribe.Event errorEvent = event -> {            dynamicTask.stop(downLoadTimeOutTaskKey);            downloadResult.setCode(ErrorCode.ERROR100.getCode());            downloadResult.setMsg(String.format("录像下载失败, 错误码: %s, %s", event.statusCode, event.msg));            downloadResult.setEvent(event);            hookCallBack.call(downloadResult);            streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());        };        InviteStreamCallback hookEvent = (InviteStreamInfo inviteStreamInfo) -> {            logger.info("收到订阅消息: " + inviteStreamInfo.getCallId());            dynamicTask.stop(downLoadTimeOutTaskKey);            StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId);            streamInfo.setStartTime(startTime);            streamInfo.setEndTime(endTime);            redisCatchStorage.startDownload(streamInfo, inviteStreamInfo.getCallId());            downloadResult.setCode(ErrorCode.SUCCESS.getCode());            downloadResult.setMsg(ErrorCode.SUCCESS.getMsg());            downloadResult.setData(streamInfo);            downloadResult.setMediaServerItem(inviteStreamInfo.getMediaServerItem());            downloadResult.setResponse(inviteStreamInfo.getResponse());            hookCallBack.call(downloadResult);        };        try {            cmder.downloadStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, downloadSpeed, infoCallBack,                    hookEvent, errorEvent, eventResult ->                    {                        if (eventResult.type == SipSubscribe.EventResultType.response) {                            ResponseEvent responseEvent = (ResponseEvent) eventResult.event;                            String contentString = new String(responseEvent.getResponse().getRawContent());                            // 获取ssrc                            int ssrcIndex = contentString.indexOf("y=");                            // 检查是否有y字段                            if (ssrcIndex >= 0) {                                //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容                                String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);                                // 查询到ssrc不一致且开启了ssrc校验则需要针对处理                                if (ssrcInfo.getSsrc().equals(ssrcInResponse)) {                                    return;                                }                                logger.info("[回放消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);                                if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {                                    logger.info("[回放消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);                                    if (!mediaServerItem.getSsrcConfig().checkSsrc(ssrcInResponse)) {                                        // ssrc 不可用                                        // 释放ssrc                                        mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());                                        streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());                                        eventResult.msg = "下级自定义了ssrc,但是此ssrc不可用";                                        eventResult.statusCode = 400;                                        errorEvent.response(eventResult);                                        return;                                    }                                    // 单端口模式streamId也有变化,需要重新设置监听                                    if (!mediaServerItem.isRtpEnable()) {                                        // 添加订阅                                        HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());                                        subscribe.removeSubscribe(hookSubscribe);                                        hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());                                        subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {                                            logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString());                                            dynamicTask.stop(downLoadTimeOutTaskKey);                                            // hook响应                                            onPublishHandlerForPlayback(mediaServerItemInUse, response, device.getDeviceId(), channelId, hookCallBack);                                            hookEvent.call(new InviteStreamInfo(mediaServerItem, null, eventResult.callId, "rtp", ssrcInfo.getStream()));                                        });                                    }                                    // 关闭rtp server                                    mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());                                    // 重新开启ssrc server                                    mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), true, ssrcInfo.getPort());                                }                            }                        }                    });        } catch (InvalidArgumentException | SipException | ParseException e) {            logger.error("[命令发送失败] 录像下载: {}", e.getMessage());            SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(new CmdSendFailEvent(null));            eventResult.msg = "命令发送失败";            errorEvent.response(eventResult);        }    }    @Override    public StreamInfo getDownLoadInfo(String deviceId, String channelId, String stream) {        StreamInfo streamInfo = redisCatchStorage.queryDownload(deviceId, channelId, stream, null);        if (streamInfo != null) {            if (streamInfo.getProgress() == 1) {                return streamInfo;            }            // 获取当前已下载时长            String mediaServerId = streamInfo.getMediaServerId();            MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);            if (mediaServerItem == null) {                logger.warn("查询录像信息时发现节点已离线");                return null;            }            if (mediaServerItem.getRecordAssistPort() > 0) {                JSONObject jsonObject = assistRESTfulUtils.fileDuration(mediaServerItem, streamInfo.getApp(), streamInfo.getStream(), null);                if (jsonObject == null) {                    throw new ControllerException(ErrorCode.ERROR100.getCode(), "连接Assist服务失败");                }                if (jsonObject.getInteger("code") == 0) {                    long duration = jsonObject.getLong("data");                    if (duration == 0) {                        streamInfo.setProgress(0);                    } else {                        String startTime = streamInfo.getStartTime();                        String endTime = streamInfo.getEndTime();                        long start = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime);                        long end = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime);                        BigDecimal currentCount = new BigDecimal(duration / 1000);                        BigDecimal totalCount = new BigDecimal(end - start);                        BigDecimal divide = currentCount.divide(totalCount, 2, RoundingMode.HALF_UP);                        double process = divide.doubleValue();                        streamInfo.setProgress(process);                    }                }            }        }        return streamInfo;    }    @Override    public void onPublishHandlerForDownload(InviteStreamInfo inviteStreamInfo, String deviceId, String channelId, String uuid) {        RequestMessage msg = new RequestMessage();        msg.setKey(DeferredResultHolder.CALLBACK_CMD_DOWNLOAD + deviceId + channelId);        msg.setId(uuid);        StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId);        if (streamInfo != null) {            redisCatchStorage.startDownload(streamInfo, inviteStreamInfo.getCallId());            msg.setData(JSON.toJSONString(streamInfo));            resultHolder.invokeResult(msg);        } else {            logger.warn("设备预览API调用失败!");            msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "设备预览API调用失败!"));            resultHolder.invokeResult(msg);        }    }    public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId) {        String streamId = resonse.getString("stream");        JSONArray tracks = resonse.getJSONArray("tracks");        StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", streamId, tracks, null);        streamInfo.setDeviceID(deviceId);        streamInfo.setChannelId(channelId);        return streamInfo;    }    @Override    public void zlmServerOffline(String mediaServerId) {        // 处理正在向上推流的上级平台        List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServer(null);        if (sendRtpItems.size() > 0) {            for (SendRtpItem sendRtpItem : sendRtpItems) {                if (sendRtpItem.getMediaServerId().equals(mediaServerId)) {                    ParentPlatform platform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());                    try {                        sipCommanderFroPlatform.streamByeCmd(platform, sendRtpItem.getCallId());                    } catch (SipException | InvalidArgumentException | ParseException e) {                        logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());                    }                }            }        }        // 处理正在观看的国标设备        List<SsrcTransaction> allSsrc = streamSession.getAllSsrc();        if (allSsrc.size() > 0) {            for (SsrcTransaction ssrcTransaction : allSsrc) {                if (ssrcTransaction.getMediaServerId().equals(mediaServerId)) {                    Device device = deviceService.getDevice(ssrcTransaction.getDeviceId());                    if (device == null) {                        continue;                    }                    try {                        cmder.streamByeCmd(device, ssrcTransaction.getChannelId(),                                ssrcTransaction.getStream(), null);                    } catch (InvalidArgumentException | ParseException | SipException |                            SsrcTransactionNotFoundException e) {                        logger.error("[zlm离线]为正在使用此zlm的设备, 发送BYE失败 {}", e.getMessage());                    }                }            }        }    }    @Override    public void zlmServerOnline(String mediaServerId) {        // TODO 查找之前的点播,流如果不存在则给下级发送bye//        MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);//        zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{//            Integer code = mediaList.getInteger("code");//            if (code == 0) {//                JSONArray data = mediaList.getJSONArray("data");//                if (data == null || data.size() == 0) {//                    zlmServerOffline(mediaServerId);//                }else {//                    Map<String, JSONObject> mediaListMap = new HashMap<>();//                    for (int i = 0; i < data.size(); i++) {//                        JSONObject json = data.getJSONObject(i);//                        String app = json.getString("app");//                        if ("rtp".equals(app)) {//                            String stream = json.getString("stream");//                            if (mediaListMap.get(stream) != null) {//                                continue;//                            }//                            mediaListMap.put(stream, json);//                            // 处理正在观看的国标设备//                            List<SsrcTransaction> ssrcTransactions = streamSession.getSsrcTransactionForAll(null, null, null, stream);//                            if (ssrcTransactions.size() > 0) {//                                for (SsrcTransaction ssrcTransaction : ssrcTransactions) {//                                    if(ssrcTransaction.getMediaServerId().equals(mediaServerId)) {//                                        cmder.streamByeCmd(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(),//                                                ssrcTransaction.getStream(), null);//                                    }//                                }//                            }//                        }//                    }//                    if (mediaListMap.size() > 0 ) {//                        // 处理正在向上推流的上级平台//                        List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServer(null);//                        if (sendRtpItems.size() > 0) {//                            for (SendRtpItem sendRtpItem : sendRtpItems) {//                                if (sendRtpItem.getMediaServerId().equals(mediaServerId)) {//                                    if (mediaListMap.get(sendRtpItem.getStreamId()) == null) {//                                        ParentPlatform platform = storager.queryPlatformByServerGBId(sendRtpItem.getPlatformId());//                                        sipCommanderFroPlatform.streamByeCmd(platform, sendRtpItem.getCallId());//                                    }//                                }//                            }//                        }//                    }//                }//            }//        }));    }    @Override    public void pauseRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException {        String key = redisCatchStorage.queryPlaybackForKey(null, null, streamId, null);        StreamInfo streamInfo = redisCatchStorage.queryPlayback(null, null, streamId, null);        if (null == streamInfo) {            logger.warn("streamId不存在!");            throw new ServiceException("streamId不存在");        }        streamInfo.setPause(true);        RedisUtil.set(key, streamInfo);        MediaServerItem mediaServerItem = mediaServerService.getOne(streamInfo.getMediaServerId());        if (null == mediaServerItem) {            logger.warn("mediaServer 不存在!");            throw new ServiceException("mediaServer不存在");        }        // zlm 暂停RTP超时检查        JSONObject jsonObject = zlmresTfulUtils.pauseRtpCheck(mediaServerItem, streamId);        if (jsonObject == null || jsonObject.getInteger("code") != 0) {            throw new ServiceException("暂停RTP接收失败");        }        Device device = storager.queryVideoDevice(streamInfo.getDeviceID());        cmder.playPauseCmd(device, streamInfo);    }    @Override    public void resumeRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException {        String key = redisCatchStorage.queryPlaybackForKey(null, null, streamId, null);        StreamInfo streamInfo = redisCatchStorage.queryPlayback(null, null, streamId, null);        if (null == streamInfo) {            logger.warn("streamId不存在!");            throw new ServiceException("streamId不存在");        }        streamInfo.setPause(false);        RedisUtil.set(key, streamInfo);        MediaServerItem mediaServerItem = mediaServerService.getOne(streamInfo.getMediaServerId());        if (null == mediaServerItem) {            logger.warn("mediaServer 不存在!");            throw new ServiceException("mediaServer不存在");        }        // zlm 暂停RTP超时检查        JSONObject jsonObject = zlmresTfulUtils.resumeRtpCheck(mediaServerItem, streamId);        if (jsonObject == null || jsonObject.getInteger("code") != 0) {            throw new ServiceException("继续RTP接收失败");        }        Device device = storager.queryVideoDevice(streamInfo.getDeviceID());        cmder.playResumeCmd(device, streamInfo);    }}
 |