瀏覽代碼

Merge branch 'wvp-28181-2.0' into main-dev

# Conflicts:
#	src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
#	src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
#	src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java
#	src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java
#	src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
648540858 2 年之前
父節點
當前提交
3cada22743

+ 2 - 0
sql/2.6.9更新.sql

@@ -0,0 +1,2 @@
+alter table wvp_device_channel
+    change stream_id stream_id varying(255)

+ 1 - 1
sql/初始化.sql

@@ -79,7 +79,7 @@ create table wvp_device_channel (
                                     custom_longitude double precision,
                                     latitude double precision,
                                     custom_latitude double precision,
-                                    stream_id character varying(50),
+                                    stream_id character varying(255),
                                     device_id character varying(50) not null,
                                     parental character varying(50),
                                     has_audio bool default false,

+ 4 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java

@@ -98,6 +98,10 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
 			logger.warn("[收到ACK]:未找到来自{},目标为({})的推流信息",fromUserId, toUserId);
 			return;
 		}
+        // tcp主动时,此时是级联下级平台,在回复200ok时,本地已经请求zlm开启监听,跳过下面步骤
+        if (sendRtpItem.isTcpActive()) {
+            return;
+        }
 		logger.info("[收到ACK]:rtp/{}开始级推流, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStream(),
 				sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp());
 		// 取消设置的超时任务

+ 43 - 2
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java

@@ -50,6 +50,8 @@ import javax.sip.header.CallIdHeader;
 import javax.sip.message.Response;
 import java.text.ParseException;
 import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Random;
 import java.util.Vector;
 
@@ -406,6 +408,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                         content.append("y=" + sendRtpItem.getSsrc() + "\r\n");
                         content.append("f=\r\n");
 
+
                         try {
                             // 超时未收到Ack应该回复bye,当前等待时间为10秒
                             dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
@@ -418,8 +421,34 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                                     logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
                                 }
                             }, 60 * 1000);
-
-                             responseSdpAck(request, content.toString(), platform);
+                            responseSdpAck(request, content.toString(), platform);
+                            // tcp主动模式,回复sdp后开启监听
+                            if (sendRtpItem.isTcpActive()) {
+                                MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
+                                Map<String, Object> param = new HashMap<>(12);
+                                param.put("vhost","__defaultVhost__");
+                                param.put("app",sendRtpItem.getApp());
+                                param.put("stream",sendRtpItem.getStreamId());
+                                param.put("ssrc", sendRtpItem.getSsrc());
+                                if (!sendRtpItem.isTcpActive()) {
+                                    param.put("dst_url",sendRtpItem.getIp());
+                                    param.put("dst_port", sendRtpItem.getPort());
+                                }
+                                String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
+                                param.put("is_udp", is_Udp);
+                                param.put("src_port", localPort);
+                                param.put("pt", sendRtpItem.getPt());
+                                param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
+                                param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
+                                if (!sendRtpItem.isTcp()) {
+                                    // 开启rtcp保活
+                                    param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0");
+                                }
+                                JSONObject startSendRtpStreamResult = zlmServerFactory.startSendRtpStreamForPassive(mediaInfo, param);
+                                if (startSendRtpStreamResult != null) {
+                                    startSendRtpStreamHand(evt, sendRtpItem, null, startSendRtpStreamResult, param, callIdHeader);
+                                }
+                            }
                         } catch (SipException | InvalidArgumentException | ParseException e) {
                             logger.error("[命令发送失败] 国标级联 回复SdpAck", e);
                         }
@@ -553,6 +582,18 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
         }
     }
 
+    private void startSendRtpStreamHand(RequestEvent evt, SendRtpItem sendRtpItem, ParentPlatform parentPlatform,
+                                        JSONObject jsonObject, Map<String, Object> param, CallIdHeader callIdHeader) {
+        if (jsonObject == null) {
+            logger.error("下级TCP被动启动监听失败: 请检查ZLM服务");
+        } else if (jsonObject.getInteger("code") == 0) {
+            logger.info("调用ZLM-TCP被动推流接口, 结果: {}",  jsonObject);
+            logger.info("启动监听TCP被动推流成功[ {}/{} ],{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port"));
+        } else {
+            logger.error("启动监听TCP被动推流失败: {}, 参数:{}",jsonObject.getString("msg"), JSON.toJSONString(param));
+        }
+    }
+
     /**
      * 安排推流
      */

+ 1 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java

@@ -59,6 +59,7 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp
             // 未注册的设备不做处理
             return;
         }
