Explorar o código

修复 SiP消息超时未未回复无法识别的BUG

648540858 hai 1 ano
pai
achega
2492b0d638

+ 5 - 78
src/main/java/com/genersoft/iot/vmp/conf/SipConfig.java

@@ -1,13 +1,15 @@
 package com.genersoft.iot.vmp.conf;
 
 
-import org.springframework.core.annotation.Order;
+import lombok.Data;
 import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.core.annotation.Order;
 import org.springframework.stereotype.Component;
 
 @Component
 @ConfigurationProperties(prefix = "sip", ignoreInvalidFields = true)
 @Order(0)
+@Data
 public class SipConfig {
 
 	private String ip;
@@ -26,82 +28,7 @@ public class SipConfig {
 
 	Integer registerTimeInterval = 120;
 
-	private boolean alarm;
-
-	public void setIp(String ip) {
-		this.ip = ip;
-	}
-
-	public void setPort(Integer port) {
-		this.port = port;
-	}
-
-	public void setDomain(String domain) {
-		this.domain = domain;
-	}
-
-	public void setId(String id) {
-		this.id = id;
-	}
-
-	public void setPassword(String password) {
-		this.password = password;
-	}
-
-	public void setPtzSpeed(Integer ptzSpeed) {
-		this.ptzSpeed = ptzSpeed;
-	}
-
-
-	public void setRegisterTimeInterval(Integer registerTimeInterval) {
-		this.registerTimeInterval = registerTimeInterval;
-	}
-
-	public String getIp() {
-		return ip;
-	}
-
-
-	public Integer getPort() {
-		return port;
-	}
-
-
-	public String getDomain() {
-		return domain;
-	}
-
-
-	public String getId() {
-		return id;
-	}
-
-	public String getPassword() {
-		return password;
-	}
-
-
-	public Integer getPtzSpeed() {
-		return ptzSpeed;
-	}
-
-	public Integer getRegisterTimeInterval() {
-		return registerTimeInterval;
-	}
-
-	public boolean isAlarm() {
-		return alarm;
-	}
-
-	public void setAlarm(boolean alarm) {
-		this.alarm = alarm;
-	}
-
-	public String getShowIp() {
-		return showIp;
-	}
+	private boolean alarm = false;
 
-	public void setShowIp(String showIp) {
-		this.showIp = showIp;
-	}
+	private long timeout = 15;
 }

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java

@@ -125,7 +125,7 @@ public class SipLayer implements CommandLineRunner {
 
 			SipProviderImpl udpSipProvider = (SipProviderImpl)sipStack.createSipProvider(udpListeningPoint);
 			udpSipProvider.addSipListener(sipProcessorObserver);
-
+			udpSipProvider.setDialogErrorsAutomaticallyHandled();
 			udpSipProviderMap.put(monitorIp, udpSipProvider);
 
 			log.info("[SIP SERVER] udp://{}:{} 启动成功", monitorIp, port);

+ 44 - 59
src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java

@@ -1,6 +1,7 @@
 package com.genersoft.iot.vmp.gb28181.event;
 
 import com.genersoft.iot.vmp.gb28181.bean.DeviceNotFoundEvent;
+import com.genersoft.iot.vmp.gb28181.event.sip.SipEvent;
 import gov.nist.javax.sip.message.SIPRequest;
 import gov.nist.javax.sip.message.SIPResponse;
 import lombok.extern.slf4j.Slf4j;
@@ -13,10 +14,9 @@ import javax.sip.ResponseEvent;
 import javax.sip.TimeoutEvent;
 import javax.sip.TransactionTerminatedEvent;
 import javax.sip.header.WarningHeader;
-import java.time.Instant;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.DelayQueue;
 
 /**
  * @author lin
@@ -25,41 +25,36 @@ import java.util.concurrent.TimeUnit;
 @Component
 public class SipSubscribe {
 
-    private final Map<String, SipSubscribe.Event> errorSubscribes = new ConcurrentHashMap<>();
+    private final Map<String, SipEvent> subscribes = new ConcurrentHashMap<>();
 
-    private final Map<String, SipSubscribe.Event> okSubscribes = new ConcurrentHashMap<>();
+    private final DelayQueue<SipEvent> delayQueue = new DelayQueue<>();
 
-    private final Map<String, Instant> okTimeSubscribes = new ConcurrentHashMap<>();
-
-    private final Map<String, Instant> errorTimeSubscribes = new ConcurrentHashMap<>();
-
-    //    @Scheduled(cron="*/5 * * * * ?")   //每五秒执行一次
-    //    @Scheduled(fixedRate= 100 * 60 * 60 )
-    @Scheduled(cron="0 0/5 * * * ?")   //每5分钟执行一次
+    @Scheduled(fixedRate = 200)   //每200毫秒执行
     public void execute(){
-        if(log.isDebugEnabled()){
-            log.info("[定时任务] 清理过期的SIP订阅信息");
-        }
-
-        Instant instant = Instant.now().minusMillis(TimeUnit.MINUTES.toMillis(5));
-
-        for (String key : okTimeSubscribes.keySet()) {
-            if (okTimeSubscribes.get(key).isBefore(instant)){
-                okSubscribes.remove(key);
-                okTimeSubscribes.remove(key);
-            }
+        if (delayQueue.isEmpty()) {
+            return;
         }
-        for (String key : errorTimeSubscribes.keySet()) {
-            if (errorTimeSubscribes.get(key).isBefore(instant)){
-                errorSubscribes.remove(key);
-                errorTimeSubscribes.remove(key);
+        try {
+            SipEvent take = delayQueue.take();
+            // 出现超时异常
+            if(take.getErrorEvent() != null) {
+                EventResult<Object> eventResult = new EventResult<>();
+                eventResult.type = EventResultType.timeout;
+                eventResult.msg = "消息超时未回复";
+                eventResult.statusCode = -1024;
+                take.getErrorEvent().response(eventResult);
             }
+            subscribes.remove(take.getKey());
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
         }
-        if(log.isDebugEnabled()){
-            log.debug("okTimeSubscribes.size:{}",okTimeSubscribes.size());
-            log.debug("okSubscribes.size:{}",okSubscribes.size());
-            log.debug("errorTimeSubscribes.size:{}",errorTimeSubscribes.size());
-            log.debug("errorSubscribes.size:{}",errorSubscribes.size());
+    }
+
+    public void updateTimeout(String callId) {
+        SipEvent sipEvent = subscribes.get(callId);
+        if (sipEvent != null) {
+            delayQueue.remove(sipEvent);
+            delayQueue.offer(sipEvent);
         }
     }
 
@@ -156,43 +151,33 @@ public class SipSubscribe {
         }
     }
 
-    public void addErrorSubscribe(String key, SipSubscribe.Event event) {
-        errorSubscribes.put(key, event);
-        errorTimeSubscribes.put(key, Instant.now());
-    }
 
-    public void addOkSubscribe(String key, SipSubscribe.Event event) {
-        okSubscribes.put(key, event);
-        okTimeSubscribes.put(key, Instant.now());
-    }
-
-    public SipSubscribe.Event getErrorSubscribe(String key) {
-        return errorSubscribes.get(key);
-    }
-
-    public void removeErrorSubscribe(String key) {
-        if(key == null){
-            return;
+    public void addSubscribe(String key, SipEvent event) {
+        SipEvent sipEvent = subscribes.get(key);
+        if (sipEvent != null) {
+            subscribes.remove(key);
+            delayQueue.remove(sipEvent);
         }
-        errorSubscribes.remove(key);
-        errorTimeSubscribes.remove(key);
+        subscribes.put(key, event);
+        delayQueue.offer(event);
     }
 
-    public SipSubscribe.Event getOkSubscribe(String key) {
-        return okSubscribes.get(key);
+    public SipEvent getSubscribe(String key) {
+        return subscribes.get(key);
     }
 
-    public void removeOkSubscribe(String key) {
+    public void removeSubscribe(String key) {
         if(key == null){
             return;
         }
-        okSubscribes.remove(key);
-        okTimeSubscribes.remove(key);
-    }
-    public int getErrorSubscribesSize(){
-        return errorSubscribes.size();
+        SipEvent sipEvent = subscribes.get(key);
+        if (sipEvent != null) {
+            subscribes.remove(key);
+            delayQueue.remove(sipEvent);
+        }
     }
-    public int getOkSubscribesSize(){
-        return okSubscribes.size();
+
+    public boolean isEmpty(){
+        return subscribes.isEmpty();
     }
 }

+ 48 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/event/sip/SipEvent.java

@@ -0,0 +1,48 @@
+package com.genersoft.iot.vmp.gb28181.event.sip;
+
+import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
+import lombok.Data;
+import org.jetbrains.annotations.NotNull;
+
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
+@Data
+public class SipEvent implements Delayed {
+
+    private String key;
+
+    /**
+     * 成功的回调
+     */
+    private SipSubscribe.Event okEvent;
+
+    /**
+     * 错误的回调,包括超时
+     */
+    private SipSubscribe.Event errorEvent;
+
+    /**
+     * 超时时间
+     */
+    private long delay;
+
+    public static SipEvent getInstance(String key, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent, long delay) {
+        SipEvent sipEvent = new SipEvent();
+        sipEvent.setKey(key);
+        sipEvent.setOkEvent(okEvent);
+        sipEvent.setErrorEvent(errorEvent);
+        sipEvent.setDelay(delay);
+        return sipEvent;
+    }
+
+    @Override
+    public long getDelay(@NotNull TimeUnit unit) {
+        return unit.convert(delay - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public int compareTo(@NotNull Delayed o) {
+        return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
+    }
+}

+ 3 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java

@@ -380,7 +380,9 @@ public class PlatformServiceImpl implements IPlatformService {
             commanderForPlatform.register(platform, sipTransactionInfo,  eventResult -> {
                 log.info("[国标级联] 平台:{}注册失败,{}:{}", platform.getServerGBId(),
                         eventResult.statusCode, eventResult.msg);
-                offline(platform, false);
+                if (platform.isStatus()) {
+                    offline(platform, false);
+                }
             }, null);
         } catch (Exception e) {
             log.error("[命令发送失败] 国标级联定时注册: {}", e.getMessage());

+ 5 - 5
src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java

@@ -436,7 +436,7 @@ public class PlayServiceImpl implements IPlayService {
                 // 处理收到200ok后的TCP主动连接以及SSRC不一致的问题
                 InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channel, callback, inviteInfo, InviteSessionType.PLAY);
             }, (event) -> {
-                log.info("[点播失败] deviceId: {}, channelId:{}, {}: {}", device.getDeviceId(), channel.getDeviceId(), event.statusCode, event.msg);
+                log.info("[点播失败]{}:{} deviceId: {}, channelId:{}",event.statusCode, event.msg, device.getDeviceId(), channel.getDeviceId());
                 receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo);
 
                 sessionManager.removeByStream(ssrcInfo.getStream());
@@ -447,7 +447,7 @@ public class PlayServiceImpl implements IPlayService {
                         event.statusCode, event.msg, null);
 
                 inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, channel.getId());
-            });
+            }, userSetting.getPlayTimeout().longValue());
         } catch (InvalidArgumentException | SipException | ParseException e) {
             log.error("[命令发送失败] 点播消息: {}", e.getMessage());
             receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo);
@@ -565,7 +565,7 @@ public class PlayServiceImpl implements IPlayService {
                 mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrc());
                 sessionManager.removeByStream(sendRtpInfo.getStream());
                 errorEvent.response(event);
-            });
+            }, userSetting.getPlayTimeout().longValue());
         } catch (InvalidArgumentException | SipException | ParseException e) {
 
             log.error("[命令发送失败] 对讲消息: {}", e.getMessage());
@@ -820,7 +820,7 @@ public class PlayServiceImpl implements IPlayService {
                         receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo);
                         sessionManager.removeByStream(ssrcInfo.getStream());
                         inviteStreamService.removeInviteInfo(inviteInfo);
-                    });
+                    }, userSetting.getPlayTimeout().longValue());
         } catch (InvalidArgumentException | SipException | ParseException e) {
             log.error("[命令发送失败] 录像回放: {}", e.getMessage());
             if (callback != null) {
@@ -1044,7 +1044,7 @@ public class PlayServiceImpl implements IPlayService {
                         // 设置过期时间,下载失败时自动处理订阅数据
                         hook.setExpireTime(System.currentTimeMillis() + 24 * 60 * 60 * 1000);
                         subscribe.addSubscribe(hook, hookEventForRecord);
-                    });
+                    }, userSetting.getPlayTimeout().longValue());
         } catch (InvalidArgumentException | SipException | ParseException e) {
             log.error("[命令发送失败] 录像下载: {}", e.getMessage());
             callback.run(InviteErrorCode.FAIL.getCode(),e.getMessage(), null);

+ 44 - 42
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java

@@ -2,18 +2,18 @@ package com.genersoft.iot.vmp.gb28181.transmit;
 
 import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
 import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
+import com.genersoft.iot.vmp.gb28181.event.sip.SipEvent;
 import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
 import com.genersoft.iot.vmp.gb28181.transmit.event.response.ISIPResponseProcessor;
 import com.genersoft.iot.vmp.gb28181.transmit.event.timeout.ITimeoutProcessor;
+import gov.nist.javax.sip.message.SIPResponse;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Component;
 
 import javax.sip.*;
-import javax.sip.header.CSeqHeader;
 import javax.sip.header.CallIdHeader;
-import javax.sip.message.Request;
 import javax.sip.message.Response;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -88,40 +88,40 @@ public class SIPProcessorObserver implements ISIPProcessorObserver {
     @Override
     @Async("taskExecutor")
     public void processResponse(ResponseEvent responseEvent) {
-        Response response = responseEvent.getResponse();
+        SIPResponse response = (SIPResponse)responseEvent.getResponse();
         int status = response.getStatusCode();
 
         // Success
         if (((status >= Response.OK) && (status < Response.MULTIPLE_CHOICES)) || status == Response.UNAUTHORIZED) {
-            CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME);
-            String method = cseqHeader.getMethod();
-            ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(method);
-            if (sipRequestProcessor != null) {
-                sipRequestProcessor.process(responseEvent);
-            }
-            if (status != Response.UNAUTHORIZED && responseEvent.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) {
-                CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME);
+            if (status != Response.UNAUTHORIZED && responseEvent.getResponse() != null && !sipSubscribe.isEmpty() ) {
+                CallIdHeader callIdHeader = response.getCallIdHeader();
                 if (callIdHeader != null) {
-                    SipSubscribe.Event subscribe = sipSubscribe.getOkSubscribe(callIdHeader.getCallId());
-                    if (subscribe != null) {
-                        SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent);
-                        sipSubscribe.removeOkSubscribe(callIdHeader.getCallId());
-                        subscribe.response(eventResult);
+                    SipEvent sipEvent = sipSubscribe.getSubscribe(callIdHeader.getCallId());
+                    if (sipEvent != null && sipEvent.getOkEvent() != null) {
+                        SipSubscribe.EventResult<ResponseEvent> eventResult = new SipSubscribe.EventResult<>(responseEvent);
+                        sipSubscribe.removeSubscribe(callIdHeader.getCallId());
+                        sipEvent.getOkEvent().response(eventResult);
                     }
                 }
             }
+            ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(response.getCSeqHeader().getMethod());
+            if (sipRequestProcessor != null) {
+                sipRequestProcessor.process(responseEvent);
+            }
         } else if ((status >= Response.TRYING) && (status < Response.OK)) {
             // 增加其它无需回复的响应,如101、180等
+            // 更新sip订阅的时间
+//            sipSubscribe.updateTimeout(response.getCallIdHeader().getCallId());
         } else {
             log.warn("接收到失败的response响应!status:" + status + ",message:" + response.getReasonPhrase());
-            if (responseEvent.getResponse() != null && sipSubscribe.getErrorSubscribesSize() > 0 ) {
+            if (responseEvent.getResponse() != null && !sipSubscribe.isEmpty() ) {
                 CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME);
                 if (callIdHeader != null) {
-                    SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId());
-                    if (subscribe != null) {
+                    SipEvent sipEvent = sipSubscribe.getSubscribe(callIdHeader.getCallId());
+                    if (sipEvent != null && sipEvent.getErrorEvent() != null) {
                         SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent);
-                        subscribe.response(eventResult);
-                        sipSubscribe.removeErrorSubscribe(callIdHeader.getCallId());
+                        sipSubscribe.removeSubscribe(callIdHeader.getCallId());
+                        sipEvent.getErrorEvent().response(eventResult);
                     }
                 }
             }
@@ -140,27 +140,27 @@ public class SIPProcessorObserver implements ISIPProcessorObserver {
     @Override
     public void processTimeout(TimeoutEvent timeoutEvent) {
         log.info("[消息发送超时]");
-        ClientTransaction clientTransaction = timeoutEvent.getClientTransaction();
-
-        if (clientTransaction != null) {
-            log.info("[发送错误订阅] clientTransaction != null");
-            Request request = clientTransaction.getRequest();
-            if (request != null) {
-                log.info("[发送错误订阅] request != null");
-                CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME);
-                if (callIdHeader != null) {
-                    log.info("[发送错误订阅]");
-                    SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId());
-                    SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(timeoutEvent);
-                    if (subscribe != null){
-                        subscribe.response(eventResult);
-                    }
-                    sipSubscribe.removeOkSubscribe(callIdHeader.getCallId());
-                    sipSubscribe.removeErrorSubscribe(callIdHeader.getCallId());
-                }
-            }
-        }
-        eventPublisher.requestTimeOut(timeoutEvent);
+//        ClientTransaction clientTransaction = timeoutEvent.getClientTransaction();
+//
+//        if (clientTransaction != null) {
+//            log.info("[发送错误订阅] clientTransaction != null");
+//            Request request = clientTransaction.getRequest();
+//            if (request != null) {
+//                log.info("[发送错误订阅] request != null");
+//                CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME);
+//                if (callIdHeader != null) {
+//                    log.info("[发送错误订阅]");
+//                    SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId());
+//                    SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(timeoutEvent);
+//                    if (subscribe != null){
+//                        subscribe.response(eventResult);
+//                    }
+//                    sipSubscribe.removeOkSubscribe(callIdHeader.getCallId());
+//                    sipSubscribe.removeErrorSubscribe(callIdHeader.getCallId());
+//                }
+//            }
+//        }
+//        eventPublisher.requestTimeOut(timeoutEvent);
     }
 
     @Override
@@ -199,4 +199,6 @@ public class SIPProcessorObserver implements ISIPProcessorObserver {
     }
 
 
+
+
 }

