|
@@ -1,12 +1,10 @@
|
|
|
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
|
|
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
|
|
|
|
|
|
|
|
-import com.alibaba.fastjson2.JSONObject;
|
|
|
|
|
|
|
+import com.genersoft.iot.vmp.common.StreamInfo;
|
|
|
import com.genersoft.iot.vmp.conf.DynamicTask;
|
|
import com.genersoft.iot.vmp.conf.DynamicTask;
|
|
|
import com.genersoft.iot.vmp.conf.UserSetting;
|
|
import com.genersoft.iot.vmp.conf.UserSetting;
|
|
|
import com.genersoft.iot.vmp.gb28181.bean.*;
|
|
import com.genersoft.iot.vmp.gb28181.bean.*;
|
|
|
-import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
|
|
|
|
|
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
|
|
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
|
|
|
-import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
|
|
|
|
|
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
|
|
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
|
|
|
import com.genersoft.iot.vmp.gb28181.transmit.SIPSender;
|
|
import com.genersoft.iot.vmp.gb28181.transmit.SIPSender;
|
|
|
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
|
|
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
|
|
@@ -21,6 +19,8 @@ import com.genersoft.iot.vmp.service.IMediaServerService;
|
|
|
import com.genersoft.iot.vmp.service.IPlayService;
|
|
import com.genersoft.iot.vmp.service.IPlayService;
|
|
|
import com.genersoft.iot.vmp.service.IStreamProxyService;
|
|
import com.genersoft.iot.vmp.service.IStreamProxyService;
|
|
|
import com.genersoft.iot.vmp.service.IStreamPushService;
|
|
import com.genersoft.iot.vmp.service.IStreamPushService;
|
|
|
|
|
+import com.genersoft.iot.vmp.service.bean.InviteErrorCallback;
|
|
|
|
|
+import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
|
|
|
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
|
|
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
|
|
|
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
|
|
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
|
|
|
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
|
|
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
|
|
@@ -101,9 +101,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
|
|
@Autowired
|
|
@Autowired
|
|
|
private SIPProcessorObserver sipProcessorObserver;
|
|
private SIPProcessorObserver sipProcessorObserver;
|
|
|
|
|
|
|
|
- @Autowired
|
|
|
|
|
- private VideoStreamSessionManager sessionManager;
|
|
|
|
|
-
|
|
|
|
|
@Autowired
|
|
@Autowired
|
|
|
private UserSetting userSetting;
|
|
private UserSetting userSetting;
|
|
|
|
|
|
|
@@ -359,7 +356,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
|
|
}else {
|
|
}else {
|
|
|
streamTypeStr = "UDP";
|
|
streamTypeStr = "UDP";
|
|
|
}
|
|
}
|
|
|
- logger.info("[上级点播] 平台:{}, 通道:{}, 收流地址:{}:{},收流方式:{}, ssrc:{}", username, channelId, addressStr, port, streamTypeStr, ssrc);
|
|
|
|
|
|
|
+ logger.info("[上级Invite] {}, 平台:{}, 通道:{}, 收流地址:{}:{},收流方式:{}, ssrc:{}", sessionName, username, channelId, addressStr, port, streamTypeStr, ssrc);
|
|
|
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
|
|
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
|
|
|
device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp());
|
|
device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp());
|
|
|
|
|
|
|
@@ -380,10 +377,10 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
|
|
|
|
|
|
|
Long finalStartTime = startTime;
|
|
Long finalStartTime = startTime;
|
|
|
Long finalStopTime = stopTime;
|
|
Long finalStopTime = stopTime;
|
|
|
- ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON) -> {
|
|
|
|
|
- String app = responseJSON.getString("app");
|
|
|
|
|
- String stream = responseJSON.getString("stream");
|
|
|
|
|
- logger.info("[上级点播]下级已经开始推流。 回复200OK(SDP), {}/{}", app, stream);
|
|
|
|
|
|
|
+ InviteErrorCallback<Object> hookEvent = (code, msg, data) -> {
|
|
|
|
|
+ StreamInfo streamInfo = (StreamInfo)data;
|
|
|
|
|
+ MediaServerItem mediaServerItemInUSe = mediaServerService.getOne(streamInfo.getMediaServerId());
|
|
|
|
|
+ logger.info("[上级Invite]下级已经开始推流。 回复200OK(SDP), {}/{}", streamInfo.getApp(), streamInfo.getStream());
|
|
|
// * 0 等待设备推流上来
|
|
// * 0 等待设备推流上来
|
|
|
// * 1 下级已经推流,等待上级平台回复ack
|
|
// * 1 下级已经推流,等待上级平台回复ack
|
|
|
// * 2 推流中
|
|
// * 2 推流中
|
|
@@ -429,11 +426,13 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
|
|
logger.error("[命令发送失败] 国标级联 回复SdpAck", e);
|
|
logger.error("[命令发送失败] 国标级联 回复SdpAck", e);
|
|
|
}
|
|
}
|
|
|
};
|
|
};
|
|
|
- SipSubscribe.Event errorEvent = ((event) -> {
|
|
|
|
|
|
|
+ InviteErrorCallback<Object> errorEvent = ((statusCode, msg, data) -> {
|
|
|
// 未知错误。直接转发设备点播的错误
|
|
// 未知错误。直接转发设备点播的错误
|
|
|
try {
|
|
try {
|
|
|
- Response response = getMessageFactory().createResponse(event.statusCode, evt.getRequest());
|
|
|
|
|
- sipSender.transmitRequest(request.getLocalAddress().getHostAddress(), response);
|
|
|
|
|
|
|
+ if (statusCode > 0) {
|
|
|
|
|
+ Response response = getMessageFactory().createResponse(statusCode, evt.getRequest());
|
|
|
|
|
+ sipSender.transmitRequest(request.getLocalAddress().getHostAddress(), response);
|
|
|
|
|
+ }
|
|
|
} catch (ParseException | SipException e) {
|
|
} catch (ParseException | SipException e) {
|
|
|
logger.error("未处理的异常 ", e);
|
|
logger.error("未处理的异常 ", e);
|
|
|
}
|
|
}
|
|
@@ -446,67 +445,70 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
|
|
// 写入redis, 超时时回复
|
|
// 写入redis, 超时时回复
|
|
|
redisCatchStorage.updateSendRTPSever(sendRtpItem);
|
|
redisCatchStorage.updateSendRTPSever(sendRtpItem);
|
|
|
playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start),
|
|
playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start),
|
|
|
- DateUtil.formatter.format(end), null, result -> {
|
|
|
|
|
- if (result.getCode() != 0) {
|
|
|
|
|
- logger.warn("录像回放失败");
|
|
|
|
|
- if (result.getEvent() != null) {
|
|
|
|
|
- errorEvent.response(result.getEvent());
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ DateUtil.formatter.format(end),
|
|
|
|
|
+ (code, msg, data) -> {
|
|
|
|
|
+ if (code == InviteErrorCode.SUCCESS.getCode()){
|
|
|
|
|
+ hookEvent.run(code, msg, data);
|
|
|
|
|
+ }else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()){
|
|
|
|
|
+ logger.info("[录像回放]超时, 用户:{}, 通道:{}", username, channelId);
|
|
|
redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
|
|
redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
|
|
|
- try {
|
|
|
|
|
- responseAck(request, Response.REQUEST_TIMEOUT);
|
|
|
|
|
- } catch (SipException | InvalidArgumentException | ParseException e) {
|
|
|
|
|
- logger.error("[命令发送失败] 国标级联 录像回放 发送REQUEST_TIMEOUT: {}", e.getMessage());
|
|
|
|
|
- }
|
|
|
|
|
- } else {
|
|
|
|
|
- if (result.getMediaServerItem() != null) {
|
|
|
|
|
- hookEvent.response(result.getMediaServerItem(), result.getResponse());
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ errorEvent.run(code, msg, data);
|
|
|
|
|
+ }else {
|
|
|
|
|
+ errorEvent.run(code, msg, data);
|
|
|
}
|
|
}
|
|
|
});
|
|
});
|
|
|
- } else {
|
|
|
|
|
|
|
+ }else if ("Download".equalsIgnoreCase(sessionName)) {
|
|
|
|
|
+ // 获取指定的下载速度
|
|
|
|
|
+ Vector sdpMediaDescriptions = sdp.getMediaDescriptions(true);
|
|
|
|
|
+ MediaDescription mediaDescription = null;
|
|
|
|
|
+ String downloadSpeed = "1";
|
|
|
|
|
+ if (sdpMediaDescriptions.size() > 0) {
|
|
|
|
|
+ mediaDescription = (MediaDescription)sdpMediaDescriptions.get(0);
|
|
|
|
|
+ }
|
|
|
|
|
+ if (mediaDescription != null) {
|
|
|
|
|
+ downloadSpeed = mediaDescription.getAttribute("downloadspeed");
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ sendRtpItem.setPlayType(InviteStreamType.DOWNLOAD);
|
|
|
|
|
+ SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam());
|
|
|
|
|
+ sendRtpItem.setStreamId(ssrcInfo.getStream());
|
|
|
|
|
+ // 写入redis, 超时时回复
|
|
|
|
|
+ redisCatchStorage.updateSendRTPSever(sendRtpItem);
|
|
|
|
|
+ playService.download(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start),
|
|
|
|
|
+ DateUtil.formatter.format(end), Integer.parseInt(downloadSpeed),
|
|
|
|
|
+ (code, msg, data) -> {
|
|
|
|
|
+ if (code == InviteErrorCode.SUCCESS.getCode()){
|
|
|
|
|
+ hookEvent.run(code, msg, data);
|
|
|
|
|
+ }else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()){
|
|
|
|
|
+ logger.info("[录像下载]超时, 用户:{}, 通道:{}", username, channelId);
|
|
|
|
|
+ redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
|
|
|
|
|
+ errorEvent.run(code, msg, data);
|
|
|
|
|
+ }else {
|
|
|
|
|
+ errorEvent.run(code, msg, data);
|
|
|
|
|
+ }
|
|
|
|
|
+ });
|
|
|
|
|
+ }else {
|
|
|
sendRtpItem.setPlayType(InviteStreamType.PLAY);
|
|
sendRtpItem.setPlayType(InviteStreamType.PLAY);
|
|
|
- SsrcTransaction playTransaction = sessionManager.getSsrcTransaction(device.getDeviceId(), channelId, "play", null);
|
|
|
|
|
- if (playTransaction != null) {
|
|
|
|
|
- Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, "rtp", playTransaction.getStream());
|
|
|
|
|
- if (!streamReady) {
|
|
|
|
|
- boolean hasRtpServer = mediaServerService.checkRtpServer(mediaServerItem, "rtp", playTransaction.getStream());
|
|
|
|
|
- if (hasRtpServer) {
|
|
|
|
|
- logger.info("[上级点播]已经开启rtpServer但是尚未收到流,开启监听流的到来");
|
|
|
|
|
- HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", playTransaction.getStream(), true, "rtsp", mediaServerItem.getId());
|
|
|
|
|
- zlmHttpHookSubscribe.addSubscribe(hookSubscribe, hookEvent);
|
|
|
|
|
- }else {
|
|
|
|
|
- playTransaction = null;
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ String streamId = null;
|
|
|
|
|
+ if (mediaServerItem.isRtpEnable()) {
|
|
|
|
|
+ streamId = String.format("%s_%s", device.getDeviceId(), channelId);
|
|
|
|
|
+ }else {
|
|
|
|
|
+ streamId = String.format("%08x", Integer.parseInt(ssrc)).toUpperCase();
|
|
|
}
|
|
}
|
|
|
- if (playTransaction == null) {
|
|
|
|
|
- // 被点播的通道目前未被点播,则开始点播
|
|
|
|
|
- String streamId = null;
|
|
|
|
|
- if (mediaServerItem.isRtpEnable()) {
|
|
|
|
|
- streamId = String.format("%s_%s", device.getDeviceId(), channelId);
|
|
|
|
|
- }
|
|
|
|
|
- SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, ssrc, device.isSsrcCheck(), false, 0, false, device.getStreamModeForParam());
|
|
|
|
|
- logger.info(JSONObject.toJSONString(ssrcInfo));
|
|
|
|
|
- sendRtpItem.setStreamId(ssrcInfo.getStream());
|
|
|
|
|
-// sendRtpItem.setSsrc(ssrcInfo.getSsrc());
|
|
|
|
|
-
|
|
|
|
|
- // 写入redis, 超时时回复
|
|
|
|
|
- redisCatchStorage.updateSendRTPSever(sendRtpItem);
|
|
|
|
|
- playService.play(mediaServerItem, ssrcInfo, device, channelId, hookEvent, errorEvent, (code, msg) -> {
|
|
|
|
|
|
|
+ sendRtpItem.setStreamId(streamId);
|
|
|
|
|
+ redisCatchStorage.updateSendRTPSever(sendRtpItem);
|
|
|
|
|
+ playService.play(mediaServerItem, device.getDeviceId(), channelId, ((code, msg, data) -> {
|
|
|
|
|
+ if (code == InviteErrorCode.SUCCESS.getCode()){
|
|
|
|
|
+ hookEvent.run(code, msg, data);
|
|
|
|
|
+ }else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()){
|
|
|
logger.info("[上级点播]超时, 用户:{}, 通道:{}", username, channelId);
|
|
logger.info("[上级点播]超时, 用户:{}, 通道:{}", username, channelId);
|
|
|
redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
|
|
redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
|
|
|
- });
|
|
|
|
|
- } else {
|
|
|
|
|
|
|
+ errorEvent.run(code, msg, data);
|
|
|
|
|
+ }else {
|
|
|
|
|
+ errorEvent.run(code, msg, data);
|
|
|
|
|
+ }
|
|
|
|
|
+ }));
|
|
|
|
|
|
|
|
- sendRtpItem.setStreamId(playTransaction.getStream());
|
|
|
|
|
- // 写入redis, 超时时回复
|
|
|
|
|
- redisCatchStorage.updateSendRTPSever(sendRtpItem);
|
|
|
|
|
- JSONObject jsonObject = new JSONObject();
|
|
|
|
|
- jsonObject.put("app", sendRtpItem.getApp());
|
|
|
|
|
- jsonObject.put("stream", sendRtpItem.getStreamId());
|
|
|
|
|
- hookEvent.response(mediaServerItem, jsonObject);
|
|
|
|
|
- }
|
|
|
|
|
}
|
|
}
|
|
|
} else if (gbStream != null) {
|
|
} else if (gbStream != null) {
|
|
|
|
|
|
|
@@ -559,7 +561,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
|
|
int port, Boolean tcpActive, boolean mediaTransmissionTCP,
|
|
int port, Boolean tcpActive, boolean mediaTransmissionTCP,
|
|
|
String channelId, String addressStr, String ssrc, String requesterId) {
|
|
String channelId, String addressStr, String ssrc, String requesterId) {
|
|
|
Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
|
|
Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
|
|
|
- if (streamReady) {
|
|
|
|
|
|
|
+ if (streamReady != null && streamReady) {
|
|
|
// 自平台内容
|
|
// 自平台内容
|
|
|
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
|
|
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
|
|
|
gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
|
|
gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
|
|
@@ -598,7 +600,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
|
|
// 推流
|
|
// 推流
|
|
|
if (streamPushItem.isSelf()) {
|
|
if (streamPushItem.isSelf()) {
|
|
|
Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
|
|
Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
|
|
|
- if (streamReady) {
|
|
|
|
|
|
|
+ if (streamReady != null && streamReady) {
|
|
|
// 自平台内容
|
|
// 自平台内容
|
|
|
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
|
|
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
|
|
|
gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
|
|
gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
|