Explorar o código

Merge branch 'master' into dev/录制计划

648540858 hai 11 meses
pai
achega
d3f53db160

+ 6 - 3
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpInfo.java

@@ -175,15 +175,18 @@ public class SendRtpInfo {
         return sendRtpItem;
     }
 
-    public static SendRtpInfo getInstance(Integer localPort, MediaServer mediaServer, String ip, int port, String ssrc,
-                                          String deviceId, String platformId, Integer channelId, boolean isTcp, boolean rtcp,
+    public static SendRtpInfo getInstance(Integer localPort, MediaServer mediaServer, String ip, Integer port, String ssrc,
+                                          String deviceId, String platformId, Integer channelId, Boolean isTcp, Boolean rtcp,
                                           String serverId) {
         if (localPort == 0) {
             return null;
         }
         SendRtpInfo sendRtpItem = new SendRtpInfo();
         sendRtpItem.setIp(ip);
-        sendRtpItem.setPort(port);
+        if(port != null) {
+            sendRtpItem.setPort(port);
+        }
+
         sendRtpItem.setSsrc(ssrc);
         if (deviceId != null) {
             sendRtpItem.setTargetId(deviceId);

+ 5 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SyncStatus.java

@@ -3,6 +3,8 @@ package com.genersoft.iot.vmp.gb28181.bean;
 import io.swagger.v3.oas.annotations.media.Schema;
 import lombok.Data;
 
+import java.time.Instant;
+
 /**
  * 摄像机同步状态
  * @author lin
@@ -23,4 +25,7 @@ public class SyncStatus {
     @Schema(description = "是否同步中")
     private Boolean syncIng;
 
+    @Schema(description = "时间")
+    private Instant time;
+
 }

+ 26 - 13
src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceQuery.java

@@ -144,9 +144,21 @@ public class DeviceQuery {
 		Device device = deviceService.getDeviceByDeviceId(deviceId);
 		boolean status = deviceService.isSyncRunning(deviceId);
 		// 已存在则返回进度
-		if (status) {
+		if (deviceService.isSyncRunning(deviceId)) {
 			SyncStatus channelSyncStatus = deviceService.getChannelSyncStatus(deviceId);
-			return WVPResult.success(channelSyncStatus);
+			WVPResult wvpResult = new WVPResult();
+			if (channelSyncStatus.getErrorMsg() != null) {
+				wvpResult.setCode(ErrorCode.ERROR100.getCode());
+				wvpResult.setMsg(channelSyncStatus.getErrorMsg());
+			}else if (channelSyncStatus.getTotal() == null || channelSyncStatus.getTotal() == 0){
+				wvpResult.setCode(ErrorCode.SUCCESS.getCode());
+				wvpResult.setMsg("等待通道信息...");
+			}else {
+				wvpResult.setCode(ErrorCode.SUCCESS.getCode());
+				wvpResult.setMsg(ErrorCode.SUCCESS.getMsg());
+				wvpResult.setData(channelSyncStatus);
+			}
+			return wvpResult;
 		}
 		deviceService.sync(device);
 
@@ -413,18 +425,19 @@ public class DeviceQuery {
 	public WVPResult<SyncStatus> getSyncStatus(@PathVariable String deviceId) {
 		SyncStatus channelSyncStatus = deviceService.getChannelSyncStatus(deviceId);
 		WVPResult<SyncStatus> wvpResult = new WVPResult<>();
-		if (channelSyncStatus == null || channelSyncStatus.getTotal() == null) {
-			wvpResult.setCode(0);
-			wvpResult.setMsg("同步尚未开始");
+		if (channelSyncStatus == null) {
+			wvpResult.setCode(ErrorCode.ERROR100.getCode());
+			wvpResult.setMsg("同步不存在");
+		}else if (channelSyncStatus.getErrorMsg() != null) {
+			wvpResult.setCode(ErrorCode.ERROR100.getCode());
+			wvpResult.setMsg(channelSyncStatus.getErrorMsg());
+		}else if (channelSyncStatus.getTotal() == null || channelSyncStatus.getTotal() == 0){
+			wvpResult.setCode(ErrorCode.SUCCESS.getCode());
+			wvpResult.setMsg("等待通道信息...");
 		}else {
-			if (channelSyncStatus.getErrorMsg() == null) {
-				wvpResult.setCode(ErrorCode.SUCCESS.getCode());
-				wvpResult.setMsg(ErrorCode.SUCCESS.getMsg());
-				wvpResult.setData(channelSyncStatus);
-			}else {
-				wvpResult.setCode(ErrorCode.ERROR100.getCode());
-				wvpResult.setMsg(channelSyncStatus.getErrorMsg());
-			}
+			wvpResult.setCode(ErrorCode.SUCCESS.getCode());
+			wvpResult.setMsg(ErrorCode.SUCCESS.getMsg());
+			wvpResult.setData(channelSyncStatus);
 		}
 		return wvpResult;
 	}

+ 3 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java

@@ -1,5 +1,6 @@
 package com.genersoft.iot.vmp.gb28181.service.impl;
 
+import com.alibaba.fastjson2.JSON;
 import com.baomidou.dynamic.datasource.annotation.DS;
 import com.genersoft.iot.vmp.common.CommonCallback;
 import com.genersoft.iot.vmp.common.VideoManagerConstants;
@@ -322,7 +323,8 @@ public class DeviceServiceImpl implements IDeviceService {
     @Override
     public void sync(Device device) {
         if (catalogResponseMessageHandler.isSyncRunning(device.getDeviceId())) {
-            log.info("开启同步时发现同步已经存在");
+            SyncStatus syncStatus = catalogResponseMessageHandler.getChannelSyncProgress(device.getDeviceId());
+            log.info("[同步通道] 同步已存在, 设备: {}, 同步信息: {}", device.getDeviceId(), JSON.toJSON(syncStatus));
             return;
         }
         int sn = (int)((Math.random()*9+1)*100000);

+ 8 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java

@@ -517,7 +517,14 @@ public class PlayServiceImpl implements IPlayService {
         }, userSetting.getPlayTimeout());
 
         try {
-            mediaServerService.startSendRtpPassive(mediaServerItem, sendRtpInfo, userSetting.getPlayTimeout() * 1000);
+            Integer localPort = mediaServerService.startSendRtpPassive(mediaServerItem, sendRtpInfo, userSetting.getPlayTimeout() * 1000);
+            if (localPort == null || localPort <= 0) {
+                timeoutCallback.run();
+                mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrc());
+                sessionManager.removeByStream(sendRtpInfo.getStream());
+                return;
+            }
+            sendRtpInfo.setPort(localPort);
         }catch (ControllerException e) {
             mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrc());
             log.info("[语音对讲]失败 deviceId: {}, channelId: {}", device.getDeviceId(), channel.getDeviceId());

+ 7 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataManager.java

@@ -170,11 +170,16 @@ public class CatalogDataManager implements CommandLineRunner {
                 syncStatus.setCurrent(catalogData.getRedisKeysForChannel().size());
                 syncStatus.setTotal(catalogData.getTotal());
                 syncStatus.setErrorMsg(catalogData.getErrorMsg());
+                syncStatus.setTime(catalogData.getTime());
                 if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.ready) || catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end)) {
                     syncStatus.setSyncIng(false);
                 }else {
                     syncStatus.setSyncIng(true);
                 }
+                if (catalogData.getErrorMsg() != null) {
+                    // 失败的同步信息,返回一次后直接移除
+                    dataMap.remove(key);
+                }
                 return syncStatus;
             }
         }
@@ -237,7 +242,8 @@ public class CatalogDataManager implements CommandLineRunner {
                     catalogData.setErrorMsg(errorMsg);
                 }
             }
-            if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end) && catalogData.getTime().isBefore(instantBefore30S)) { // 超过三十秒,如果标记为end则删除
+            if ((catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end) || catalogData.getStatus().equals(CatalogData.CatalogDataStatus.ready))
+                    && catalogData.getTime().isBefore(instantBefore30S)) { // 超过三十秒,如果标记为end则删除
                 dataMap.remove(dataKey);
                 Set<String> redisKeysForChannel = catalogData.getRedisKeysForChannel();
                 if (redisKeysForChannel != null && !redisKeysForChannel.isEmpty()) {

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java

@@ -303,7 +303,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
             Media media = mediaDescription.getMedia();
 
             Vector mediaFormats = media.getMediaFormats(false);
-            if (mediaFormats.contains("96")) {
+            if (mediaFormats.contains("96") || mediaFormats.contains("8")) {
                 port = media.getMediaPort();
                 //String mediaType = media.getMediaType();
                 String protocol = media.getProtocol();

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/media/service/IMediaNodeServerService.java

@@ -58,7 +58,7 @@ public interface IMediaNodeServerService {
 
     Map<String, String> getFFmpegCMDs(MediaServer mediaServer);
 
-    void startSendRtpPassive(MediaServer mediaServer, SendRtpInfo sendRtpItem, Integer timeout);
+    Integer startSendRtpPassive(MediaServer mediaServer, SendRtpInfo sendRtpItem, Integer timeout);
 
     void startSendRtpStream(MediaServer mediaServer, SendRtpInfo sendRtpItem);
 

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java

@@ -142,7 +142,7 @@ public interface IMediaServerService {
 
     Boolean isStreamReady(MediaServer mediaServer, String rtp, String streamId);
 
-    void startSendRtpPassive(MediaServer mediaServer, SendRtpInfo sendRtpItem, Integer timeout);
+    Integer startSendRtpPassive(MediaServer mediaServer, SendRtpInfo sendRtpItem, Integer timeout);
 
     void startSendRtp(MediaServer mediaServer, SendRtpInfo sendRtpItem);
 

+ 2 - 2
src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java

@@ -867,13 +867,13 @@ public class MediaServerServiceImpl implements IMediaServerService {
     }
 
     @Override
-    public void startSendRtpPassive(MediaServer mediaServer, SendRtpInfo sendRtpItem, Integer timeout) {
+    public Integer startSendRtpPassive(MediaServer mediaServer, SendRtpInfo sendRtpItem, Integer timeout) {
         IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
         if (mediaNodeServerService == null) {
             log.info("[startSendRtpPassive] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType());
             throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到mediaServer对应的实现类");
         }
-        mediaNodeServerService.startSendRtpPassive(mediaServer, sendRtpItem, timeout);
+        return mediaNodeServerService.startSendRtpPassive(mediaServer, sendRtpItem, timeout);
     }
 
     @Override

+ 2 - 1
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java

@@ -329,7 +329,7 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
     }
 
     @Override
-    public void startSendRtpPassive(MediaServer mediaServer, SendRtpInfo sendRtpItem, Integer timeout) {
+    public Integer startSendRtpPassive(MediaServer mediaServer, SendRtpInfo sendRtpItem, Integer timeout) {
         Map<String, Object> param = new HashMap<>(12);
         param.put("vhost","__defaultVhost__");
         param.put("app", sendRtpItem.getApp());
@@ -361,6 +361,7 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
         log.info("调用ZLM-TCP被动推流接口, 结果: {}",  jsonObject);
         log.info("启动监听TCP被动推流成功[ {}/{} ],{}->{}:{}, " , sendRtpItem.getApp(), sendRtpItem.getStream(),
                 jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port"));
+        return jsonObject.getInteger("local_port");
     }
 
     @Override

+ 3 - 9
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookParam.java

@@ -1,19 +1,13 @@
 package com.genersoft.iot.vmp.media.zlm.dto.hook;
 
+import lombok.Data;
+
 /**
  * zlm hook事件的参数
  * @author lin
  */
+@Data
 public class HookParam {
     private String mediaServerId;
 
-
-
-    public String getMediaServerId() {
-        return mediaServerId;
-    }
-
-    public void setMediaServerId(String mediaServerId) {
-        this.mediaServerId = mediaServerId;
-    }
 }

+ 28 - 64
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnPublishHookParam.java

@@ -1,83 +1,47 @@
 package com.genersoft.iot.vmp.media.zlm.dto.hook;
 
+import lombok.Getter;
+import lombok.Setter;
+
 /**
  * zlm hook事件中的on_publish事件的参数
  * @author lin
  */
-public class OnPublishHookParam extends HookParam{
-    private String id;
-    private String app;
-    private String stream;
-    private String ip;
-    private String params;
-    private int port;
-    private String schema;
-    private String vhost;
-
-
-    public String getId() {
-        return id;
-    }
-
-    public void setId(String id) {
-        this.id = id;
-    }
-
-    public String getApp() {
-        return app;
-    }
-
-    public void setApp(String app) {
-        this.app = app;
-    }
-
-    public String getStream() {
-        return stream;
-    }
 
-    public void setStream(String stream) {
-        this.stream = stream;
-    }
-
-    public String getIp() {
-        return ip;
-    }
+public class OnPublishHookParam extends HookParam{
 
-    public void setIp(String ip) {
-        this.ip = ip;
-    }
+    @Getter
+    @Setter
+    private String id;
 
-    public String getParams() {
-        return params;
-    }
+    @Getter
+    @Setter
+    private String app;
 
-    public void setParams(String params) {
-        this.params = params;
-    }
+    @Getter
+    @Setter
+    private String stream;
 
-    public int getPort() {
-        return port;
-    }
+    @Getter
+    @Setter
+    private String ip;
 
-    public void setPort(int port) {
-        this.port = port;
-    }
+    @Getter
+    @Setter
+    private String params;
 
-    public String getSchema() {
-        return schema;
-    }
+    @Getter
+    @Setter
+    private int port;
 
-    public void setSchema(String schema) {
-        this.schema = schema;
-    }
+    @Getter
+    @Setter
+    private String schema;
 
-    public String getVhost() {
-        return vhost;
-    }
+    @Getter
+    @Setter
+    private String vhost;
 
-    public void setVhost(String vhost) {
-        this.vhost = vhost;
-    }
 
     @Override
     public String toString() {

+ 6 - 0
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java

@@ -78,6 +78,12 @@ public class MediaServiceImpl implements IMediaService {
     public ResultForOnPublish authenticatePublish(MediaServer mediaServer, String app, String stream, String params) {
         // 推流鉴权的处理
         if (!"rtp".equals(app)) {
+            if ("talk".equals(app) && stream.endsWith("_talk")) {
+                ResultForOnPublish result = new ResultForOnPublish();
+                result.setEnable_mp4(false);
+                result.setEnable_audio(true);
+                return result;
+            }
             StreamProxy streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(app, stream);
             if (streamProxyItem != null) {
                 ResultForOnPublish result = new ResultForOnPublish();

+ 2 - 4
web_src/src/components/dialog/SyncChannelProgress.vue

@@ -60,9 +60,6 @@ export default {
         url:`/api/device/query/${this.deviceId}/sync_status/`,
       }).then((res) => {
         if (res.data.code === 0) {
-          if (!this.syncFlag) {
-            this.syncFlag = true;
-          }
 
           if (res.data.data != null) {
             if (res.data.data.syncIng) {
@@ -70,6 +67,7 @@ export default {
                 this.msg = `等待同步中`;
                 this.timmer = setTimeout(this.getProgress, 300)
               }else {
+                this.syncFlag = true;
                 this.total = res.data.data.total;
                 this.current = res.data.data.current;
                 this.percentage = Math.floor(Number(res.data.data.current)/Number(res.data.data.total)* 10000)/100;
@@ -90,7 +88,7 @@ export default {
               }
             }
           }else {
-            this.msg = `同步尚未开始`;
+            this.msg = res.data.msg;
             this.timmer = setTimeout(this.getProgress, 300)
           }
         }else {