+ 63 - 53
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPSender.java

@@ -1,7 +1,9 @@
 package com.genersoft.iot.vmp.gb28181.transmit;
 
+import com.genersoft.iot.vmp.conf.SipConfig;
 import com.genersoft.iot.vmp.gb28181.SipLayer;
 import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
+import com.genersoft.iot.vmp.gb28181.event.sip.SipEvent;
 import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
 import com.genersoft.iot.vmp.utils.GitUtil;
 import gov.nist.javax.sip.SipProviderImpl;
@@ -21,6 +23,7 @@ import java.text.ParseException;
 
 /**
  * 发送SIP消息
+ *
  * @author lin
  */
 @Slf4j
@@ -35,75 +38,80 @@ public class SIPSender {
 
     @Autowired
     private SipSubscribe sipSubscribe;
+    @Autowired
+    private SipConfig sipConfig;
 
     public void transmitRequest(String ip, Message message) throws SipException, ParseException {
-        transmitRequest(ip, message, null, null);
+        transmitRequest(ip, message, null, null, null);
     }
 
     public void transmitRequest(String ip, Message message, SipSubscribe.Event errorEvent) throws SipException, ParseException {
-        transmitRequest(ip, message, errorEvent, null);
+        transmitRequest(ip, message, errorEvent, null, null);
     }
 
     public void transmitRequest(String ip, Message message, SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) throws SipException {
-            ViaHeader viaHeader = (ViaHeader)message.getHeader(ViaHeader.NAME);
-            String transport = "UDP";
-            if (viaHeader == null) {
-                log.warn("[消息头缺失]: ViaHeader, 使用默认的UDP方式处理数据");
-            }else {
-                transport = viaHeader.getTransport();
-            }
-            if (message.getHeader(UserAgentHeader.NAME) == null) {
-                try {
-                    message.addHeader(SipUtils.createUserAgentHeader(gitUtil));
-                } catch (ParseException e) {
-                    log.error("添加UserAgentHeader失败", e);
-                }
+        transmitRequest(ip, message, errorEvent, okEvent, null);
+    }
+
+    public void transmitRequest(String ip, Message message, SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent, Long timeout) throws SipException {
+        ViaHeader viaHeader = (ViaHeader) message.getHeader(ViaHeader.NAME);
+        String transport = "UDP";
+        if (viaHeader == null) {
+            log.warn("[消息头缺失]: ViaHeader, 使用默认的UDP方式处理数据");
+        } else {
+            transport = viaHeader.getTransport();
+        }
+        if (message.getHeader(UserAgentHeader.NAME) == null) {
+            try {
+                message.addHeader(SipUtils.createUserAgentHeader(gitUtil));
+            } catch (ParseException e) {
+                log.error("添加UserAgentHeader失败", e);
             }
+        }
 
+        if (okEvent != null || errorEvent != null) {
             CallIdHeader callIdHeader = (CallIdHeader) message.getHeader(CallIdHeader.NAME);
-            // 添加错误订阅
-            if (errorEvent != null) {
-                sipSubscribe.addErrorSubscribe(callIdHeader.getCallId(), (eventResult -> {
-                    sipSubscribe.removeErrorSubscribe(eventResult.callId);
-                    sipSubscribe.removeOkSubscribe(eventResult.callId);
-                    errorEvent.response(eventResult);
-                }));
-            }
-            // 添加订阅
-            if (okEvent != null) {
-                sipSubscribe.addOkSubscribe(callIdHeader.getCallId(), eventResult -> {
-                    sipSubscribe.removeOkSubscribe(eventResult.callId);
-                    sipSubscribe.removeErrorSubscribe(eventResult.callId);
+            SipEvent sipEvent = SipEvent.getInstance(callIdHeader.getCallId(), eventResult -> {
+                sipSubscribe.removeSubscribe(eventResult.callId);
+                if(okEvent != null) {
                     okEvent.response(eventResult);
-                });
-            }
-            if ("TCP".equals(transport)) {
-                SipProviderImpl tcpSipProvider = sipLayer.getTcpSipProvider(ip);
-                if (tcpSipProvider == null) {
-                    log.error("[发送信息失败] 未找到tcp://{}的监听信息", ip);
-                    return;
                 }
-                if (message instanceof Request) {
-                    tcpSipProvider.sendRequest((Request)message);
-                }else if (message instanceof Response) {
-                    tcpSipProvider.sendResponse((Response)message);
+            }, (eventResult -> {
+                sipSubscribe.removeSubscribe(eventResult.callId);
+                if (errorEvent != null) {
+                    errorEvent.response(eventResult);
                 }
+            }), timeout == null ? sipConfig.getTimeout() : timeout);
+            sipSubscribe.addSubscribe(callIdHeader.getCallId(), sipEvent);
+        }
 
-            } else if ("UDP".equals(transport)) {
-                SipProviderImpl sipProvider = sipLayer.getUdpSipProvider(ip);
-                if (sipProvider == null) {
-                    log.error("[发送信息失败] 未找到udp://{}的监听信息", ip);
-                    return;
-                }
-                if (message instanceof Request) {
-                    sipProvider.sendRequest((Request)message);
-                }else if (message instanceof Response) {
-                    sipProvider.sendResponse((Response)message);
-                }
+        if ("TCP".equals(transport)) {
+            SipProviderImpl tcpSipProvider = sipLayer.getTcpSipProvider(ip);
+            if (tcpSipProvider == null) {
+                log.error("[发送信息失败] 未找到tcp://{}的监听信息", ip);
+                return;
+            }
+            if (message instanceof Request) {
+                tcpSipProvider.sendRequest((Request) message);
+            } else if (message instanceof Response) {
+                tcpSipProvider.sendResponse((Response) message);
+            }
+
+        } else if ("UDP".equals(transport)) {
+            SipProviderImpl sipProvider = sipLayer.getUdpSipProvider(ip);
+            if (sipProvider == null) {
+                log.error("[发送信息失败] 未找到udp://{}的监听信息", ip);
+                return;
+            }
+            if (message instanceof Request) {
+                sipProvider.sendRequest((Request) message);
+            } else if (message instanceof Response) {
+                sipProvider.sendResponse((Response) message);
             }
+        }
     }
 
-    public CallIdHeader getNewCallIdHeader(String ip, String transport){
+    public CallIdHeader getNewCallIdHeader(String ip, String transport) {
         if (ObjectUtils.isEmpty(transport)) {
             return sipLayer.getUdpSipProvider().getNewCallId();
         }
@@ -111,7 +119,7 @@ public class SIPSender {
         if (ObjectUtils.isEmpty(ip)) {
             sipProvider = transport.equalsIgnoreCase("TCP") ? sipLayer.getTcpSipProvider()
                     : sipLayer.getUdpSipProvider();
-        }else {
+        } else {
             sipProvider = transport.equalsIgnoreCase("TCP") ? sipLayer.getTcpSipProvider(ip)
                     : sipLayer.getUdpSipProvider(ip);
         }
@@ -122,9 +130,11 @@ public class SIPSender {
 
         if (sipProvider != null) {
             return sipProvider.getNewCallId();
-        }else {
+        } else {
             log.warn("[新建CallIdHeader失败], ip={}, transport={}", ip, transport);
             return null;
         }
     }
+
+
 }

+ 4 - 4
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java

@@ -100,7 +100,7 @@ public interface ISIPCommander {
 	 * @param device  视频设备
 	 * @param channel  预览通道
 	 */
-	void playStreamCmd(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException;
+	void playStreamCmd(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent, Long timeout) throws InvalidArgumentException, SipException, ParseException;
 
 	/**
 	 * 请求回放视频流
@@ -110,7 +110,7 @@ public interface ISIPCommander {
 	 * @param startTime 开始时间,格式要求:yyyy-MM-dd HH:mm:ss
 	 * @param endTime 结束时间,格式要求:yyyy-MM-dd HH:mm:ss
 	 */
-	void playbackStreamCmd(MediaServer mediaServerItem, SSRCInfo ssrcInf, Device device, DeviceChannel channel, String startTime, String endTime, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException;
+	void playbackStreamCmd(MediaServer mediaServerItem, SSRCInfo ssrcInf, Device device, DeviceChannel channel, String startTime, String endTime, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent, Long timeout) throws InvalidArgumentException, SipException, ParseException;
 
 	/**
 	 * 请求历史媒体下载
@@ -123,7 +123,7 @@ public interface ISIPCommander {
 	 */ 
 	void downloadStreamCmd(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel,
                            String startTime, String endTime, int downloadSpeed,
-                           SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException;
+                           SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent, Long timeout) throws InvalidArgumentException, SipException, ParseException;
 
 
 	/**
@@ -131,7 +131,7 @@ public interface ISIPCommander {
 	 */
 	void streamByeCmd(Device device, String channelId, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException;
 
-	void talkStreamCmd(MediaServer mediaServerItem, SendRtpInfo sendRtpItem, Device device, DeviceChannel channelId, String callId, HookSubscribe.Event event, HookSubscribe.Event eventForPush, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException;
+	void talkStreamCmd(MediaServer mediaServerItem, SendRtpInfo sendRtpItem, Device device, DeviceChannel channelId, String callId, HookSubscribe.Event event, HookSubscribe.Event eventForPush, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent, Long timeout) throws InvalidArgumentException, SipException, ParseException;
 
 
 	void streamByeCmd(Device device, String channelId, String stream, String callId) throws InvalidArgumentException, ParseException, SipException, SsrcTransactionNotFoundException;

+ 10 - 10
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java

@@ -262,7 +262,7 @@ public class SIPCommander implements ISIPCommander {
      */
     @Override
     public void playStreamCmd(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel,
-                              SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException {
+                              SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent, Long timeout) throws InvalidArgumentException, SipException, ParseException {
         String stream = ssrcInfo.getStream();
 
         if (device == null) {
@@ -335,8 +335,6 @@ public class SIPCommander implements ISIPCommander {
         // f字段:f= v/编码格式/分辨率/帧率/码率类型/码率大小a/编码格式/码率大小/采样率
 //			content.append("f=v/2/5/25/1/4000a/1/8/1" + "\r\n"); // 未发现支持此特性的设备
 
-
-
         Request request = headerProvider.createInviteRequest(device, channel.getDeviceId(), content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, ssrcInfo.getSsrc(),sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()));
         sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, (e -> {
             sessionManager.removeByStream(ssrcInfo.getStream());
@@ -350,7 +348,7 @@ public class SIPCommander implements ISIPCommander {
                     InviteSessionType.PLAY);
             sessionManager.put(ssrcTransaction);
             okEvent.response(e);
-        });
+        }, timeout);
     }
 
     /**
@@ -364,7 +362,7 @@ public class SIPCommander implements ISIPCommander {
     @Override
     public void playbackStreamCmd(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel,
                                   String startTime, String endTime,
-                                  SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException {
+                                  SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent, Long timeout) throws InvalidArgumentException, SipException, ParseException {
 
 
         log.info("{} 分配的ZLM为: {} [{}:{}]", ssrcInfo.getStream(), mediaServerItem.getId(), mediaServerItem.getSdpIp(), ssrcInfo.getPort());
@@ -445,7 +443,7 @@ public class SIPCommander implements ISIPCommander {
                     channel.getId(), sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()).getCallId(), ssrcInfo.getStream(), ssrcInfo.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.PLAYBACK);
             sessionManager.put(ssrcTransaction);
             okEvent.response(event);
-        });
+        }, timeout);
     }
 
     /**
@@ -454,7 +452,7 @@ public class SIPCommander implements ISIPCommander {
     @Override
     public void downloadStreamCmd(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel,
                                   String startTime, String endTime, int downloadSpeed,
-                                  SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException {
+                                  SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent, Long timeout) throws InvalidArgumentException, SipException, ParseException {
 
         log.info("[发送-请求历史媒体下载-命令] 流ID: {},节点为: {} [{}:{}]", ssrcInfo.getStream(), mediaServerItem.getId(), mediaServerItem.getSdpIp(), ssrcInfo.getPort());
         String sdpIp;
@@ -536,11 +534,13 @@ public class SIPCommander implements ISIPCommander {
             SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), channel.getId(), response.getCallIdHeader().getCallId(), ssrcInfo.getStream(), ssrc, mediaServerItem.getId(), response, InviteSessionType.DOWNLOAD);
             sessionManager.put(ssrcTransaction);
             okEvent.response(event);
-        });
+        }, timeout);
     }
 
     @Override
-    public void talkStreamCmd(MediaServer mediaServerItem, SendRtpInfo sendRtpItem, Device device, DeviceChannel channel, String callId, HookSubscribe.Event event, HookSubscribe.Event eventForPush, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException {
+    public void talkStreamCmd(MediaServer mediaServerItem, SendRtpInfo sendRtpItem, Device device, DeviceChannel channel,
+                              String callId, HookSubscribe.Event event, HookSubscribe.Event eventForPush, SipSubscribe.Event okEvent,
+                              SipSubscribe.Event errorEvent, Long timeout) throws InvalidArgumentException, SipException, ParseException {
 
         String stream = sendRtpItem.getStream();
 
@@ -601,7 +601,7 @@ public class SIPCommander implements ISIPCommander {
             SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), channel.getId(), "talk", stream, sendRtpItem.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.TALK);
             sessionManager.put(ssrcTransaction);
             okEvent.response(e);
-        });
+        }, timeout);
     }
 
     /**

+ 20 - 22
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderForPlatform.java

@@ -127,23 +127,19 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform {
                 // 将 callid 写入缓存, 等注册成功可以更新状态
                 String callIdFromHeader = callIdHeader.getCallId();
                 redisCatchStorage.updatePlatformRegisterInfo(callIdFromHeader, PlatformRegisterInfo.getInstance(parentPlatform.getServerGBId(), isRegister));
-
-                sipSubscribe.addErrorSubscribe(callIdHeader.getCallId(), (event)->{
-                    if (event != null) {
-                        log.info("向上级平台 [ {} ] 注册发生错误: {} ",
-                                parentPlatform.getServerGBId(),
-                                event.msg);
-                    }
-                    redisCatchStorage.delPlatformRegisterInfo(callIdFromHeader);
-                    if (errorEvent != null ) {
-                        errorEvent.response(event);
-                    }
-                });
             }else {
                 request = headerProviderPlatformProvider.createRegisterRequest(parentPlatform, fromTag, toTag, www, callIdHeader, isRegister? parentPlatform.getExpires() : 0);
             }
 
-            sipSender.transmitRequest(parentPlatform.getDeviceIp(), request, null, okEvent);
+            sipSender.transmitRequest(parentPlatform.getDeviceIp(), request, (event)->{
+                if (event != null) {
+                    log.info("[国标级联]:{},  注册失败: {} ", parentPlatform.getServerGBId(), event.msg);
+                }
+                redisCatchStorage.delPlatformRegisterInfo(callIdHeader.getCallId());
+                if (errorEvent != null ) {
+                    errorEvent.response(event);
+                }
+            }, okEvent, 5L);
     }
 
     @Override
@@ -167,6 +163,7 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform {
                 SipUtils.getNewFromTag(),
                 SipUtils.getNewViaTag(),
                 callIdHeader);
+
         sipSender.transmitRequest(parentPlatform.getDeviceIp(), request, errorEvent, okEvent);
         return callIdHeader.getCallId();
     }
@@ -249,16 +246,17 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform {
         log.debug(catalogXml);
         if (sendAfterResponse) {
             // 默认按照收到200回复后发送下一条, 如果超时收不到回复,就以30毫秒的间隔直接发送。
-            dynamicTask.startDelay(timeoutTaskKey, ()->{
-                sipSubscribe.removeOkSubscribe(callId);
-                int indexNext = index + parentPlatform.getCatalogGroup();
-                try {
-                    sendCatalogResponse(channels, parentPlatform, sn, fromTag, indexNext, false);
-                } catch (SipException | InvalidArgumentException | ParseException e) {
-                    log.error("[命令发送失败] 国标级联 目录查询回复: {}", e.getMessage());
-                }
-            }, 3000);
             sipSender.transmitRequest(parentPlatform.getDeviceIp(), request, eventResult -> {
+                if (eventResult.type.equals(SipSubscribe.EventResultType.timeout)) {
+                    // 消息发送超时, 以30毫秒的间隔直接发送
+                    int indexNext = index + parentPlatform.getCatalogGroup();
+                    try {
+                        sendCatalogResponse(channels, parentPlatform, sn, fromTag, indexNext, false);
+                    } catch (SipException | InvalidArgumentException | ParseException e) {
+                        log.error("[命令发送失败] 国标级联 目录查询回复: {}", e.getMessage());
+                    }
+                    return;
+                }
                 log.error("[目录推送失败] 国标级联 platform : {}, code: {}, msg: {}, 停止发送", parentPlatform.getServerGBId(), eventResult.statusCode, eventResult.msg);
                 dynamicTask.stop(timeoutTaskKey);
             }, eventResult -> {

+ 4 - 2
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageRequestProcessor.java

@@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.gb28181.bean.DeviceNotFoundEvent;
 import com.genersoft.iot.vmp.gb28181.bean.Platform;
 import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
 import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
+import com.genersoft.iot.vmp.gb28181.event.sip.SipEvent;
 import com.genersoft.iot.vmp.gb28181.service.IPlatformService;
 import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
 import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
@@ -93,11 +94,12 @@ public class MessageRequestProcessor extends SIPRequestProcessorParent implement
                 // 不存在则回复404
                 responseAck(request, Response.NOT_FOUND, "device "+ deviceId +" not found");
                 log.warn("[设备未找到 ]deviceId: {}, callId: {}", deviceId, callIdHeader.getCallId());
-                if (sipSubscribe.getErrorSubscribe(callIdHeader.getCallId()) != null){
+                SipEvent sipEvent = sipSubscribe.getSubscribe(callIdHeader.getCallId());
+                if (sipEvent != null && sipEvent.getErrorEvent() != null){
                     DeviceNotFoundEvent deviceNotFoundEvent = new DeviceNotFoundEvent(evt.getDialog());
                     deviceNotFoundEvent.setCallId(callIdHeader.getCallId());
                     SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(deviceNotFoundEvent);
-                    sipSubscribe.getErrorSubscribe(callIdHeader.getCallId()).response(eventResult);
+                    sipEvent.getErrorEvent().response(eventResult);
                 }
             }else {
                 Element rootElement;

+ 3 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/ISIPResponseProcessor.java

@@ -1,5 +1,7 @@
 package com.genersoft.iot.vmp.gb28181.transmit.event.response;
 
+import org.springframework.scheduling.annotation.Async;
+
 import javax.sip.ResponseEvent;
 
 /**    
@@ -9,6 +11,7 @@ import javax.sip.ResponseEvent;
  */
 public interface ISIPResponseProcessor {
 
+
 	void process(ResponseEvent evt);
 
 

+ 0 - 2
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java

@@ -38,14 +38,12 @@ public class InviteResponseProcessor extends SIPResponseProcessorAbstract {
 	@Autowired
 	private SIPProcessorObserver sipProcessorObserver;
 
-
 	@Autowired
 	private SIPSender sipSender;
 
 	@Autowired
 	private SIPRequestHeaderProvider headerProvider;
 
-
 	@Override
 	public void afterPropertiesSet() throws Exception {
 		// 添加消息处理的订阅

+ 7 - 5
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/timeout/impl/TimeoutProcessorImpl.java

@@ -1,6 +1,7 @@
 package com.genersoft.iot.vmp.gb28181.transmit.event.timeout.impl;
 
 import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
+import com.genersoft.iot.vmp.gb28181.event.sip.SipEvent;
 import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
 import com.genersoft.iot.vmp.gb28181.transmit.event.timeout.ITimeoutProcessor;
 import lombok.extern.slf4j.Slf4j;
@@ -32,11 +33,12 @@ public class TimeoutProcessorImpl implements InitializingBean, ITimeoutProcessor
             // TODO Auto-generated method stub
             CallIdHeader callIdHeader = event.getClientTransaction().getDialog().getCallId();
             String callId = callIdHeader.getCallId();
-            SipSubscribe.Event errorSubscribe = sipSubscribe.getErrorSubscribe(callId);
-            SipSubscribe.EventResult<TimeoutEvent> timeoutEventEventResult = new SipSubscribe.EventResult<>(event);
-            errorSubscribe.response(timeoutEventEventResult);
-            sipSubscribe.removeErrorSubscribe(callId);
-            sipSubscribe.removeOkSubscribe(callId);
+            SipEvent sipEvent = sipSubscribe.getSubscribe(callId);
+            if (sipEvent != null && sipEvent.getErrorEvent() != null) {
+                SipSubscribe.EventResult<TimeoutEvent> timeoutEventEventResult = new SipSubscribe.EventResult<>(event);
+                sipEvent.getErrorEvent().response(timeoutEventEventResult);
+                sipSubscribe.removeSubscribe(callId);
+            }
         } catch (Exception e) {
             log.error("[超时事件失败]: {}", e.getMessage());
         }

+ 2 - 0
src/main/resources/配置详情.yml

@@ -123,6 +123,8 @@ sip:
     # keepalliveToOnline: false
     # 是否存储alarm信息
     alarm: false
+    # 命令发送等待回复的超时时间, 单位:秒
+    timeout: 15
 
 # 做为JT1078服务器的配置
 jt1078: