Quellcode durchsuchen

优化推流通知

648540858 vor 1 Jahr
Ursprung
Commit
7d53009987

+ 29 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java

@@ -305,4 +305,33 @@ public class SendRtpItem {
     public void setReceiveStream(String receiveStream) {
         this.receiveStream = receiveStream;
     }
+
+    @Override
+    public String toString() {
+        return "SendRtpItem{" +
+                "ip='" + ip + '\'' +
+                ", port=" + port +
+                ", ssrc='" + ssrc + '\'' +
+                ", platformId='" + platformId + '\'' +
+                ", deviceId='" + deviceId + '\'' +
+                ", app='" + app + '\'' +
+                ", channelId='" + channelId + '\'' +
+                ", status=" + status +
+                ", stream='" + stream + '\'' +
+                ", tcp=" + tcp +
+                ", tcpActive=" + tcpActive +
+                ", localPort=" + localPort +
+                ", mediaServerId='" + mediaServerId + '\'' +
+                ", serverId='" + serverId + '\'' +
+                ", CallId='" + CallId + '\'' +
+                ", fromTag='" + fromTag + '\'' +
+                ", toTag='" + toTag + '\'' +
+                ", pt=" + pt +
+                ", usePs=" + usePs +
+                ", onlyAudio=" + onlyAudio +
+                ", rtcp=" + rtcp +
+                ", playType=" + playType +
+                ", receiveStream='" + receiveStream + '\'' +
+                '}';
+    }
 }

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java

@@ -116,7 +116,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
 
 		if (parentPlatform != null) {
 			Map<String, Object> param = getSendRtpParam(sendRtpItem);
-			if (mediaInfo == null) {
+			if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) {
 				RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
 						sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(),
 						sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(),

+ 14 - 8
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java

@@ -575,14 +575,20 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                     }
 
                     if ("push".equals(gbStream.getStreamType())) {
-                        if (streamPushItem != null && streamPushItem.isPushIng()) {
-                            // 推流状态
-                            pushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
-                                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
-                        } else {
-                            // 未推流 拉起
-                            notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
-                                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+                        if (streamPushItem != null) {
+                            // 从redis查询是否正在接收这个推流
+                            OnStreamChangedHookParam pushListItem = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream());
+                            if (pushListItem != null) {
+                                StreamPushItem transform = streamPushService.transform(pushListItem);
+                                transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId()));
+                                // 推流状态
+                                pushStream(evt, request, gbStream, transform, platform, callIdHeader, mediaServerItem, port, tcpActive,
+                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+                            }else {
+                                // 未推流 拉起
+                                notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
+                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+                            }
                         }
                     } else if ("proxy".equals(gbStream.getStreamType())) {
                         if (null != proxyByAppAndStream) {

+ 33 - 22
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java

@@ -509,32 +509,43 @@ public class ZLMHttpHookListener {
                     List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream());
                     if (!sendRtpItems.isEmpty()) {
                         for (SendRtpItem sendRtpItem : sendRtpItems) {
-                            if (sendRtpItem != null && sendRtpItem.getApp().equals(param.getApp())) {
-                                String platformId = sendRtpItem.getPlatformId();
-                                ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
-                                Device device = deviceService.getDevice(platformId);
-
-                                try {
-                                    if (platform != null) {
-                                        commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
-                                        redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(),
-                                                sendRtpItem.getCallId(), sendRtpItem.getStream());
-                                    } else {
-                                        cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId());
-                                        if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST)
-                                                || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) {
-                                            AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
-                                            if (audioBroadcastCatch != null) {
-                                                // 来自上级平台的停止对讲
-                                                logger.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
-                                                audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+                            if (sendRtpItem == null) {
+                                continue;
+                            }
+
+                            if (sendRtpItem.getApp().equals(param.getApp())) {
+                                logger.info(sendRtpItem.toString());
+                                if (userSetting.getServerId().equals(sendRtpItem.getServerId())) {
+                                    // 通知其他wvp停止发流
+//                                    redisCatchStorage.sendRtp
+                                }else {
+                                    String platformId = sendRtpItem.getPlatformId();
+                                    ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
+                                    Device device = deviceService.getDevice(platformId);
+
+                                    try {
+                                        if (platform != null) {
+                                            commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
+                                            redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(),
+                                                    sendRtpItem.getCallId(), sendRtpItem.getStream());
+                                        } else {
+                                            cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId());
+                                            if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST)
+                                                    || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) {
+                                                AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+                                                if (audioBroadcastCatch != null) {
+                                                    // 来自上级平台的停止对讲
+                                                    logger.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+                                                    audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+                                                }
                                             }
                                         }
+                                    } catch (SipException | InvalidArgumentException | ParseException |
+                                             SsrcTransactionNotFoundException e) {
+                                        logger.error("[命令发送失败] 发送BYE: {}", e.getMessage());
                                     }
-                                } catch (SipException | InvalidArgumentException | ParseException |
-                                         SsrcTransactionNotFoundException e) {
-                                    logger.error("[命令发送失败] 发送BYE: {}", e.getMessage());
                                 }
+
                             }
                         }
                     }

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java

@@ -1170,7 +1170,7 @@ public class PlayServiceImpl implements IPlayService {
             dynamicTask.startDelay(key, ()->{
                 logger.info("[语音广播]等待invite消息超时:{}/{}", device.getDeviceId(), channelId);
                 stopAudioBroadcast(device.getDeviceId(), channelId);
-            }, 2000);
+            }, 10*1000);
         }, eventResultForError -> {
             // 发送失败
             logger.error("语音广播发送失败: {}:{}", channelId, eventResultForError.msg);

+ 0 - 4
src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java

@@ -8,7 +8,6 @@ import com.genersoft.iot.vmp.common.StreamInfo;
 import com.genersoft.iot.vmp.conf.DynamicTask;
 import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.conf.exception.ControllerException;
-import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
 import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
 import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
 import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
@@ -25,7 +24,6 @@ import com.genersoft.iot.vmp.service.IStreamProxyService;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
-import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper;
 import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
 import com.genersoft.iot.vmp.storager.dao.StreamProxyMapper;
 import com.genersoft.iot.vmp.utils.DateUtil;
@@ -333,8 +331,6 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
             result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl().trim(),
                     param.isEnableAudio(), param.isEnableMp4(), param.getRtpType());
         }
-        System.out.println("addStreamProxyToZlm====");
-        System.out.println(result);
         if (result != null && result.getInteger("code") == 0) {
             JSONObject data = result.getJSONObject("data");
             if (data == null) {

+ 47 - 51
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java

@@ -1,11 +1,7 @@
 package com.genersoft.iot.vmp.service.redisMsg;
 
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONObject;
 import com.genersoft.iot.vmp.conf.UserSetting;
-
 import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
-import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -41,52 +37,52 @@ public class RedisStreamMsgListener implements MessageListener {
 
     @Override
     public void onMessage(Message message, byte[] bytes) {
-        boolean isEmpty = taskQueue.isEmpty();
-        taskQueue.offer(message);
-        if (isEmpty) {
-            taskExecutor.execute(() -> {
-                while (!taskQueue.isEmpty()) {
-                    Message msg = taskQueue.poll();
-                    try {
-                        JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class);
-                        if (steamMsgJson == null) {
-                            logger.warn("[收到redis 流变化]消息解析失败");
-                            continue;
-                        }
-                        String serverId = steamMsgJson.getString("serverId");
-
-                        if (userSetting.getServerId().equals(serverId)) {
-                            // 自己发送的消息忽略即可
-                            continue;
-                        }
-                        logger.info("[收到redis 流变化]: {}", new String(message.getBody()));
-                        String app = steamMsgJson.getString("app");
-                        String stream = steamMsgJson.getString("stream");
-                        boolean register = steamMsgJson.getBoolean("register");
-                        String mediaServerId = steamMsgJson.getString("mediaServerId");
-                        OnStreamChangedHookParam onStreamChangedHookParam = new OnStreamChangedHookParam();
-                        onStreamChangedHookParam.setSeverId(serverId);
-                        onStreamChangedHookParam.setApp(app);
-                        onStreamChangedHookParam.setStream(stream);
-                        onStreamChangedHookParam.setRegist(register);
-                        onStreamChangedHookParam.setMediaServerId(mediaServerId);
-                        onStreamChangedHookParam.setCreateStamp(System.currentTimeMillis()/1000);
-                        onStreamChangedHookParam.setAliveSecond(0L);
-                        onStreamChangedHookParam.setTotalReaderCount("0");
-                        onStreamChangedHookParam.setOriginType(0);
-                        onStreamChangedHookParam.setOriginTypeStr("0");
-                        onStreamChangedHookParam.setOriginTypeStr("unknown");
-                        if (register) {
-                            zlmMediaListManager.addPush(onStreamChangedHookParam);
-                        }else {
-                            zlmMediaListManager.removeMedia(app, stream);
-                        }
-                    }catch (Exception e) {
-                        logger.warn("[REDIS消息-流变化] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
-                        logger.error("[REDIS消息-流变化] 异常内容: ", e);
-                    }
-                }
-            });
-        }
+//        boolean isEmpty = taskQueue.isEmpty();
+//        taskQueue.offer(message);
+//        if (isEmpty) {
+//            taskExecutor.execute(() -> {
+//                while (!taskQueue.isEmpty()) {
+//                    Message msg = taskQueue.poll();
+//                    try {
+//                        JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class);
+//                        if (steamMsgJson == null) {
+//                            logger.warn("[收到redis 流变化]消息解析失败");
+//                            continue;
+//                        }
+//                        String serverId = steamMsgJson.getString("serverId");
+//
+//                        if (userSetting.getServerId().equals(serverId)) {
+//                            // 自己发送的消息忽略即可
+//                            continue;
+//                        }
+//                        logger.info("[收到redis 流变化]: {}", new String(message.getBody()));
+//                        String app = steamMsgJson.getString("app");
+//                        String stream = steamMsgJson.getString("stream");
+//                        boolean register = steamMsgJson.getBoolean("register");
+//                        String mediaServerId = steamMsgJson.getString("mediaServerId");
+//                        OnStreamChangedHookParam onStreamChangedHookParam = new OnStreamChangedHookParam();
+//                        onStreamChangedHookParam.setSeverId(serverId);
+//                        onStreamChangedHookParam.setApp(app);
+//                        onStreamChangedHookParam.setStream(stream);
+//                        onStreamChangedHookParam.setRegist(register);
+//                        onStreamChangedHookParam.setMediaServerId(mediaServerId);
+//                        onStreamChangedHookParam.setCreateStamp(System.currentTimeMillis()/1000);
+//                        onStreamChangedHookParam.setAliveSecond(0L);
+//                        onStreamChangedHookParam.setTotalReaderCount("0");
+//                        onStreamChangedHookParam.setOriginType(0);
+//                        onStreamChangedHookParam.setOriginTypeStr("0");
+//                        onStreamChangedHookParam.setOriginTypeStr("unknown");
+//                        if (register) {
+//                            zlmMediaListManager.addPush(onStreamChangedHookParam);
+//                        }else {
+//                            zlmMediaListManager.removeMedia(app, stream);
+//                        }
+//                    }catch (Exception e) {
+//                        logger.warn("[REDIS消息-流变化] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
+//                        logger.error("[REDIS消息-流变化] 异常内容: ", e);
+//                    }
+//                }
+//            });
+//        }
     }
 }

+ 2 - 0
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java

@@ -211,5 +211,7 @@ public interface IRedisCatchStorage {
 
     void addPushListItem(String app, String stream, OnStreamChangedHookParam param);
 
+    OnStreamChangedHookParam getPushListItem(String app, String stream);
+
     void removePushListItem(String app, String stream, String mediaServerId);
 }

+ 6 - 0
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java

@@ -656,6 +656,12 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
         redisTemplate.opsForValue().set(key, param);
     }
 
+    @Override
+    public OnStreamChangedHookParam getPushListItem(String app, String stream) {
+        String key = VideoManagerConstants.PUSH_STREAM_LIST + app + "_" + stream;
+        return (OnStreamChangedHookParam)redisTemplate.opsForValue().get(key);
+    }
+
     @Override
     public void removePushListItem(String app, String stream, String mediaServerId) {
         String key = VideoManagerConstants.PUSH_STREAM_LIST + app + "_" + stream;

+ 0 - 5
src/main/java/com/genersoft/iot/vmp/vmanager/cloudRecord/CloudRecordController.java

@@ -1,12 +1,8 @@
 package com.genersoft.iot.vmp.vmanager.cloudRecord;
 
 import com.alibaba.fastjson2.JSONArray;
-import com.genersoft.iot.vmp.conf.DynamicTask;
-import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.conf.exception.ControllerException;
 import com.genersoft.iot.vmp.conf.security.JwtUtils;
-import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
-import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.service.ICloudRecordService;
 import com.genersoft.iot.vmp.service.IMediaServerService;
@@ -22,7 +18,6 @@ import org.apache.commons.lang3.ObjectUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.web.bind.annotation.*;
 
 import javax.servlet.http.HttpServletRequest;