+        logger.info("[收到心跳], device: {}", device.getDeviceId());
         SIPRequest request = (SIPRequest) evt.getRequest();
         // 回复200 OK
         try {

+ 73 - 25
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java

@@ -25,8 +25,10 @@ import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
 import com.genersoft.iot.vmp.media.zlm.dto.hook.*;
 import com.genersoft.iot.vmp.service.*;
 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
+import com.genersoft.iot.vmp.service.bean.SSRCInfo;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
+import com.genersoft.iot.vmp.utils.DateUtil;
 import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
 import com.genersoft.iot.vmp.vmanager.bean.OtherRtpSendInfo;
 import com.genersoft.iot.vmp.vmanager.bean.StreamContent;
@@ -641,7 +643,7 @@ public class ZLMHttpHookListener {
 
         if ("rtp".equals(param.getApp())) {
             String[] s = param.getStream().split("_");
-            if (!mediaInfo.isRtpEnable() || s.length != 2) {
+            if (!mediaInfo.isRtpEnable() || (s.length != 2 && s.length != 4)) {
                 defaultResult.setResult(HookResult.SUCCESS());
                 return defaultResult;
             }
@@ -657,33 +659,79 @@ public class ZLMHttpHookListener {
                 defaultResult.setResult(new HookResult(ErrorCode.ERROR404.getCode(), ErrorCode.ERROR404.getMsg()));
                 return defaultResult;
             }
-            logger.info("[ZLM HOOK] 流未找到, 发起自动点播:{}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream());
-
-            RequestMessage msg = new RequestMessage();
-            String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId;
-            boolean exist = resultHolder.exist(key, null);
-            msg.setKey(key);
-            String uuid = UUID.randomUUID().toString();
-            msg.setId(uuid);
-            DeferredResult<HookResult> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
-
-            result.onTimeout(() -> {
-                logger.info("[ZLM HOOK] 自动点播, 等待超时");
-                // 释放rtpserver
-                msg.setData(new HookResult(ErrorCode.ERROR100.getCode(), "点播超时"));
-                resultHolder.invokeResult(msg);
-            });
-
-            // 录像查询以channelId作为deviceId查询
-            resultHolder.put(key, uuid, result);
-
-            if (!exist) {
-                playService.play(mediaInfo, deviceId, channelId, null, (code, message, data) -> {
-                    msg.setData(new HookResult(code, message));
+            if (s.length == 2) {
+                logger.info("[ZLM HOOK] 预览流未找到, 发起自动点播:{}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream());
+
+                RequestMessage msg = new RequestMessage();
+                String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId;
+                boolean exist = resultHolder.exist(key, null);
+                msg.setKey(key);
+                String uuid = UUID.randomUUID().toString();
+                msg.setId(uuid);
+                DeferredResult<HookResult> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
+
+                result.onTimeout(() -> {
+                    logger.info("[ZLM HOOK] 预览流自动点播, 等待超时");
+                    // 释放rtpserver
+                    msg.setData(new HookResult(ErrorCode.ERROR100.getCode(), "点播超时"));
                     resultHolder.invokeResult(msg);
                 });
+
+                resultHolder.put(key, uuid, result);
+
+                if (!exist) {
+                    playService.play(mediaInfo, deviceId, channelId, null, (code, message, data) -> {
+                        msg.setData(new HookResult(code, message));
+                        resultHolder.invokeResult(msg);
+                    });
+                }
+                return result;
+            }else if(s.length == 4){
+                // 此时为录像回放, 录像回放格式为> 设备ID_通道ID_开始时间_结束时间
+                String startTimeStr = s[2];
+                String endTimeStr = s[3];
+                if (startTimeStr == null || endTimeStr == null || startTimeStr.length() != 14 || endTimeStr.length() != 14) {
+                    defaultResult.setResult(HookResult.SUCCESS());
+                    return defaultResult;
+                }
+                String startTime = DateUtil.urlToyyyy_MM_dd_HH_mm_ss(startTimeStr);
+                String endTime = DateUtil.urlToyyyy_MM_dd_HH_mm_ss(endTimeStr);
+                logger.info("[ZLM HOOK] 回放流未找到, 发起自动点播:{}->{}->{}/{}-{}-{}",
+                        param.getMediaServerId(), param.getSchema(),
+                        param.getApp(), param.getStream(),
+                        startTime, endTime
+                );
+                RequestMessage msg = new RequestMessage();
+                String key = DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId;
+                boolean exist = resultHolder.exist(key, null);
+                msg.setKey(key);
+                String uuid = UUID.randomUUID().toString();
+                msg.setId(uuid);
+                DeferredResult<HookResult> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
+
+                result.onTimeout(() -> {
+                    logger.info("[ZLM HOOK] 回放流自动点播, 等待超时");
+                    // 释放rtpserver
+                    msg.setData(new HookResult(ErrorCode.ERROR100.getCode(), "点播超时"));
+                    resultHolder.invokeResult(msg);
+                });
+
+                resultHolder.put(key, uuid, result);
+
+                if (!exist) {
+                    SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaInfo, param.getStream(), null,
+                            device.isSsrcCheck(),  true, 0, false, device.getStreamModeForParam());
+                    playService.playBack(mediaInfo, ssrcInfo, deviceId, channelId, startTime, endTime, (code, message, data) -> {
+                        msg.setData(new HookResult(code, message));
+                        resultHolder.invokeResult(msg);
+                    });
+                }
+                return result;
+            }else {
+                defaultResult.setResult(HookResult.SUCCESS());
+                return defaultResult;
             }
-            return result;
+
         } else {
             // 拉流代理
             StreamProxyItem streamProxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream());

+ 2 - 1
src/main/java/com/genersoft/iot/vmp/service/IPlayService.java

@@ -40,7 +40,6 @@ public interface IPlayService {
 
     void playBack(String deviceId, String channelId, String startTime, String endTime, ErrorCallback<Object> callback);
     void playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, ErrorCallback<Object> callback);
-
     void zlmServerOffline(String mediaServerId);
 
     void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback<Object> callback);
@@ -72,4 +71,6 @@ public interface IPlayService {
     void stopTalk(Device device, String channelId, Boolean streamIsReady);
 
     void getSnap(String deviceId, String channelId, String fileName, ErrorCallback errorCallback);
+
+
 }

+ 9 - 1
src/main/java/com/genersoft/iot/vmp/service/impl/PlatformChannelServiceImpl.java

@@ -126,7 +126,15 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
         List<DeviceChannel> deviceChannelList = new ArrayList<>();
         if (channelReduces.size() > 0){
             PlatformCatalog catalog = catalogManager.selectByPlatFormAndCatalogId(platform.getServerGBId(),catalogId);
-            if (catalog == null || !catalogId.equals(platform.getDeviceGBId())) {
+            if (catalog == null && catalogId.equals(platform.getDeviceGBId())) {
+                for (ChannelReduce channelReduce : channelReduces) {
+                    DeviceChannel deviceChannel = deviceChannelMapper.queryChannel(channelReduce.getDeviceId(), channelReduce.getChannelId());
+                    deviceChannel.setParental(0);
+                    deviceChannel.setCivilCode(platform.getServerGBDomain());
+                    deviceChannelList.add(deviceChannel);
+                }
+                return deviceChannelList;
+            } else if (catalog == null || !catalogId.equals(platform.getDeviceGBId())) {
                 logger.warn("未查询到目录{}的信息", catalogId);
                 return null;
             }

+ 13 - 1
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java

@@ -710,7 +710,19 @@ public class PlayServiceImpl implements IPlayService {
             return;
         }
         MediaServerItem newMediaServerItem = getNewMediaServerItem(device);
-        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, null, device.isSsrcCheck(),  true, 0, false,false, device.getStreamModeForParam());
+        String stream = null;
+        if (newMediaServerItem.isRtpEnable()) {
+            String startTimeStr = startTime.replace("-", "")
+                    .replace(":", "")
+                    .replace(" ", "");
+            System.out.println(startTimeStr);
+            String endTimeTimeStr = endTime.replace("-", "")
+                    .replace(":", "")
+                    .replace(" ", "");
+            System.out.println(endTimeTimeStr);
+            stream = deviceId + "_" + channelId + "_" + startTimeStr + "_" + endTimeTimeStr;
+        }
+        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, stream, null, device.isSsrcCheck(),  true, 0, false,false,  device.getStreamModeForParam());
         playBack(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, callback);
     }
 

+ 5 - 0
src/main/java/com/genersoft/iot/vmp/utils/DateUtil.java

@@ -53,6 +53,10 @@ public class DateUtil {
         return formatter.format(formatterCompatibleISO8601.parse(formatTime));
     }
 
+	public static String urlToyyyy_MM_dd_HH_mm_ss(String formatTime) {
+        return formatter.format(urlFormatter.parse(formatTime));
+    }
+
     /**
      * yyyy_MM_dd_HH_mm_ss 转时间戳
      * @param formatTime
@@ -82,6 +86,7 @@ public class DateUtil {
         return urlFormatter.format(nowDateTime);
     }
 
+
     /**
      * 格式校验
      * @param timeStr 时间字符串