Browse Source

添加第二种语音对讲实现

648540858 3 years ago
parent
commit
06bbe3fe01
29 changed files with 876 additions and 576 deletions
  1. 0 2
      pom.xml
  2. 3 3
      src/main/java/com/genersoft/iot/vmp/conf/exception/SsrcTransactionNotFoundException.java
  3. 8 16
      src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java
  4. 1 1
      src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteStreamType.java
  5. 2 1
      src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java
  6. 10 22
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java
  7. 33 0
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java
  8. 80 62
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
  9. 1 1
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
  10. 8 6
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java
  11. 21 19
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
  12. 70 68
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
  13. 63 128
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
  14. 7 15
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java
  15. 0 4
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java
  16. 10 8
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageHandlerAbstract.java
  17. 3 2
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java
  18. 3 0
      src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java
  19. 148 5
      src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
  20. 0 27
      src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
  21. 2 0
      src/main/java/com/genersoft/iot/vmp/media/zlm/ZlmHttpHookSubscribe.java
  22. 16 0
      src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeFactory.java
  23. 42 0
      src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForStreamPush.java
  24. 12 1
      src/main/java/com/genersoft/iot/vmp/service/IPlayService.java
  25. 1 1
      src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java
  26. 281 105
      src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
  27. 13 52
      src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java
  28. 2 2
      web_src/config/index.js
  29. 36 25
      web_src/src/components/dialog/devicePlayer.vue

+ 0 - 2
pom.xml

@@ -273,7 +273,6 @@
 					<dateFormat>yyyyMMdd</dateFormat>
 				</configuration>
 			</plugin>
-
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-surefire-plugin</artifactId>
@@ -282,7 +281,6 @@
 					<skipTests>true</skipTests>
 				</configuration>
 			</plugin>
-
 		</plugins>
 		<resources>
 			<resource>

+ 3 - 3
src/main/java/com/genersoft/iot/vmp/conf/exception/SsrcTransactionNotFoundException.java

@@ -39,12 +39,12 @@ public class SsrcTransactionNotFoundException extends Exception{
     @Override
     public String getMessage() {
         StringBuffer msg = new StringBuffer();
-        msg.append(StringFormatter.format("缓存事务信息未找到,device:%s channel: %s ",  deviceId, channelId));
+        msg.append(String.format("缓存事务信息未找到,device:%s channel: %s ",  deviceId, channelId));
         if (callId != null) {
-            msg.append("callId: " + callId);
+            msg.append(",callId: " + callId);
         }
         if (stream != null) {
-            msg.append("stream: " + stream);
+            msg.append(",stream: " + stream);
         }
         return msg.toString();
     }

+ 8 - 16
src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java

@@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.bean;
 
 
 import gov.nist.javax.sip.message.SIPRequest;
+import gov.nist.javax.sip.message.SIPResponse;
 import gov.nist.javax.sip.stack.SIPDialog;
 
 import javax.sip.Dialog;
@@ -40,12 +41,7 @@ public class AudioBroadcastCatch {
     /**
      * 请求信息
      */
-    private SIPRequest request;
-
-    /**
-     * 会话信息
-     */
-    private SIPDialog dialog;
+    private SipTransactionInfo sipTransactionInfo;
 
 
     public String getDeviceId() {
@@ -72,19 +68,15 @@ public class AudioBroadcastCatch {
         this.status = status;
     }
 
-    public void setDialog(SIPDialog dialog) {
-        this.dialog = dialog;
-    }
-
-    public SIPDialog getDialog() {
-        return dialog;
+    public SipTransactionInfo getSipTransactionInfo() {
+        return sipTransactionInfo;
     }
 
-    public SIPRequest getRequest() {
-        return request;
+    public void setSipTransactionInfo(SipTransactionInfo sipTransactionInfo) {
+        this.sipTransactionInfo = sipTransactionInfo;
     }
 
-    public void setRequest(SIPRequest request) {
-        this.request = request;
+    public void setSipTransactionInfoByRequset(SIPResponse response) {
+        this.sipTransactionInfo = new SipTransactionInfo(response);
     }
 }

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteStreamType.java

@@ -2,7 +2,7 @@ package com.genersoft.iot.vmp.gb28181.bean;
 
 public enum InviteStreamType {
 
-    PLAY,PLAYBACK,PUSH,PROXY,CLOUD_RECORD_PUSH,CLOUD_RECORD_PROXY
+    PLAY,PLAYBACK,PUSH,PROXY,CLOUD_RECORD_PUSH,CLOUD_RECORD_PROXY,TALK
 
 
 }

+ 2 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java

@@ -1,5 +1,6 @@
 package com.genersoft.iot.vmp.gb28181.event;
 
+import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
 import com.genersoft.iot.vmp.gb28181.bean.DeviceNotFoundEvent;
 import gov.nist.javax.sip.message.SIPRequest;
 import org.slf4j.Logger;
@@ -57,7 +58,7 @@ public class SipSubscribe {
         logger.debug("errorSubscribes.size:{}",errorSubscribes.size());
     }
 
-    public interface Event { void response(EventResult eventResult) ;
+    public interface Event { void response(EventResult eventResult);
     }
 
     /**

+ 10 - 22
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java

@@ -8,18 +8,12 @@ import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
 import gov.nist.javax.sip.message.SIPRequest;
-import gov.nist.javax.sip.message.SIPRequest;
-import gov.nist.javax.sip.stack.SIPDialog;
 
-import javax.sip.Dialog;
-import javax.sip.InvalidArgumentException;
-import javax.sip.SipException;
-import java.text.ParseException;
 import javax.sip.InvalidArgumentException;
 import javax.sip.PeerUnavailableException;
 import javax.sip.SipException;
-import javax.sip.message.Request;
 import java.text.ParseException;
+import javax.sip.message.Request;
 
 /**    
  * @description:设备能力接口,用于定义设备的控制、查询能力   
@@ -130,14 +124,18 @@ public interface ISIPCommander {
 						   String startTime, String endTime, int downloadSpeed, InviteStreamCallback inviteStreamCallback, InviteStreamCallback hookEvent,
 						   SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException;
 
+
 	/**
 	 * 视频流停止
 	 */
-	void streamByeCmd(String deviceId, String channelId, String stream, String callId, SipSubscribe.Event okEvent);
-	void streamByeCmd(String deviceId, String channelId, String stream, String callId);
 	void streamByeCmd(Device device, String channelId, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException;
+
+	void talkStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, String callId, ZlmHttpHookSubscribe.Event event, ZlmHttpHookSubscribe.Event eventForPush, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException;
+
 	void streamByeCmd(Device device, String channelId, String stream, String callId) throws InvalidArgumentException, ParseException, SipException, SsrcTransactionNotFoundException;
 
+	void streamByeCmd(Device device, String channelId, SipTransactionInfo sipTransactionInfo, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException;
+
 	/**
 	 * 回放暂停
 	 */
@@ -168,22 +166,12 @@ public interface ISIPCommander {
 
 
     /**
-
-	/**
-	 * 语音广播
-	 * 
-	 * @param device  视频设备
-	 */
-	boolean audioBroadcastCmd(Device device, String channelId, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent);
-	void audioBroadcastCmd(Device device,String channelId);
-
-	/**
+	 * /**
 	 * 语音广播
 	 *
-	 * @param device  视频设备
+	 * @param device 视频设备
 	 */
-	void audioBroadcastCmd(Device device, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException;
-	void audioBroadcastCmd(Device device) throws InvalidArgumentException, SipException, ParseException;
+	void audioBroadcastCmd(Device device, String channelId, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException;
 
 	/**
 	 * 音视频录像控制

+ 33 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java

@@ -326,4 +326,37 @@ public class SIPRequestHeaderProvider {
 
 		return request;
 	}
+
+	public Request createBroadcastMessageRequest(Device device, String channelId, String content, String viaTag, String fromTag, String toTag, CallIdHeader callIdHeader) throws ParseException, InvalidArgumentException, PeerUnavailableException {
+		Request request = null;
+		// sipuri
+		SipURI requestURI = sipFactory.createAddressFactory().createSipURI(channelId, device.getHostAddress());
+		// via
+		ArrayList<ViaHeader> viaHeaders = new ArrayList<ViaHeader>();
+		ViaHeader viaHeader = sipFactory.createHeaderFactory().createViaHeader(sipConfig.getIp(), sipConfig.getPort(), device.getTransport(), viaTag);
+		viaHeader.setRPort();
+		viaHeaders.add(viaHeader);
+		// from
+		SipURI fromSipURI = sipFactory.createAddressFactory().createSipURI(sipConfig.getId(), sipConfig.getDomain());
+		Address fromAddress = sipFactory.createAddressFactory().createAddress(fromSipURI);
+		FromHeader fromHeader = sipFactory.createHeaderFactory().createFromHeader(fromAddress, fromTag);
+		// to
+		SipURI toSipURI = sipFactory.createAddressFactory().createSipURI(channelId, device.getHostAddress());
+		Address toAddress = sipFactory.createAddressFactory().createAddress(toSipURI);
+		ToHeader toHeader = sipFactory.createHeaderFactory().createToHeader(toAddress, toTag);
+
+		// Forwards
+		MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70);
+		// ceq
+		CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(), Request.MESSAGE);
+
+		ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml");
+
+		request = sipFactory.createMessageFactory().createRequest(requestURI, Request.MESSAGE, callIdHeader, cSeqHeader, fromHeader,
+				toHeader, viaHeaders, maxForwards, contentTypeHeader, content);
+
+		request.addHeader(SipUtils.createUserAgentHeader(sipFactory, gitUtil));
+
+		return request;
+	}
 }

+ 80 - 62
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java

@@ -2,7 +2,6 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl;
 
 import com.alibaba.fastjson.JSONObject;
 import com.genersoft.iot.vmp.common.StreamInfo;
-import com.genersoft.iot.vmp.conf.DynamicTask;
 import com.genersoft.iot.vmp.conf.SipConfig;
 import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
@@ -12,45 +11,32 @@ import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderProvider;
 import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
+import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamPush;
 import com.genersoft.iot.vmp.utils.DateUtil;
 import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
 import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
-import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
-import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import com.genersoft.iot.vmp.utils.GitUtil;
-import gov.nist.javax.sip.SIPConstants;
 import gov.nist.javax.sip.SipProviderImpl;
-import gov.nist.javax.sip.SipStackImpl;
 import gov.nist.javax.sip.message.SIPRequest;
 import gov.nist.javax.sip.message.SIPResponse;
-import gov.nist.javax.sip.stack.SIPClientTransaction;
-import gov.nist.javax.sip.stack.SIPClientTransactionImpl;
-import gov.nist.javax.sip.stack.SIPDialog;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.context.annotation.DependsOn;
-import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Component;
 import org.springframework.util.ObjectUtils;
 
 import javax.sip.*;
-import javax.sip.address.Address;
-import javax.sip.address.SipURI;
 import javax.sip.header.*;
 import javax.sip.message.Request;
-import javax.sip.message.Response;
-import java.lang.reflect.Field;
 import java.text.ParseException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
 
 /**
  * @description:设备能力接口,用于定义设备的控制、查询能力
@@ -98,6 +84,9 @@ public class SIPCommander implements ISIPCommander {
     @Autowired
     private IMediaServerService mediaServerService;
 
+    @Autowired
+    private ZLMRTPServerFactory zlmrtpServerFactory;
+
 
     /**
      * 云台方向放控制,使用配置文件中的默认镜头移动速度
@@ -591,11 +580,73 @@ public class SIPCommander implements ISIPCommander {
         });
     }
 
+    @Override
+    public void talkStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, String callId, ZlmHttpHookSubscribe.Event event, ZlmHttpHookSubscribe.Event eventForPush, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException {
+
+        String stream = ssrcInfo.getStream();
+
+        if (device == null) {
+            return;
+        }
+        if (!mediaServerItem.isRtpEnable()) {
+            // 单端口暂不支持语音对讲
+            logger.info("[语音对讲] 单端口暂不支持此操作");
+            return;
+        }
+
+        logger.info("[语音对讲] {} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort());
+        HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId());
+        subscribe.addSubscribe(hookSubscribeForStreamChange, (MediaServerItem mediaServerItemInUse, JSONObject json) -> {
+            if (event != null) {
+                event.response(mediaServerItemInUse, json);
+                subscribe.removeSubscribe(hookSubscribeForStreamChange);
+            }
+        });
+
+        CallIdHeader callIdHeader = device.getTransport().equalsIgnoreCase("TCP") ? tcpSipProvider.getNewCallId()
+                : udpSipProvider.getNewCallId();
+        callIdHeader.setCallId(callId);
+        HookSubscribeForStreamPush hookSubscribeForStreamPush = HookSubscribeFactory.on_publish("rtp", stream,  null, mediaServerItem.getId());
+        subscribe.addSubscribe(hookSubscribeForStreamPush, (MediaServerItem mediaServerItemInUse, JSONObject json) -> {
+            if (eventForPush != null) {
+                eventForPush.response(mediaServerItemInUse, json);
+            }
+        });
+        //
+        StringBuffer content = new StringBuffer(200);
+        content.append("v=0\r\n");
+        content.append("o=" + channelId + " 0 0 IN IP4 " + mediaServerItem.getSdpIp() + "\r\n");
+        content.append("s=Talk\r\n");
+        content.append("c=IN IP4 " + mediaServerItem.getSdpIp() + "\r\n");
+        content.append("t=0 0\r\n");
+
+        content.append("m=audio " + ssrcInfo.getPort() + " RTP/AVP 8\r\n");
+        content.append("a=sendrecv\r\n");
+        content.append("a=rtpmap:8 PCMA/8000\r\n");
+
+        content.append("y=" + ssrcInfo.getSsrc() + "\r\n");//ssrc
+        // f字段:f= v/编码格式/分辨率/帧率/码率类型/码率大小a/编码格式/码率大小/采样率
+        content.append("f=v/////a/1/8/1" + "\r\n");
+
+        Request request = headerProvider.createInviteRequest(device, channelId, content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, ssrcInfo.getSsrc(), callIdHeader);
+        transmitRequest(device.getTransport(), request, (e -> {
+            streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
+            mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
+            errorEvent.response(e);
+        }), e -> {
+            // 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值
+            ResponseEvent responseEvent = (ResponseEvent) e.event;
+            SIPResponse response = (SIPResponse) responseEvent.getResponse();
+            streamSession.put(device.getDeviceId(), channelId, "talk", stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response, VideoStreamSessionManager.SessionType.play);
+            okEvent.response(e);
+        });
+    }
+
     /**
      * 视频流停止, 不使用回调
      */
     @Override
-    public void streamByeCmd(Device device, String channelId, String stream, String callId) throws InvalidArgumentException, ParseException, SipException, SsrcTransactionNotFoundException {
+    public synchronized void streamByeCmd(Device device, String channelId, String stream, String callId) throws InvalidArgumentException, ParseException, SipException, SsrcTransactionNotFoundException {
         streamByeCmd(device, channelId, stream, callId, null);
     }
 
@@ -603,7 +654,7 @@ public class SIPCommander implements ISIPCommander {
      * 视频流停止
      */
     @Override
-    public void streamByeCmd(Device device, String channelId, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException {
+    public synchronized void streamByeCmd(Device device, String channelId, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException {
         SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, callId, stream);
         if (ssrcTransaction == null) {
             throw new SsrcTransactionNotFoundException(device.getDeviceId(), channelId, callId, stream);
@@ -617,67 +668,34 @@ public class SIPCommander implements ISIPCommander {
         transmitRequest(device.getTransport(), byteRequest, null, okEvent);
     }
 
-    /**
+    @Override
+    public synchronized void streamByeCmd(Device device, String channelId, SipTransactionInfo sipTransactionInfo, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException {
+        Request byteRequest = headerProvider.createByteRequest(device, channelId, sipTransactionInfo);
+        transmitRequest(device.getTransport(), byteRequest, null, okEvent);
+    }
+
+	/**
      * 语音广播
      *
      * @param device 视频设备
      */
-    @Override
-    public void audioBroadcastCmd(Device device) throws InvalidArgumentException, SipException, ParseException {
-
-        StringBuffer broadcastXml = new StringBuffer(200);
-        String charset = device.getCharset();
-        broadcastXml.append("<?xml version=\"1.0\" encoding=\"" + charset + "\"?>\r\n");
-        broadcastXml.append("<Notify>\r\n");
-        broadcastXml.append("<CmdType>Broadcast</CmdType>\r\n");
-        broadcastXml.append("<SN>" + (int) ((Math.random() * 9 + 1) * 100000) + "</SN>\r\n");
-        broadcastXml.append("<SourceID>" + sipConfig.getId() + "</SourceID>\r\n");
-        broadcastXml.append("<TargetID>" + device.getDeviceId() + "</TargetID>\r\n");
-        broadcastXml.append("</Notify>\r\n");
-	/**
-	 * 语音广播
-	 *
-	 * @param device  视频设备
-	 */
 	@Override
-	public boolean audioBroadcastCmd(Device device,String channelId, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) {
-		try {
-			StringBuffer broadcastXml = new StringBuffer(200);
-			String charset = device.getCharset();
-			broadcastXml.append("<?xml version=\"1.0\" encoding=\"" + charset + "\"?>\r\n");
-			broadcastXml.append("<Notify>\r\n");
-			broadcastXml.append("<CmdType>Broadcast</CmdType>\r\n");
-			broadcastXml.append("<SN>" + (int)((Math.random()*9+1)*100000) + "</SN>\r\n");
-			broadcastXml.append("<SourceID>" + sipConfig.getId() + "</SourceID>\r\n");
-			broadcastXml.append("<TargetID>" + channelId + "</TargetID>\r\n");
-			broadcastXml.append("</Notify>\r\n");
-
-        CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
-                : udpSipProvider.getNewCallId();
-
-        Request request = headerProvider.createMessageRequest(device, broadcastXml.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, callIdHeader);
-        transmitRequest(device.getTransport(), request);
-
-    }
-
-    @Override
-    public void audioBroadcastCmd(Device device, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException {
-
+	public void audioBroadcastCmd(Device device, String channelId, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException {
         StringBuffer broadcastXml = new StringBuffer(200);
         String charset = device.getCharset();
         broadcastXml.append("<?xml version=\"1.0\" encoding=\"" + charset + "\"?>\r\n");
         broadcastXml.append("<Notify>\r\n");
         broadcastXml.append("<CmdType>Broadcast</CmdType>\r\n");
-        broadcastXml.append("<SN>" + (int) ((Math.random() * 9 + 1) * 100000) + "</SN>\r\n");
+        broadcastXml.append("<SN>" + (int)((Math.random()*9+1)*100000) + "</SN>\r\n");
         broadcastXml.append("<SourceID>" + sipConfig.getId() + "</SourceID>\r\n");
-        broadcastXml.append("<TargetID>" + device.getDeviceId() + "</TargetID>\r\n");
+        broadcastXml.append("<TargetID>" + channelId + "</TargetID>\r\n");
         broadcastXml.append("</Notify>\r\n");
 
         CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
                 : udpSipProvider.getNewCallId();
 
         Request request = headerProvider.createMessageRequest(device, broadcastXml.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, callIdHeader);
-        transmitRequest(device.getTransport(), request, errorEvent);
+        transmitRequest(device.getTransport(), request, errorEvent, okEvent);
 
     }
 

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java

@@ -676,7 +676,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
     }
 
     @Override
-    public void streamByeCmd(ParentPlatform platform, SendRtpItem sendRtpItem) throws SipException, InvalidArgumentException, ParseException {
+    public synchronized void streamByeCmd(ParentPlatform platform, SendRtpItem sendRtpItem) throws SipException, InvalidArgumentException, ParseException {
         if (sendRtpItem == null ) {
             logger.info("[向上级发送BYE], sendRtpItem 为NULL");
             return;

+ 8 - 6
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java

@@ -1,12 +1,12 @@
 package com.genersoft.iot.vmp.gb28181.transmit.event.request;
 
+import com.genersoft.iot.vmp.conf.SipConfig;
 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
 import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
 import gov.nist.javax.sip.SipProviderImpl;
 import gov.nist.javax.sip.SipStackImpl;
 import gov.nist.javax.sip.message.SIPRequest;
 import gov.nist.javax.sip.message.SIPResponse;
-import gov.nist.javax.sip.stack.SIPServerTransaction;
 import gov.nist.javax.sip.stack.SIPServerTransactionImpl;
 import org.apache.commons.lang3.ArrayUtils;
 import org.dom4j.Document;
@@ -27,8 +27,6 @@ import javax.sip.message.MessageFactory;
 import javax.sip.message.Request;
 import javax.sip.message.Response;
 import java.io.ByteArrayInputStream;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
 import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -51,6 +49,9 @@ public abstract class SIPRequestProcessorParent {
 	@Qualifier(value="udpSipProvider")
 	private SipProviderImpl udpSipProvider;
 
+	@Autowired
+	private SipConfig sipConfig;
+
 	/**
 	 * 根据 RequestEvent 获取 ServerTransaction
 	 * @param evt
@@ -60,13 +61,15 @@ public abstract class SIPRequestProcessorParent {
 		Request request = evt.getRequest();
 		SIPServerTransactionImpl serverTransaction = (SIPServerTransactionImpl)evt.getServerTransaction();
 		// 判断TCP还是UDP
+		boolean isTcp = false;
 		ViaHeader reqViaHeader = (ViaHeader) request.getHeader(ViaHeader.NAME);
 		String transport = reqViaHeader.getTransport();
+		if (transport.equalsIgnoreCase("TCP")) {
+			isTcp = true;
+		}
 		if (serverTransaction != null && serverTransaction.getOriginalRequest() == null) {
 			serverTransaction.setOriginalRequest((SIPRequest) evt.getRequest());
 		}
-		boolean isTcp = "TCP".equals(transport);
-
 		if (serverTransaction == null) {
 			try {
 				if (isTcp) {
@@ -187,7 +190,6 @@ public abstract class SIPRequestProcessorParent {
 	 * 回复带sdp的200
 	 */
 	public SIPResponse responseSdpAck(ServerTransaction serverTransaction, String sdp, ParentPlatform platform) throws SipException, InvalidArgumentException, ParseException {
-
 		ContentTypeHeader contentTypeHeader = SipFactory.getInstance().createHeaderFactory().createContentTypeHeader("APPLICATION", "SDP");
 
 		// 兼容国标中的使用编码@域名作为RequestURI的情况

+ 21 - 19
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java

@@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
 
 import com.alibaba.fastjson.JSONObject;
 import com.genersoft.iot.vmp.conf.DynamicTask;
+import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
 import com.genersoft.iot.vmp.gb28181.bean.*;
 import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
@@ -14,6 +15,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorP
 import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
 import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
+import com.genersoft.iot.vmp.service.IDeviceService;
 import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
 import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
@@ -21,7 +23,6 @@ import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import gov.nist.javax.sip.message.SIPRequest;
 import gov.nist.javax.sip.stack.SIPDialog;
-import com.genersoft.iot.vmp.utils.SerializeUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.InitializingBean;
@@ -81,6 +82,9 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
 	@Autowired
 	private ISIPCommander cmder;
 
+	@Autowired
+	private IDeviceService deviceService;
+
 	@Autowired
 	private ISIPCommanderForPlatform commanderForPlatform;
 
@@ -106,7 +110,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
 		// 取消设置的超时任务
 		dynamicTask.stop(callIdHeader.getCallId());
 		String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
-		SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(platformGbId, channelId, null, callIdHeader.getCallId());
+		SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId());
 		if (sendRtpItem == null) {
 			logger.warn("[收到ACK]:未找到通道({})的推流信息", channelId);
 			return;
@@ -123,7 +127,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
 		param.put("pt", sendRtpItem.getPt());
 		param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
 		param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
-		if (!sendRtpItem.isTcp() && parentPlatform.isRtcp()) {
+		if (!sendRtpItem.isTcp() && parentPlatform != null && parentPlatform.isRtcp()) {
 			// 开启rtcp保活
 			param.put("udp_rtcp_timeout", "1");
 		}
@@ -141,29 +145,28 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
 			if (jsonObject == null) {
 				logger.error("RTP推流失败: 请检查ZLM服务");
 			} else if (jsonObject.getInteger("code") == 0) {
-
-				if (sendRtpItem.isOnlyAudio()) {
-					AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
-					audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.Ok);
-					audioBroadcastCatch.setDialog((SIPDialog) evt.getDialog());
-					audioBroadcastCatch.setRequest((SIPRequest) evt.getRequest());
-					audioBroadcastManager.update(audioBroadcastCatch);
-					String waiteStreamTimeoutTaskKey = "waite-stream-" + audioBroadcastCatch.getDeviceId() + audioBroadcastCatch.getChannelId();
-					dynamicTask.stop(waiteStreamTimeoutTaskKey);
-				}
 				logger.info("RTP推流成功[ {}/{} ],{}->{}:{}, ", param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port"));
 			} else {
 				logger.error("RTP推流失败: {}, 参数:{}", jsonObject.getString("msg"), JSONObject.toJSON(param));
 				if (sendRtpItem.isOnlyAudio()) {
 					// 语音对讲
-					try {
-						cmder.streamByeCmd((SIPDialog) evt.getDialog(), sendRtpItem.getChannelId(), (SIPRequest) evt.getRequest(), null);
-					} catch (SipException | ParseException | InvalidArgumentException e) {
-						logger.error("[命令发送失败] 停止语音对讲: {}", e.getMessage());
+					Device device = deviceService.queryDevice(platformGbId);
+					if (device != null) {
+						try {
+							cmder.streamByeCmd(device, sendRtpItem.getChannelId(), sendRtpItem.getStreamId(), null);
+						} catch (SipException | ParseException | InvalidArgumentException |
+								 SsrcTransactionNotFoundException e) {
+							logger.error("[命令发送失败] 停止语音对讲: {}", e.getMessage());
+						}
 					}
+
 				} else {
 					// 向上级平台
-					commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId());
+					try {
+						commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId());
+					} catch (SipException | InvalidArgumentException | ParseException e) {
+						logger.error("[命令发送失败] 国标级联, 回复BYE: {}", e.getMessage());
+					}
 				}
 				if (mediaInfo == null) {
 					RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
@@ -179,7 +182,6 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
 				}
 
 
-			}
 		}
 	}
 	private void startSendRtpStreamHand(RequestEvent evt, SendRtpItem sendRtpItem, ParentPlatform parentPlatform,

+ 70 - 68
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java

@@ -2,10 +2,8 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
 
 import com.genersoft.iot.vmp.common.StreamInfo;
 import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
-import com.genersoft.iot.vmp.gb28181.bean.Device;
-import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType;
-import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
-import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
+import com.genersoft.iot.vmp.gb28181.bean.*;
+import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
 import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
@@ -54,6 +52,9 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
 	@Autowired
 	private IDeviceService deviceService;
 
+	@Autowired
+	private AudioBroadcastManager audioBroadcastManager;
+
 	@Autowired
 	private IVideoManagerStorage storager;
 
@@ -91,78 +92,79 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
 			logger.error("[回复BYE信息失败],{}", e.getMessage());
 		}
 		CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME);
-			String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
-			String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
-			SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(platformGbId, channelId, null, callIdHeader.getCallId());
-			logger.info("[收到bye] {}/{}", platformGbId, channelId);
-			if (sendRtpItem != null){
-				String streamId = sendRtpItem.getStreamId();
-				Map<String, Object> param = new HashMap<>();
-				param.put("vhost","__defaultVhost__");
-				param.put("app",sendRtpItem.getApp());
-				param.put("stream",streamId);
-				param.put("ssrc",sendRtpItem.getSsrc());
-				logger.info("[收到bye] 停止向上级推流:{}", streamId);
-				MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
-				redisCatchStorage.deleteSendRTPServer(platformGbId, channelId, callIdHeader.getCallId(), null);
-				zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
-				int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId);
-				if (totalReaderCount <= 0) {
-					logger.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId);
-					if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) {
-						Device device = deviceService.queryDevice(sendRtpItem.getDeviceId());
-						if (device == null) {
-							logger.info("[收到bye] {} 通知设备停止推流时未找到设备信息", streamId);
-						}
-						try {
-							logger.warn("[停止点播] {}/{}", sendRtpItem.getDeviceId(), channelId);
-							cmder.streamByeCmd(device, channelId, streamId, null);
-						} catch (InvalidArgumentException | ParseException | SipException |
-								 SsrcTransactionNotFoundException e) {
-							logger.error("[收到bye] {} 无其它观看者,通知设备停止推流, 发送BYE失败 {}",streamId, e.getMessage());
-						}
+		SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId());
+
+		if (sendRtpItem != null){
+			logger.info("[收到bye] {}/{}", sendRtpItem.getPlatformId(), sendRtpItem.getChannelId());
+			String streamId = sendRtpItem.getStreamId();
+			Map<String, Object> param = new HashMap<>();
+			param.put("vhost","__defaultVhost__");
+			param.put("app",sendRtpItem.getApp());
+			param.put("stream",streamId);
+			param.put("ssrc",sendRtpItem.getSsrc());
+			logger.info("[收到bye] 停止向上级推流:{}", streamId);
+			MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
+			redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), callIdHeader.getCallId(), null);
+			zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
+			int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId);
+			if (totalReaderCount <= 0) {
+				logger.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId);
+				if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) {
+					Device device = deviceService.queryDevice(sendRtpItem.getDeviceId());
+					if (device == null) {
+						logger.info("[收到bye] {} 通知设备停止推流时未找到设备信息", streamId);
 					}
-					if (sendRtpItem.isOnlyAudio()) {
-						playService.stopAudioBroadcast(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+					try {
+						logger.warn("[停止点播] {}/{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+						cmder.streamByeCmd(device, sendRtpItem.getChannelId(), streamId, null);
+					} catch (InvalidArgumentException | ParseException | SipException |
+							 SsrcTransactionNotFoundException e) {
+						logger.error("[收到bye] {} 无其它观看者,通知设备停止推流, 发送BYE失败 {}",streamId, e.getMessage());
 					}
-					if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
-						MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
-								sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(),
-								sendRtpItem.getPlatformId(), null, null, sendRtpItem.getMediaServerId());
-						redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
-					}
-				}
-			}
-			// 可能是设备主动停止
-			Device device = storager.queryVideoDeviceByChannelId(platformGbId);
-			if (device != null) {
-				storager.stopPlay(device.getDeviceId(), channelId);
-				StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId);
-				if (streamInfo != null) {
-					redisCatchStorage.stopPlay(streamInfo);
-					mediaServerService.closeRTPServer(streamInfo.getMediaServerId(), streamInfo.getStream());
 				}
-				SsrcTransaction ssrcTransactionForPlay = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, "play", null);
-				if (ssrcTransactionForPlay != null){
-					if (ssrcTransactionForPlay.getCallId().equals(callIdHeader.getCallId())){
-						// 释放ssrc
-						MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransactionForPlay.getMediaServerId());
-						if (mediaServerItem != null) {
-							mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransactionForPlay.getSsrc());
-						}
-						streamSession.remove(device.getDeviceId(), channelId, ssrcTransactionForPlay.getStream());
-					}
+
+				if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
+					MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
+							sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(),
+							sendRtpItem.getPlatformId(), null, null, sendRtpItem.getMediaServerId());
+					redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
 				}
-				SsrcTransaction ssrcTransactionForPlayBack = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, callIdHeader.getCallId(), null);
-				if (ssrcTransactionForPlayBack != null) {
+			}
+			playService.stopAudioBroadcast(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+		}
+
+		String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
+		String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
+
+		// 可能是设备主动停止
+		Device device = storager.queryVideoDeviceByChannelId(platformGbId);
+		if (device != null) {
+			storager.stopPlay(device.getDeviceId(), channelId);
+			StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId);
+			if (streamInfo != null) {
+				redisCatchStorage.stopPlay(streamInfo);
+				mediaServerService.closeRTPServer(streamInfo.getMediaServerId(), streamInfo.getStream());
+			}
+			SsrcTransaction ssrcTransactionForPlay = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, "play", null);
+			if (ssrcTransactionForPlay != null){
+				if (ssrcTransactionForPlay.getCallId().equals(callIdHeader.getCallId())){
 					// 释放ssrc
-					MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransactionForPlayBack.getMediaServerId());
+					MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransactionForPlay.getMediaServerId());
 					if (mediaServerItem != null) {
-						mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransactionForPlayBack.getSsrc());
+						mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransactionForPlay.getSsrc());
 					}
-					streamSession.remove(device.getDeviceId(), channelId, ssrcTransactionForPlayBack.getStream());
+					streamSession.remove(device.getDeviceId(), channelId, ssrcTransactionForPlay.getStream());
 				}
 			}
-
+			SsrcTransaction ssrcTransactionForPlayBack = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, callIdHeader.getCallId(), null);
+			if (ssrcTransactionForPlayBack != null) {
+				// 释放ssrc
+				MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransactionForPlayBack.getMediaServerId());
+				if (mediaServerItem != null) {
+					mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransactionForPlayBack.getSsrc());
+				}
+				streamSession.remove(device.getDeviceId(), channelId, ssrcTransactionForPlayBack.getStream());
+			}
+		}
 	}
 }

+ 63 - 128
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java

@@ -1,11 +1,10 @@
 package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.genersoft.iot.vmp.conf.DynamicTask;
 import com.genersoft.iot.vmp.conf.SipConfig;
 import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
 import com.genersoft.iot.vmp.gb28181.bean.*;
 import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
 import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
@@ -14,7 +13,8 @@ import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
 import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
 import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
-import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
+import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
+import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
 import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
 import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
 import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
@@ -30,20 +30,16 @@ import com.genersoft.iot.vmp.service.IStreamProxyService;
 import com.genersoft.iot.vmp.service.IStreamPushService;
 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
-import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener;
 import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
 import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import com.genersoft.iot.vmp.utils.DateUtil;
-import com.genersoft.iot.vmp.utils.SerializeUtils;
 import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult;
 import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
 import gov.nist.javax.sdp.TimeDescriptionImpl;
 import gov.nist.javax.sdp.fields.TimeField;
 import gov.nist.javax.sip.message.SIPRequest;
-import gov.nist.javax.sip.stack.SIPDialog;
-import gov.nist.javax.sip.message.SIPRequest;
 import gov.nist.javax.sip.message.SIPResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -72,7 +68,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
     private final String method = "INVITE";
 
     @Autowired
-    private SIPCommanderFroPlatform cmderFroPlatform;
+    private ISIPCommanderForPlatform cmderFroPlatform;
 
     @Autowired
     private IVideoManagerStorage storager;
@@ -174,7 +170,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
             // 查询请求是否来自上级平台\设备
             ParentPlatform platform = storager.queryParentPlatByServerGBId(requesterId);
             if (platform == null) {
-                inviteFromDeviceHandle(serverTransaction, requesterId);
+                inviteFromDeviceHandle(serverTransaction, requesterId, channelId);
             } else {
                 // 查询平台下是否有该通道
                 DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId);
@@ -393,14 +389,15 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                     };
                     SipSubscribe.Event errorEvent = ((event) -> {
                         // 未知错误。直接转发设备点播的错误
-                        Response response = null;
                         try {
-                            response = getMessageFactory().createResponse(event.statusCode, evt.getRequest());
+                            Response response = getMessageFactory().createResponse(event.statusCode, evt.getRequest());
                             serverTransaction.sendResponse(response);
                             System.out.println("未知错误。直接转发设备点播的错误");
                             if (serverTransaction.getDialog() != null) {
                                 serverTransaction.getDialog().delete();
                             }
+                            serverTransaction.getDialog().delete();
+
                         } catch (ParseException | SipException | InvalidArgumentException e) {
                             e.printStackTrace();
                         }
@@ -817,7 +814,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
     }
 
     public void inviteFromDeviceHandle(ServerTransaction serverTransaction, String requesterId, String channelId) throws InvalidArgumentException, ParseException, SipException, SdpException {
-
         // 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备)
         Device device = redisCatchStorage.getDevice(requesterId);
         AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(requesterId, channelId);
@@ -918,125 +914,64 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
             sendRtpItem.setOnlyAudio(true);
             redisCatchStorage.updateSendRTPSever(sendRtpItem);
 
-            // hook监听等待设备推流上来
-            // 添加订阅
-            HookSubscribeForStreamChange subscribeKey = HookSubscribeFactory.on_stream_changed(app, stream, true, "rtsp", mediaServerItem.getId());
-
-            String finalSsrc = ssrc;
-            // 流已经存在时直接推流
-                // 设置等待推流的超时; 默认20s
-                String waiteStreamTimeoutTaskKey = "waite-stream-" + device.getDeviceId() + audioBroadcastCatch.getChannelId();
-                dynamicTask.startDelay(waiteStreamTimeoutTaskKey, ()->{
-                    logger.info("等待推流超时: {}/{}", app, stream);
-                    subscribe.removeSubscribe(subscribeKey);
-                    playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId());
-                    // 发送bye
-                    try {
-                        responseAck(evt, Response.BUSY_HERE);
-                    } catch (SipException e) {
-                        throw new RuntimeException(e);
-                    } catch (InvalidArgumentException e) {
-                        throw new RuntimeException(e);
-                    } catch (ParseException e) {
-                        throw new RuntimeException(e);
-                    }
-                }, 20*1000);
-
-                boolean finalMediaTransmissionTCP = mediaTransmissionTCP;
-                subscribe.addSubscribe(subscribeKey,
-                        (MediaServerItem mediaServerItemInUse, JSONObject json)->{
-                            logger.info("收到语音对讲推流");
-                            dynamicTask.stop(waiteStreamTimeoutTaskKey);
-                            MediaItem mediaItem = JSON.toJavaObject(json, MediaItem.class);
-                            Integer audioCodecId = null;
-                            if (mediaItem.getTracks() != null && mediaItem.getTracks().size() > 0) {
-                                for (int i = 0; i < mediaItem.getTracks().size(); i++) {
-                                    MediaItem.MediaTrack mediaTrack = mediaItem.getTracks().get(i);
-                                    if (mediaTrack.getCodecType() == 1) {
-                                        audioCodecId = mediaTrack.getCodecId();
-                                        break;
-                                    }
-                                }
-                            }
-
-                            try {
-                                sendRtpItem.setStatus(2);
-                                redisCatchStorage.updateSendRTPSever(sendRtpItem);
-                                StringBuffer content = new StringBuffer(200);
-                                content.append("v=0\r\n");
-                                content.append("o="+ config.getId() +" "+ sdp.getOrigin().getSessionId() +" " + sdp.getOrigin().getSessionVersion()  + " IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
-                                content.append("s=Play\r\n");
-                                content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
-                                content.append("t=0 0\r\n");
-                                if (audioCodecId == null) {
-                                    if (finalMediaTransmissionTCP) {
-                                        content.append("m=audio "+ sendRtpItem.getLocalPort()+" TCP/RTP/AVP 8\r\n");
-                                    }else {
-                                        content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 8\r\n");
-                                    }
-
-                                    content.append("a=rtpmap:8 PCMA/8000\r\n");
-                                }else {
-                                    if (audioCodecId == 4) {
-                                        if (finalMediaTransmissionTCP) {
-                                            content.append("m=audio "+ sendRtpItem.getLocalPort()+" TCP/RTP/AVP 0\r\n");
-                                        }else {
-                                            content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 0\r\n");
-                                        }
-                                        content.append("a=rtpmap:0 PCMU/8000\r\n");
-                                    }else {
-                                        if (finalMediaTransmissionTCP) {
-                                            content.append("m=audio "+ sendRtpItem.getLocalPort()+" TCP/RTP/AVP 8\r\n");
-                                        }else {
-                                            content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 8\r\n");
-                                        }
-                                        content.append("a=rtpmap:8 PCMA/8000\r\n");
-                                    }
-                                }
-                                content.append("a=sendonly\r\n");
-                                if (sendRtpItem.isTcp()) {
-                                    content.append("a=connection:new\r\n");
-                                    if (!sendRtpItem.isTcpActive()) {
-                                        content.append("a=setup:active\r\n");
-                                    }else {
-                                        content.append("a=setup:passive\r\n");
-                                    }
-                                }
-                                content.append("y="+ finalSsrc + "\r\n");
-                                content.append("f=v/////a/1/8/1\r\n");
-
-                                ParentPlatform parentPlatform = new ParentPlatform();
-                                parentPlatform.setServerIP(device.getIp());
-                                parentPlatform.setServerPort(device.getPort());
-                                parentPlatform.setServerGBId(device.getDeviceId());
-
-                                responseSdpAck(serverTransaction, content.toString(), parentPlatform);
-                                Dialog dialog = evt.getDialog();
-                                audioBroadcastCatch.setDialog((SIPDialog) dialog);
-                                audioBroadcastCatch.setRequest((SIPRequest) request);
-                                audioBroadcastManager.update(audioBroadcastCatch);
-                            } catch (SipException | InvalidArgumentException | ParseException | SdpParseException e) {
-                                logger.error("[命令发送失败] 语音对讲: {}", e.getMessage());
-                            }
-                        });
-//            }
-            String key = DeferredResultHolder.CALLBACK_CMD_BROADCAST + device.getDeviceId();
-            WVPResult<AudioBroadcastResult> wvpResult = new WVPResult<>();
-            wvpResult.setCode(0);
-            wvpResult.setMsg("success");
-            AudioBroadcastResult audioBroadcastResult = new AudioBroadcastResult();
-            audioBroadcastResult.setApp(app);
-            audioBroadcastResult.setStream(stream);
-            audioBroadcastResult.setStreamInfo(mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, stream, null, null, null,false));
-            audioBroadcastResult.setCodec("G.711");
-            wvpResult.setData(audioBroadcastResult);
-            RequestMessage requestMessage = new RequestMessage();
-            requestMessage.setKey(key);
-            requestMessage.setData(wvpResult);
-            resultHolder.invokeAllResult(requestMessage);
+            Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, app, stream);
+            if (streamReady) {
+                sendOk(device,  sendRtpItem, sdp, serverTransaction, mediaServerItem, mediaTransmissionTCP, ssrc);
+            }else {
+                logger.warn("[语音通话], 未发现待推送的流,app={},stream={}", app, stream);
+                playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId());
+            }
         } else {
             logger.warn("来自无效设备/平台的请求");
             responseAck(serverTransaction, Response.BAD_REQUEST);
         }
     }
+
+    void sendOk(Device device, SendRtpItem sendRtpItem, SessionDescription sdp, ServerTransaction serverTransaction,  MediaServerItem mediaServerItem, boolean mediaTransmissionTCP, String ssrc){
+        try {
+            sendRtpItem.setStatus(2);
+            redisCatchStorage.updateSendRTPSever(sendRtpItem);
+            StringBuffer content = new StringBuffer(200);
+            content.append("v=0\r\n");
+            content.append("o="+ config.getId() +" "+ sdp.getOrigin().getSessionId() +" " + sdp.getOrigin().getSessionVersion()  + " IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
+            content.append("s=Play\r\n");
+            content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
+            content.append("t=0 0\r\n");
+
+            if (mediaTransmissionTCP) {
+                content.append("m=audio "+ sendRtpItem.getLocalPort()+" TCP/RTP/AVP 8\r\n");
+            }else {
+                content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 8\r\n");
+            }
+
+            content.append("a=rtpmap:8 PCMA/8000/1\r\n");
+
+            content.append("a=sendonly\r\n");
+            if (sendRtpItem.isTcp()) {
+                content.append("a=connection:new\r\n");
+                if (!sendRtpItem.isTcpActive()) {
+                    content.append("a=setup:active\r\n");
+                }else {
+                    content.append("a=setup:passive\r\n");
+                }
+            }
+            content.append("y="+ ssrc + "\r\n");
+            content.append("f=v/////a/1/8/1\r\n");
+
+            ParentPlatform parentPlatform = new ParentPlatform();
+            parentPlatform.setServerIP(device.getIp());
+            parentPlatform.setServerPort(device.getPort());
+            parentPlatform.setServerGBId(device.getDeviceId());
+
+            SIPResponse sipResponse = responseSdpAck(serverTransaction, content.toString(), parentPlatform);
+
+            AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(device.getDeviceId(), sendRtpItem.getChannelId());
+
+            audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.Ok);
+            audioBroadcastCatch.setSipTransactionInfoByRequset(sipResponse);
+            audioBroadcastManager.update(audioBroadcastCatch);
+        } catch (SipException | InvalidArgumentException | ParseException | SdpParseException e) {
+            logger.error("[命令发送失败] 语音对讲 回复200OK(SDP): {}", e.getMessage());
+        }
+    }
 }

+ 7 - 15
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java

@@ -20,12 +20,8 @@ import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.util.ObjectUtils;
-import org.springframework.util.StringUtils;
 
-import javax.sip.InvalidArgumentException;
-import javax.sip.RequestEvent;
-import javax.sip.ServerTransaction;
-import javax.sip.SipException;
+import javax.sip.*;
 import javax.sip.header.*;
 import javax.sip.message.Request;
 import javax.sip.message.Response;
@@ -116,10 +112,12 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
 
             if (expiresHeader == null) {
                 response = getMessageFactory().createResponse(Response.BAD_REQUEST, request);
-                ServerTransaction serverTransaction = getServerTransaction(evt);
-                serverTransaction.sendResponse(response);
-                if (serverTransaction.getDialog() != null) {
-                    serverTransaction.getDialog().delete();
+                if (evt.getDialog() != null ) {
+                    if (evt.getDialog().isServer()) {
+                        ServerTransaction serverTransaction = getServerTransaction(evt);
+                        serverTransaction.sendResponse(response);
+                        serverTransaction.getDialog().delete();
+                    }
                 }
                 return;
             }
@@ -176,19 +174,13 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
         } catch (SipException | InvalidArgumentException | NoSuchAlgorithmException | ParseException e) {
             e.printStackTrace();
         }
-
     }
 
     private void sendResponse(RequestEvent evt, Response response) throws InvalidArgumentException, SipException {
         ServerTransaction serverTransaction = getServerTransaction(evt);
-        if (serverTransaction == null) {
-            logger.warn("[回复失败]:{}", response);
-            return;
-        }
         serverTransaction.sendResponse(response);
         if (serverTransaction.getDialog() != null) {
             serverTransaction.getDialog().delete();
         }
     }
-
 }

+ 0 - 4
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java

@@ -1,20 +1,16 @@
 package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
 
-import com.genersoft.iot.vmp.common.VideoManagerConstants;
 import com.genersoft.iot.vmp.conf.DynamicTask;
 import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.gb28181.bean.CmdType;
 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
 import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
 import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
-import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeHandlerTask;
 import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
-import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
 import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
 import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
 import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
 import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
-import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import gov.nist.javax.sip.SipProviderImpl;
 import gov.nist.javax.sip.message.SIPRequest;

+ 10 - 8
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageHandlerAbstract.java

@@ -3,11 +3,15 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message;
 import com.genersoft.iot.vmp.gb28181.bean.Device;
 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
 import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
+import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.AckRequestProcessor;
 import org.dom4j.Element;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 
 import javax.sip.InvalidArgumentException;
 import javax.sip.RequestEvent;
+import javax.sip.ServerTransaction;
 import javax.sip.SipException;
 import javax.sip.message.Response;
 import java.text.ParseException;
@@ -18,6 +22,8 @@ import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
 
 public abstract class MessageHandlerAbstract extends SIPRequestProcessorParent implements IMessageHandler{
 
+    private Logger logger = LoggerFactory.getLogger(MessageHandlerAbstract.class);
+
     public Map<String, IMessageHandler> messageHandlerMap = new ConcurrentHashMap<>();
 
     public void addHandler(String cmdType, IMessageHandler messageHandler) {
@@ -48,14 +54,10 @@ public abstract class MessageHandlerAbstract extends SIPRequestProcessorParent i
 
     public void handNullCmd(RequestEvent evt){
         try {
-            responseAck(evt, Response.OK);
-        } catch (SipException e) {
-            throw new RuntimeException(e);
-        } catch (InvalidArgumentException e) {
-            throw new RuntimeException(e);
-        } catch (ParseException e) {
-            throw new RuntimeException(e);
+            ServerTransaction serverTransaction = getServerTransaction(evt);
+            responseAck(serverTransaction, Response.OK);
+        } catch (SipException | InvalidArgumentException | ParseException e) {
+            logger.error("[命令发送失败] 回复200 OK: {}", e.getMessage());
         }
-        return;
     }
 }

+ 3 - 2
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java

@@ -52,16 +52,17 @@ public class BroadcastResponseMessageHandler extends SIPRequestProcessorParent i
     public void handForDevice(RequestEvent evt, Device device, Element rootElement) {
         try {
             String channelId = getText(rootElement, "DeviceID");
+            ServerTransaction serverTransaction = getServerTransaction(evt);
             if (!audioBroadcastManager.exit(device.getDeviceId(), channelId)) {
                 // 回复410
-                responseAck(evt, Response.GONE);
+                responseAck(serverTransaction, Response.GONE);
                 return;
             }
             logger.info("收到语音广播的回复:{}/{}", device.getDeviceId(), channelId );
             AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(device.getDeviceId(), channelId);
             audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.WaiteInvite);
             audioBroadcastManager.update(audioBroadcastCatch);
-            responseAck(evt, Response.OK);
+            responseAck(serverTransaction, Response.OK);
         } catch (ParseException | SipException | InvalidArgumentException e) {
             logger.error("[命令发送失败] 国标级联 语音喊话: {}", e.getMessage());
         }

+ 3 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java

@@ -113,4 +113,7 @@ public class SipUtils {
         return builder.toString();
     }
 
+    public static String getNewCallId() {
+        return (int) Math.floor(Math.random() * 10000) + "";
+    }
 }

+ 148 - 5
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java

@@ -11,12 +11,16 @@ import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
 import com.genersoft.iot.vmp.gb28181.bean.*;
 import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
+import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
-import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
+import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
+import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
 import com.genersoft.iot.vmp.media.zlm.dto.*;
 import com.genersoft.iot.vmp.service.*;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
+import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult;
+import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -51,7 +55,13 @@ public class ZLMHttpHookListener {
 	private SIPCommander cmder;
 
 	@Autowired
-	private SIPCommanderFroPlatform commanderFroPlatform;
+	private ISIPCommanderForPlatform commanderFroPlatform;
+
+	@Autowired
+	private AudioBroadcastManager audioBroadcastManager;
+
+	@Autowired
+	private ZLMRTPServerFactory zlmrtpServerFactory;
 
 	@Autowired
 	private IPlayService playService;
@@ -466,7 +476,127 @@ public class ZLMHttpHookListener {
 								streamInfo.getStream(), null);
 					}
 				}
-			}else {
+			}else if ("broadcast".equals(app)){
+				// 语音对讲推流  stream需要满足格式deviceId_channelId
+				if (regist && stream.indexOf("_") > 0) {
+					String[] streamArray = stream.split("_");
+					if (streamArray.length == 2) {
+						String deviceId = streamArray[0];
+						String channelId = streamArray[1];
+						Device device = deviceService.queryDevice(deviceId);
+						if (device != null) {
+							DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
+							if (deviceChannel != null) {
+								if (audioBroadcastManager.exit(deviceId, channelId)) {
+									// 直接推流
+									SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(null, null, stream, null);
+									if (sendRtpItem == null) {
+										// TODO 可能数据错误,重新开启语音通道
+									}else {
+										String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
+										MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
+										logger.info("rtp/{}开始向上级推流, 目标={}:{},SSRC={}", sendRtpItem.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc());
+										Map<String, Object> param = new HashMap<>(12);
+										param.put("vhost","__defaultVhost__");
+										param.put("app",sendRtpItem.getApp());
+										param.put("stream",sendRtpItem.getStreamId());
+										param.put("ssrc", sendRtpItem.getSsrc());
+										param.put("src_port", sendRtpItem.getLocalPort());
+										param.put("pt", sendRtpItem.getPt());
+										param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
+										param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
+
+										JSONObject jsonObject;
+										if (sendRtpItem.isTcpActive()) {
+											jsonObject = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param);
+										} else {
+											param.put("is_udp", is_Udp);
+											param.put("dst_url", sendRtpItem.getIp());
+											param.put("dst_port", sendRtpItem.getPort());
+											jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
+										}
+										if (jsonObject != null && jsonObject.getInteger("code") == 0) {
+											logger.info("[语音对讲] 自动推流成功, device: {}, channel: {}", deviceId, channelId);
+										}
+
+									}
+								}else {
+									// 开启语音对讲通道
+									try {
+										playService.audioBroadcastCmd(device, channelId, 60, (msg)->{
+											logger.info("[语音对讲] 通道建立成功, device: {}, channel: {}", deviceId, channelId);
+										});
+									} catch (InvalidArgumentException | ParseException | SipException e) {
+										logger.error("[命令发送失败] 语音对讲: {}", e.getMessage());
+									}
+								}
+
+							}
+						}
+					}
+				}
+
+			}else if ("talk".equals(app)){
+				// 语音对讲推流  stream需要满足格式deviceId_channelId
+				if (regist && stream.indexOf("_") > 0) {
+					String[] streamArray = stream.split("_");
+					if (streamArray.length == 2) {
+						String deviceId = streamArray[0];
+						String channelId = streamArray[1];
+						Device device = deviceService.queryDevice(deviceId);
+						if (device != null) {
+							DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
+							if (deviceChannel != null) {
+								if (audioBroadcastManager.exit(deviceId, channelId)) {
+									// 直接推流
+									SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(null, null, stream, null);
+									if (sendRtpItem == null) {
+										// TODO 可能数据错误,重新开启语音通道
+									}else {
+										String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
+										MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
+										logger.info("rtp/{}开始向上级推流, 目标={}:{},SSRC={}", sendRtpItem.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc());
+										Map<String, Object> param = new HashMap<>(12);
+										param.put("vhost","__defaultVhost__");
+										param.put("app",sendRtpItem.getApp());
+										param.put("stream",sendRtpItem.getStreamId());
+										param.put("ssrc", sendRtpItem.getSsrc());
+										param.put("src_port", sendRtpItem.getLocalPort());
+										param.put("pt", sendRtpItem.getPt());
+										param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
+										param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
+
+										JSONObject jsonObject;
+										if (sendRtpItem.isTcpActive()) {
+											jsonObject = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param);
+										} else {
+											param.put("is_udp", is_Udp);
+											param.put("dst_url", sendRtpItem.getIp());
+											param.put("dst_port", sendRtpItem.getPort());
+											jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
+										}
+										if (jsonObject != null && jsonObject.getInteger("code") == 0) {
+											logger.info("[语音对讲] 自动推流成功, device: {}, channel: {}", deviceId, channelId);
+										}
+									}
+								}else {
+									// 开启语音对讲通道
+									MediaServerItem mediaServerForMinimumLoad = mediaServerService.getMediaServerForMinimumLoad();
+									playService.talk(mediaServerForMinimumLoad, device, channelId, (mediaServerItem, jsonObject)->{
+										System.out.println("开始推流");
+									}, eventResult -> {
+										System.out.println(eventResult.msg);
+									}, ()->{
+										System.out.println("超时");
+									});
+								}
+
+							}
+						}
+					}
+				}
+
+			}else{
 				if (!"rtp".equals(app)){
 					String type = OriginType.values()[item.getOriginType()].getType();
 					MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
@@ -521,10 +651,23 @@ public class ZLMHttpHookListener {
 						if (sendRtpItem.getApp().equals(app)) {
 							String platformId = sendRtpItem.getPlatformId();
 							ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
+							Device device = deviceService.queryDevice(platformId);
 
 							try {
-								commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
-							} catch (SipException | InvalidArgumentException | ParseException e) {
+								if (platform != null) {
+									commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
+								}else {
+									if (sendRtpItem.isOnlyAudio()) {
+										AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+										if (device != null && audioBroadcastCatch != null) {
+//											cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null);
+										}
+									}else {
+										cmder.streamByeCmd(device, sendRtpItem.getChannelId(), stream, sendRtpItem.getCallId());
+									}
+
+								}
+							} catch (SipException | InvalidArgumentException | ParseException | SsrcTransactionNotFoundException e) {
 								logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
 							}
 						}

+ 0 - 27
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java

@@ -164,33 +164,6 @@ public class ZLMRTPServerFactory {
         return result;
     }
 
-//    private int getPortFromportRange(MediaServerItem mediaServerItem) {
-//        int currentPort = mediaServerItem.getCurrentPort();
-//        if (currentPort == 0) {
-//            String[] portRangeStrArray = mediaServerItem.getSendRtpPortRange().split(",");
-//            if (portRangeStrArray.length != 2) {
-//                portRangeArray[0] = 30000;
-//                portRangeArray[1] = 30500;
-//            }else {
-//                portRangeArray[0] = Integer.parseInt(portRangeStrArray[0]);
-//                portRangeArray[1] = Integer.parseInt(portRangeStrArray[1]);
-//            }
-//        }
-//
-//        if (currentPort == 0 || currentPort++ > portRangeArray[1]) {
-//            currentPort = portRangeArray[0];
-//            mediaServerItem.setCurrentPort(currentPort);
-//            return portRangeArray[0];
-//        } else {
-//            if (currentPort % 2 == 1) {
-//                currentPort++;
-//            }
-//            currentPort++;
-//            mediaServerItem.setCurrentPort(currentPort);
-//            return currentPort;
-//        }
-//    }
-
     /**
      * 创建一个国标推流
      * @param ip 推流ip

+ 2 - 0
src/main/java/com/genersoft/iot/vmp/media/zlm/ZlmHttpHookSubscribe.java

@@ -38,6 +38,7 @@ public class ZlmHttpHookSubscribe {
             hookSubscribe.setExpires(expiresInstant);
         }
         allSubscribes.computeIfAbsent(hookSubscribe.getHookType(), k -> new ConcurrentHashMap<>()).put(hookSubscribe, event);
+        System.out.println(allSubscribes);
     }
 
     public ZlmHttpHookSubscribe.Event sendNotify(HookType type, JSONObject hookResponse) {
@@ -48,6 +49,7 @@ public class ZlmHttpHookSubscribe {
         }
         for (IHookSubscribe key : eventMap.keySet()) {
             Boolean result = null;
+
             for (String s : key.getContent().keySet()) {
                 if (result == null) {
                     result = key.getContent().getString(s).equals(hookResponse.getString(s));

+ 16 - 0
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeFactory.java

@@ -24,10 +24,26 @@ public class HookSubscribeFactory {
         return hookSubscribe;
     }
 
+    public static HookSubscribeForStreamPush on_publish(String app, String stream, String scheam, String mediaServerId) {
+        HookSubscribeForStreamPush hookSubscribe = new HookSubscribeForStreamPush();
+        JSONObject subscribeKey = new com.alibaba.fastjson.JSONObject();
+        subscribeKey.put("app", app);
+        subscribeKey.put("stream", stream);
+        if (scheam != null) {
+            subscribeKey.put("schema", scheam);
+        }
+        subscribeKey.put("mediaServerId", mediaServerId);
+        hookSubscribe.setContent(subscribeKey);
+
+        return hookSubscribe;
+    }
+
+
     public static HookSubscribeForServerStarted on_server_started() {
         HookSubscribeForServerStarted hookSubscribe = new HookSubscribeForServerStarted();
         hookSubscribe.setContent(new JSONObject());
 
         return hookSubscribe;
     }
+
 }

+ 42 - 0
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForStreamPush.java

@@ -0,0 +1,42 @@
+package com.genersoft.iot.vmp.media.zlm.dto;
+
+import com.alibaba.fastjson.JSONObject;
+
+import java.time.Instant;
+
+/**
+ * hook订阅-开始推流
+ * @author lin
+ */
+public class HookSubscribeForStreamPush implements IHookSubscribe{
+
+    private HookType hookType = HookType.on_publish;
+
+    private JSONObject content;
+
+    private Instant expires;
+
+    @Override
+    public HookType getHookType() {
+        return hookType;
+    }
+
+    @Override
+    public JSONObject getContent() {
+        return content;
+    }
+
+    public void setContent(JSONObject content) {
+        this.content = content;
+    }
+
+    @Override
+    public Instant getExpires() {
+        return expires;
+    }
+
+    @Override
+    public void setExpires(Instant expires) {
+        this.expires = expires;
+    }
+}

+ 12 - 1
src/main/java/com/genersoft/iot/vmp/service/IPlayService.java

@@ -11,11 +11,16 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback;
 import com.genersoft.iot.vmp.service.bean.PlayBackCallback;
 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
+import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult;
 import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent;
 import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
 import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
 import org.springframework.web.context.request.async.DeferredResult;
 
+import javax.sip.InvalidArgumentException;
+import javax.sip.SipException;
+import java.text.ParseException;
+
 /**
  * 点播处理
  */
@@ -23,6 +28,10 @@ public interface IPlayService {
 
     void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid);
 
+    void talk(MediaServerItem mediaServerItem, Device device, String channelId,
+              ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
+              Runnable timeoutCallback);
+
     void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
               ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
               InviteTimeOutCallback timeoutCallback, String uuid);
@@ -44,6 +53,8 @@ public interface IPlayService {
 
     void zlmServerOnline(String mediaServerId);
 
-    void audioBroadcast(Device device, String channelId, int timeout, AudioBroadcastEvent event);
+    AudioBroadcastResult audioBroadcast(Device device, String channelId);
     void stopAudioBroadcast(String deviceId, String channelId);
+
+    void audioBroadcastCmd(Device device, String channelId, int timeout, AudioBroadcastEvent event) throws InvalidArgumentException, ParseException, SipException;
 }

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java

@@ -69,7 +69,7 @@ public class MediaServiceImpl implements IMediaService {
                 JSONObject mediaJSON = JSON.parseObject(JSON.toJSONString(data.get(0)), JSONObject.class);
                 JSONArray tracks = mediaJSON.getJSONArray("tracks");
                 if (authority) {
-                    streamInfo = getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, addr, calld);
+                    streamInfo = getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, addr, calld, true);
                 }else {
                     streamInfo = getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, addr,null, true);
                 }

+ 281 - 105
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java

@@ -12,8 +12,10 @@ import javax.sip.SipException;
 import com.genersoft.iot.vmp.gb28181.bean.*;
 import com.genersoft.iot.vmp.conf.exception.ControllerException;
 import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
-import com.genersoft.iot.vmp.gb28181.bean.*;
+import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
+import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
 import com.genersoft.iot.vmp.service.IDeviceService;
+import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult;
 import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,7 +38,6 @@ import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
 import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
 import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
-import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
 import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
@@ -54,33 +55,10 @@ import com.genersoft.iot.vmp.service.bean.PlayBackResult;
 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
-import com.genersoft.iot.vmp.utils.redis.RedisUtil;
-import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult;
-import com.genersoft.iot.vmp.utils.DateUtil;
 import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
 import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent;
 import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
 
-import gov.nist.javax.sip.stack.SIPDialog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
-import org.springframework.stereotype.Service;
-import org.springframework.util.ResourceUtils;
-import org.springframework.web.context.request.async.DeferredResult;
-
-import javax.sip.ResponseEvent;
-import javax.sip.SipException;
-import java.io.FileNotFoundException;
-import java.math.BigDecimal;
-import java.text.ParseException;
-import java.math.RoundingMode;
-import java.util.*;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
 @SuppressWarnings(value = {"rawtypes", "unchecked"})
 @Service
 public class PlayServiceImpl implements IPlayService {
@@ -97,7 +75,10 @@ public class PlayServiceImpl implements IPlayService {
     private AudioBroadcastManager audioBroadcastManager;
 
     @Autowired
-    private SIPCommanderFroPlatform sipCommanderFroPlatform;
+    private IDeviceService deviceService;
+
+    @Autowired
+    private ISIPCommanderForPlatform sipCommanderFroPlatform;
 
     @Autowired
     private IRedisCatchStorage redisCatchStorage;
@@ -123,10 +104,6 @@ public class PlayServiceImpl implements IPlayService {
     @Autowired
     private VideoStreamSessionManager streamSession;
 
-
-    @Autowired
-    private IDeviceService deviceService;
-
     @Autowired
     private UserSetting userSetting;
 
@@ -145,7 +122,6 @@ public class PlayServiceImpl implements IPlayService {
     private ThreadPoolTaskExecutor taskExecutor;
 
 
-
     @Override
     public PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId,
                            ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
@@ -169,15 +145,15 @@ public class PlayServiceImpl implements IPlayService {
         StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
         playResult.setDevice(device);
 
-        result.onCompletion(()->{
+        result.onCompletion(() -> {
             // 点播结束时调用截图接口
-            taskExecutor.execute(()->{
+            taskExecutor.execute(() -> {
                 // TODO 应该在上流时调用更好,结束也可能是错误结束
-                String path =  "snap";
-                String fileName =  deviceId + "_" + channelId + ".jpg";
-                WVPResult wvpResult =  (WVPResult)result.getResult();
+                String path = "snap";
+                String fileName = deviceId + "_" + channelId + ".jpg";
+                WVPResult wvpResult = (WVPResult) result.getResult();
                 if (Objects.requireNonNull(wvpResult).getCode() == 0) {
-                    StreamInfo streamInfoForSuccess = (StreamInfo)wvpResult.getData();
+                    StreamInfo streamInfoForSuccess = (StreamInfo) wvpResult.getData();
                     MediaServerItem mediaInfo = mediaServerService.getOne(streamInfoForSuccess.getMediaServerId());
                     String streamUrl = streamInfoForSuccess.getFmp4();
                     // 请求截图
@@ -201,7 +177,7 @@ public class PlayServiceImpl implements IPlayService {
             MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
 
             JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaInfo, streamId);
-            if(rtpInfo.getInteger("code") == 0){
+            if (rtpInfo.getInteger("code") == 0) {
                 if (rtpInfo.getBoolean("exist")) {
                     int localPort = rtpInfo.getInteger("local_port");
                     if (localPort == 0) {
@@ -214,7 +190,7 @@ public class PlayServiceImpl implements IPlayService {
 
                         resultHolder.invokeAllResult(msg);
                         return playResult;
-                    }else {
+                    } else {
                         WVPResult wvpResult = new WVPResult();
                         wvpResult.setCode(ErrorCode.SUCCESS.getCode());
                         wvpResult.setMsg(ErrorCode.SUCCESS.getMsg());
@@ -227,12 +203,12 @@ public class PlayServiceImpl implements IPlayService {
                         }
                     }
 
-                }else {
+                } else {
                     redisCatchStorage.stopPlay(streamInfo);
                     storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
                     streamInfo = null;
                 }
-            }else {
+            } else {
                 //zlm连接失败
                 redisCatchStorage.stopPlay(streamInfo);
                 storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
@@ -246,7 +222,7 @@ public class PlayServiceImpl implements IPlayService {
             }
             SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false);
             logger.info(JSONObject.toJSONString(ssrcInfo));
-            play(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInUse, response)->{
+            play(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInUse, response) -> {
                 if (hookEvent != null) {
                     hookEvent.response(mediaServerItem, response);
                 }
@@ -260,13 +236,13 @@ public class PlayServiceImpl implements IPlayService {
                 if (errorEvent != null) {
                     errorEvent.response(event);
                 }
-            }, (code, msgStr)->{
+            }, (code, msgStr) -> {
                 // invite点播超时
                 WVPResult wvpResult = new WVPResult();
                 wvpResult.setCode(ErrorCode.ERROR100.getCode());
                 if (code == 0) {
                     wvpResult.setMsg("点播超时,请稍候重试");
-                }else if (code == 1) {
+                } else if (code == 1) {
                     wvpResult.setMsg("收流超时,请稍候重试");
                 }
                 msg.setData(wvpResult);
@@ -277,6 +253,186 @@ public class PlayServiceImpl implements IPlayService {
         return playResult;
     }
 
+    @Override
+    public void talk(MediaServerItem mediaServerItem, Device device, String channelId,
+                     ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
+                     Runnable timeoutCallback) {
+        String streamId = null;
+        if (mediaServerItem.isRtpEnable()) {
+            streamId = String.format("%s_%s", device.getDeviceId(), channelId);
+        }
+        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false);
+        logger.info("[对讲开始] deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
+        // 超时处理
+        String timeOutTaskKey = UUID.randomUUID().toString();
+        SSRCInfo finalSsrcInfo = ssrcInfo;
+        System.out.println("设置超时任务: " + timeOutTaskKey);
+        dynamicTask.startDelay(timeOutTaskKey, () -> {
+
+            logger.info("[对讲超时] 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", device.getDeviceId(), channelId, finalSsrcInfo.getPort(), finalSsrcInfo.getSsrc());
+            timeoutCallback.run();
+            // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
+            try {
+                cmder.streamByeCmd(device, channelId, finalSsrcInfo.getStream(), null);
+            } catch (InvalidArgumentException | ParseException | SipException e) {
+                logger.error("[对讲超时], 发送BYE失败 {}", e.getMessage());
+            } catch (SsrcTransactionNotFoundException e) {
+                timeoutCallback.run();
+                mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
+                mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream());
+                streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
+            }
+        }, userSetting.getPlayTimeout());
+        final String ssrc = ssrcInfo.getSsrc();
+        final String stream = ssrcInfo.getStream();
+        //端口获取失败的ssrcInfo 没有必要发送点播指令
+        if (ssrcInfo.getPort() <= 0) {
+            logger.info("[对讲] 端口分配异常,deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channelId, ssrcInfo);
+            return;
+        }
+        try {
+            String callId = SipUtils.getNewCallId();
+            cmder.talkStreamCmd(mediaServerItem, ssrcInfo, device, channelId, callId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
+                logger.info("[对讲] 流已生成, 开始推流: " + response.toJSONString());
+                dynamicTask.stop(timeOutTaskKey);
+                // TODO 暂不做处理
+            }, (MediaServerItem mediaServerItemInuse, JSONObject json) -> {
+                logger.info("[对讲] 开始推流: " + json.toJSONString());
+                dynamicTask.stop(timeOutTaskKey);
+                // 获取远程IP端口 作为回复语音流的地址
+                String ip = json.getString("ip");
+                Integer port = json.getInteger("port");
+                logger.info("[远端设备开始推流]{}/{}, 来自ip:{}, 端口:{}", device.getDeviceId(), channelId, ip, port);
+                // 查看平台推流是否就绪
+                Boolean ready = zlmrtpServerFactory.isStreamReady(mediaServerItemInuse, "talk", stream);
+                if (!ready) {
+                    try {
+                        cmder.streamByeCmd(device, channelId, finalSsrcInfo.getStream(), null);
+                    } catch (InvalidArgumentException | ParseException | SipException e) {
+                        logger.error("[对讲超时], 发送BYE失败 {}", e.getMessage());
+                    } catch (SsrcTransactionNotFoundException e) {
+                        timeoutCallback.run();
+                        mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
+                        mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream());
+                        streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
+                    }
+                }else {
+                    SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, ip, port, ssrcInfo.getSsrc(), device.getDeviceId(),
+                            device.getDeviceId(), channelId,
+                            false);
+
+                    sendRtpItem.setTcpActive(false);
+                    if (sendRtpItem == null || sendRtpItem.getLocalPort() == 0) {
+                        logger.warn("服务器端口资源不足");
+                        try {
+                            cmder.streamByeCmd(device, channelId, finalSsrcInfo.getStream(), null);
+                        } catch (InvalidArgumentException | ParseException | SipException e) {
+                            logger.error("[对讲超时], 发送BYE失败 {}", e.getMessage());
+                        } catch (SsrcTransactionNotFoundException e) {
+                            timeoutCallback.run();
+                            mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
+                            mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream());
+                            streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
+                        }
+                        return;
+                    }
+                    sendRtpItem.setCallId(callId);
+                    sendRtpItem.setPlayType(InviteStreamType.TALK);
+                    sendRtpItem.setStatus(1);
+                    sendRtpItem.setIp(ip);
+                    sendRtpItem.setPort(port);
+                    sendRtpItem.setTcpActive(false);
+                    sendRtpItem.setStreamId(ssrcInfo.getStream());
+                    sendRtpItem.setApp("talk");
+                    sendRtpItem.setSsrc(ssrc);
+                    redisCatchStorage.updateSendRTPSever(sendRtpItem);
+
+                    Map<String, Object> param = new HashMap<>(12);
+                    param.put("vhost","__defaultVhost__");
+                    param.put("app",sendRtpItem.getApp());
+                    param.put("stream",sendRtpItem.getStreamId());
+                    param.put("ssrc", sendRtpItem.getSsrc());
+                    param.put("src_port", sendRtpItem.getLocalPort());
+                    param.put("pt", sendRtpItem.getPt());
+                    param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
+                    param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
+                    JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaServerItemInuse, param);
+                    System.out.println(11111);
+                    System.out.println(jsonObject);
+                }
+
+            }, (event) -> {
+//                ResponseEvent responseEvent = (ResponseEvent) event.event;
+//                String contentString = new String(responseEvent.getResponse().getRawContent());
+//                // 获取ssrc
+//                int ssrcIndex = contentString.indexOf("y=");
+//                // 检查是否有y字段
+//                if (ssrcIndex >= 0) {
+//                    //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容
+//                    String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
+//                    // 查询到ssrc不一致且开启了ssrc校验则需要针对处理
+//                    if (ssrc.equals(ssrcInResponse)) {
+//                        return;
+//                    }
+//                    logger.info("[对讲消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
+//                    if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {
+//                        logger.info("[对讲消息] SSRC修正 {}->{}", ssrc, ssrcInResponse);
+//
+//                        if (!mediaServerItem.getSsrcConfig().checkSsrc(ssrcInResponse)) {
+//                            // ssrc 不可用
+//                            // 释放ssrc
+//                            mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
+//                            streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
+//                            event.msg = "下级自定义了ssrc,但是此ssrc不可用";
+//                            event.statusCode = 400;
+//                            errorEvent.response(event);
+//                            return;
+//                        }
+//
+//                        // 单端口模式streamId也有变化,需要重新设置监听
+//                        if (!mediaServerItem.isRtpEnable()) {
+//                            // 添加订阅
+//                            HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId());
+//                            subscribe.removeSubscribe(hookSubscribe);
+//                            hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
+//                            subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
+//                                logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString());
+//                                dynamicTask.stop(timeOutTaskKey);
+//                                // hook响应
+//                                onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId, uuid);
+//                                hookEvent.response(mediaServerItemInUse, response);
+//                            });
+//                        }
+//                        // 关闭rtp server
+//                        mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream());
+//                        // 重新开启ssrc server
+//                        mediaServerService.openRTPServer(mediaServerItem, finalSsrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), false, finalSsrcInfo.getPort());
+//
+//                    }
+//                }
+            }, (event) -> {
+                dynamicTask.stop(timeOutTaskKey);
+                mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream());
+                // 释放ssrc
+                mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
+
+                streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
+                errorEvent.response(event);
+            });
+        } catch (InvalidArgumentException | SipException | ParseException e) {
+
+            logger.error("[命令发送失败] 对讲消息: {}", e.getMessage());
+            dynamicTask.stop(timeOutTaskKey);
+            mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream());
+            // 释放ssrc
+            mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
+
+            streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
+            SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(new CmdSendFailEvent(null));
+            eventResult.msg = "命令发送失败";
+            errorEvent.response(eventResult);
+        }
+    }
 
 
     @Override
@@ -291,12 +447,12 @@ public class PlayServiceImpl implements IPlayService {
         if (ssrcInfo == null) {
             ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false);
         }
-        logger.info("[点播开始] deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck() );
+        logger.info("[点播开始] deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
         // 超时处理
         String timeOutTaskKey = UUID.randomUUID().toString();
         SSRCInfo finalSsrcInfo = ssrcInfo;
         System.out.println("设置超时任务: " + timeOutTaskKey);
-        dynamicTask.startDelay( timeOutTaskKey,()->{
+        dynamicTask.startDelay(timeOutTaskKey, () -> {
 
             logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", device.getDeviceId(), channelId, finalSsrcInfo.getPort(), finalSsrcInfo.getSsrc());
             timeoutCallback.run(1, "收流超时");
@@ -315,7 +471,7 @@ public class PlayServiceImpl implements IPlayService {
         final String ssrc = ssrcInfo.getSsrc();
         final String stream = ssrcInfo.getStream();
         //端口获取失败的ssrcInfo 没有必要发送点播指令
-        if(ssrcInfo.getPort() <= 0){
+        if (ssrcInfo.getPort() <= 0) {
             logger.info("[点播端口分配异常],deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channelId, ssrcInfo);
             return;
         }
@@ -330,7 +486,7 @@ public class PlayServiceImpl implements IPlayService {
                 logger.info("[点播成功] deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
 
             }, (event) -> {
-                ResponseEvent responseEvent = (ResponseEvent)event.event;
+                ResponseEvent responseEvent = (ResponseEvent) event.event;
                 String contentString = new String(responseEvent.getResponse().getRawContent());
                 // 获取ssrc
                 int ssrcIndex = contentString.indexOf("y=");
@@ -342,7 +498,7 @@ public class PlayServiceImpl implements IPlayService {
                     if (ssrc.equals(ssrcInResponse)) {
                         return;
                     }
-                    logger.info("[点播消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse );
+                    logger.info("[点播消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
                     if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {
                         logger.info("[点播消息] SSRC修正 {}->{}", ssrc, ssrcInResponse);
 
@@ -363,13 +519,13 @@ public class PlayServiceImpl implements IPlayService {
                             HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId());
                             subscribe.removeSubscribe(hookSubscribe);
                             hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
-                            subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response)->{
-                                        logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString());
-                                        dynamicTask.stop(timeOutTaskKey);
-                                        // hook响应
-                                        onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId, uuid);
-                                        hookEvent.response(mediaServerItemInUse, response);
-                                    });
+                            subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
+                                logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString());
+                                dynamicTask.stop(timeOutTaskKey);
+                                // hook响应
+                                onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId, uuid);
+                                hookEvent.response(mediaServerItemInUse, response);
+                            });
                         }
                         // 关闭rtp server
                         mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream());
@@ -441,7 +597,7 @@ public class PlayServiceImpl implements IPlayService {
         MediaServerItem mediaServerItem;
         if (mediaServerId == null) {
             mediaServerItem = mediaServerService.getMediaServerForMinimumLoad();
-        }else {
+        } else {
             mediaServerItem = mediaServerService.getOne(mediaServerId);
         }
         if (mediaServerItem == null) {
@@ -452,8 +608,8 @@ public class PlayServiceImpl implements IPlayService {
 
     @Override
     public DeferredResult<WVPResult<StreamInfo>> playBack(String deviceId, String channelId, String startTime,
-                                                           String endTime,InviteStreamCallback inviteStreamCallback,
-                                                           PlayBackCallback callback) {
+                                                          String endTime, InviteStreamCallback inviteStreamCallback,
+                                                          PlayBackCallback callback) {
         Device device = storager.queryVideoDevice(deviceId);
         if (device == null) {
             return null;
@@ -466,9 +622,9 @@ public class PlayServiceImpl implements IPlayService {
 
     @Override
     public DeferredResult<WVPResult<StreamInfo>> playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,
-                                                           String deviceId, String channelId, String startTime,
-                                                           String endTime, InviteStreamCallback infoCallBack,
-                                                           PlayBackCallback playBackCallback) {
+                                                          String deviceId, String channelId, String startTime,
+                                                          String endTime, InviteStreamCallback infoCallBack,
+                                                          PlayBackCallback playBackCallback) {
         if (mediaServerItem == null || ssrcInfo == null) {
             return null;
         }
@@ -485,7 +641,7 @@ public class PlayServiceImpl implements IPlayService {
         requestMessage.setKey(key);
         PlayBackResult<RequestMessage> playBackResult = new PlayBackResult<>();
         String playBackTimeOutTaskKey = UUID.randomUUID().toString();
-        dynamicTask.startDelay(playBackTimeOutTaskKey, ()->{
+        dynamicTask.startDelay(playBackTimeOutTaskKey, () -> {
             logger.warn(String.format("设备回放超时,deviceId:%s ,channelId:%s", deviceId, channelId));
             playBackResult.setCode(ErrorCode.ERROR100.getCode());
             playBackResult.setMsg("回放超时");
@@ -545,7 +701,7 @@ public class PlayServiceImpl implements IPlayService {
             cmder.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, infoCallBack,
                     hookEvent, eventResult -> {
                         if (eventResult.type == SipSubscribe.EventResultType.response) {
-                            ResponseEvent responseEvent = (ResponseEvent)eventResult.event;
+                            ResponseEvent responseEvent = (ResponseEvent) eventResult.event;
                             String contentString = new String(responseEvent.getResponse().getRawContent());
                             // 获取ssrc
                             int ssrcIndex = contentString.indexOf("y=");
@@ -557,7 +713,7 @@ public class PlayServiceImpl implements IPlayService {
                                 if (ssrcInfo.getSsrc().equals(ssrcInResponse)) {
                                     return;
                                 }
-                                logger.info("[回放消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse );
+                                logger.info("[回放消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
                                 if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {
                                     logger.info("[回放消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
 
@@ -578,7 +734,7 @@ public class PlayServiceImpl implements IPlayService {
                                         HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
                                         subscribe.removeSubscribe(hookSubscribe);
                                         hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
-                                        subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response)->{
+                                        subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
                                             logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString());
                                             dynamicTask.stop(playBackTimeOutTaskKey);
                                             // hook响应
@@ -614,7 +770,7 @@ public class PlayServiceImpl implements IPlayService {
         MediaServerItem newMediaServerItem = getNewMediaServerItem(device);
         SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, true, true);
 
-        return download(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, downloadSpeed,infoCallBack, hookCallBack);
+        return download(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, downloadSpeed, infoCallBack, hookCallBack);
     }
 
     @Override
@@ -640,7 +796,7 @@ public class PlayServiceImpl implements IPlayService {
         downloadResult.setData(requestMessage);
 
         String downLoadTimeOutTaskKey = UUID.randomUUID().toString();
-        dynamicTask.startDelay(downLoadTimeOutTaskKey, ()->{
+        dynamicTask.startDelay(downLoadTimeOutTaskKey, () -> {
             logger.warn(String.format("录像下载请求超时,deviceId:%s ,channelId:%s", deviceId, channelId));
             wvpResult.setCode(ErrorCode.ERROR100.getCode());
             wvpResult.setMsg("录像下载请求超时");
@@ -723,15 +879,15 @@ public class PlayServiceImpl implements IPlayService {
 
                     if (duration == 0) {
                         streamInfo.setProgress(0);
-                    }else {
+                    } else {
                         String startTime = streamInfo.getStartTime();
                         String endTime = streamInfo.getEndTime();
                         long start = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime);
                         long end = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime);
 
-                        BigDecimal currentCount = new BigDecimal(duration/1000);
-                        BigDecimal totalCount = new BigDecimal(end-start);
-                        BigDecimal divide = currentCount.divide(totalCount,2, RoundingMode.HALF_UP);
+                        BigDecimal currentCount = new BigDecimal(duration / 1000);
+                        BigDecimal totalCount = new BigDecimal(end - start);
+                        BigDecimal divide = currentCount.divide(totalCount, 2, RoundingMode.HALF_UP);
                         double process = divide.doubleValue();
                         streamInfo.setProgress(process);
                     }
@@ -762,7 +918,7 @@ public class PlayServiceImpl implements IPlayService {
     public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId) {
         String streamId = resonse.getString("stream");
         JSONArray tracks = resonse.getJSONArray("tracks");
-        StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem,"rtp", streamId, tracks, null);
+        StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", streamId, tracks, null);
         streamInfo.setDeviceID(deviceId);
         streamInfo.setChannelId(channelId);
         return streamInfo;
@@ -788,7 +944,7 @@ public class PlayServiceImpl implements IPlayService {
         List<SsrcTransaction> allSsrc = streamSession.getAllSsrc();
         if (allSsrc.size() > 0) {
             for (SsrcTransaction ssrcTransaction : allSsrc) {
-                if(ssrcTransaction.getMediaServerId().equals(mediaServerId)) {
+                if (ssrcTransaction.getMediaServerId().equals(mediaServerId)) {
                     Device device = deviceService.queryDevice(ssrcTransaction.getDeviceId());
                     if (device == null) {
                         continue;
@@ -806,10 +962,36 @@ public class PlayServiceImpl implements IPlayService {
     }
 
     @Override
-    public void audioBroadcast(Device device, String channelId, int timeout, AudioBroadcastEvent event) {
+    public AudioBroadcastResult audioBroadcast(Device device, String channelId) {
+        if (device == null || channelId == null) {
+            return null;
+        }
+        logger.info("[语音喊话] device: {}, channel: {}", device.getDeviceId(), channelId);
+        DeviceChannel deviceChannel = storager.queryChannel(device.getDeviceId(), channelId);
+        if (deviceChannel == null) {
+            logger.warn("开启语音广播的时候未找到通道: {}", channelId);
+            return null;
+        }
+        MediaServerItem mediaServerItem = mediaServerService.getMediaServerForMinimumLoad();
+//        String app = "broadcast";
+        // TODO 从sip user agent中判断是什么品牌设备,大华默认使用talk模式,其他使用broadcast模式
+        String app = "talk";
+        String stream = device.getDeviceId() + "_" + channelId;
+        StreamInfo broadcast = mediaService.getStreamInfoByAppAndStream(mediaServerItem, "broadcast", stream, null, null, null, false);
+        AudioBroadcastResult audioBroadcastResult = new AudioBroadcastResult();
+        audioBroadcastResult.setApp(app);
+        audioBroadcastResult.setStream(stream);
+        audioBroadcastResult.setStreamInfo(mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, stream, null, null, null,false));
+        audioBroadcastResult.setCodec("G.711");
+        return audioBroadcastResult;
+    }
+
+    @Override
+    public void audioBroadcastCmd(Device device, String channelId, int timeout, AudioBroadcastEvent event) throws InvalidArgumentException, ParseException, SipException {
         if (device == null || channelId == null) {
             return;
         }
+        logger.info("[语音喊话] device: {}, channel: {}", device.getDeviceId(), channelId);
         DeviceChannel deviceChannel = storager.queryChannel(device.getDeviceId(), channelId);
         if (deviceChannel == null) {
             logger.warn("开启语音广播的时候未找到通道: {}", channelId);
@@ -818,7 +1000,7 @@ public class PlayServiceImpl implements IPlayService {
         }
         // 查询通道使用状态
         if (audioBroadcastManager.exit(device.getDeviceId(), channelId)) {
-            SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
+            SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
             if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) {
                 // 查询流是否存在,不存在则认为是异常状态
                 MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId());
@@ -827,8 +1009,8 @@ public class PlayServiceImpl implements IPlayService {
                     logger.warn("语音广播已经开启: {}", channelId);
                     event.call("语音广播已经开启");
                     return;
-                }else {
-                    audioBroadcastManager.del(deviceChannel.getDeviceId(),channelId);
+                } else {
+                    audioBroadcastManager.del(deviceChannel.getDeviceId(), channelId);
                     redisCatchStorage.deleteSendRTPServer(device.getDeviceId(), channelId, sendRtpItem.getCallId(), sendRtpItem.getStreamId());
                 }
             }
@@ -847,39 +1029,33 @@ public class PlayServiceImpl implements IPlayService {
         });
     }
 
+
+
     @Override
-    public void stopAudioBroadcast(String deviceId, String channelId){
+    public void stopAudioBroadcast(String deviceId, String channelId) {
         AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(deviceId, channelId);
         if (audioBroadcastCatch != null) {
 
-            try {
-                SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(deviceId, audioBroadcastCatch.getChannelId(), null, null);
-                if (sendRtpItem != null) {
-                    redisCatchStorage.deleteSendRTPServer(deviceId, sendRtpItem.getChannelId(), null, null);
-                    MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
-                    Map<String, Object> param = new HashMap<>();
-                    param.put("vhost", "__defaultVhost__");
-                    param.put("app", sendRtpItem.getApp());
-                    param.put("stream", sendRtpItem.getStreamId());
-                    zlmresTfulUtils.stopSendRtp(mediaInfo, param);
-                    // 立刻结束设备的推流,等待自行结束太慢
-                    zlmresTfulUtils.closeStreams(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStreamId());
-                }
-                if (audioBroadcastCatch.getStatus() == AudioBroadcastCatchStatus.Ok) {
-                    cmder.streamByeCmd(audioBroadcastCatch.getDialog(), audioBroadcastCatch.getChannelId(), audioBroadcastCatch.getRequest(), null);
-                }
-                audioBroadcastManager.del(deviceId, channelId);
-
-            } catch (SipException e) {
-                throw new RuntimeException(e);
-            } catch (ParseException e) {
-                throw new RuntimeException(e);
-            } catch (InvalidArgumentException e) {
-                throw new RuntimeException(e);
+            Device device = deviceService.queryDevice(deviceId);
+            if (device == null) {
+                return;
+            }
+//            if (audioBroadcastCatch.getStatus() == AudioBroadcastCatchStatus.Ok) {
+//                cmder.streamByeCmd(device, audioBroadcastCatch.getChannelId(), null, audioBroadcastCatch.getSipTransactionInfo().getCallId());
+//            }
+            SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(deviceId, audioBroadcastCatch.getChannelId(), null, null);
+            if (sendRtpItem != null) {
+                redisCatchStorage.deleteSendRTPServer(deviceId, sendRtpItem.getChannelId(), null, null);
+                MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
+                Map<String, Object> param = new HashMap<>();
+                param.put("vhost", "__defaultVhost__");
+                param.put("app", sendRtpItem.getApp());
+                param.put("stream", sendRtpItem.getStreamId());
+                zlmresTfulUtils.stopSendRtp(mediaInfo, param);
             }
-        }
-
 
+            audioBroadcastManager.del(deviceId, channelId);
+        }
     }
 
     @Override

+ 13 - 52
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java

@@ -216,65 +216,20 @@ public class PlayController {
 	@Parameter(name = "timeout", description = "推流超时时间(秒)", required = true)
 	@GetMapping("/broadcast/{deviceId}/{channelId}")
 	@PostMapping("/broadcast/{deviceId}/{channelId}")
-    public DeferredResult<String> broadcastApi(@PathVariable String deviceId, @PathVariable String channelId, Integer timeout) {
+    public AudioBroadcastResult broadcastApi(@PathVariable String deviceId, @PathVariable String channelId, Integer timeout) {
         if (logger.isDebugEnabled()) {
             logger.debug("语音广播API调用");
         }
         Device device = storager.queryVideoDevice(deviceId);
-		DeferredResult<String> result = new DeferredResult<>(3 * 1000L);
 		if (device == null) {
-			result.setResult("未找到设备: " + deviceId);
-			return result;
+			throw new ControllerException(ErrorCode.ERROR400.getCode(), "未找到设备: " + deviceId);
 		}
 		if (channelId == null) {
-			result.setResult("未找到通道: " + channelId);
-			return result;
+			throw new ControllerException(ErrorCode.ERROR400.getCode(), "未找到通道: " + channelId);
 		}
-		String key  = DeferredResultHolder.CALLBACK_CMD_BROADCAST + deviceId;
-		if (resultHolder.exist(key, null)) {
-			result.setResult("设备使用中");
-			return result;
-		}
-		if (timeout == null){
-			timeout = 30;
-		}
-		String uuid  = UUID.randomUUID().toString();
-
-		result.onTimeout(() -> {
-			logger.warn("语音广播操作超时, 设备未返回应答指令");
-			RequestMessage msg = new RequestMessage();
-			msg.setKey(key);
-			msg.setId(uuid);
-			JSONObject json = new JSONObject();
-			json.put("DeviceID", deviceId);
-			json.put("CmdType", "Broadcast");
-			json.put("Result", "Failed");
-			json.put("Error", "Timeout. Device did not response to broadcast command.");
-			msg.setData(json);
-			resultHolder.invokeResult(msg);
-		});
-
-		result.onTimeout(()->{
-			WVPResult<AudioBroadcastResult> wvpResult = new WVPResult<>();
-			wvpResult.setCode(-1);
-			wvpResult.setMsg("请求超时");
-			RequestMessage requestMessage = new RequestMessage();
-			requestMessage.setKey(key);
-			requestMessage.setData(wvpResult);
-			resultHolder.invokeAllResult(requestMessage);
-		});
-		playService.audioBroadcast(device, channelId, timeout, (msg)->{
-			WVPResult<AudioBroadcastResult> wvpResult = new WVPResult<>();
-			wvpResult.setCode(-1);
-			wvpResult.setMsg(msg);
-			RequestMessage requestMessage = new RequestMessage();
-			requestMessage.setKey(key);
-			requestMessage.setData(wvpResult);
-			resultHolder.invokeAllResult(requestMessage);
-		});
-		resultHolder.put(key, uuid, result);
-
-		return result;
+
+		return playService.audioBroadcast(device, channelId);
+
 	}
 
 
@@ -283,10 +238,16 @@ public class PlayController {
 	@Parameter(name = "channelId", description = "通道Id", required = true)
 	@GetMapping("/broadcast/stop/{deviceId}/{channelId}")
 	@PostMapping("/broadcast/stop/{deviceId}/{channelId}")
-	public void stopBroadcastA(@PathVariable String deviceId, @PathVariable String channelId) {
+	public void stopBroadcast(@PathVariable String deviceId, @PathVariable String channelId) {
 		if (logger.isDebugEnabled()) {
 			logger.debug("停止语音广播API调用");
 		}
+//		try {
+//			playService.stopAudioBroadcast(deviceId, channelId);
+//		} catch (InvalidArgumentException | ParseException | SsrcTransactionNotFoundException | SipException e) {
+//			logger.error("[命令发送失败] 停止语音: {}", e.getMessage());
+//			throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " +  e.getMessage());
+//		}
 		playService.stopAudioBroadcast(deviceId, channelId);
 	}
 

+ 2 - 2
web_src/config/index.js

@@ -12,14 +12,14 @@ module.exports = {
     assetsPublicPath: '/',
     proxyTable: {
       '/debug': {
-        target: 'http://localhost:38080',
+        target: 'https://default.wvp-pro.cn:18080',
         changeOrigin: true,
         pathRewrite: {
           '^/debug': '/'
         }
       },
       '/static/snap': {
-        target: 'http://localhost:38080',
+        target: 'https://default.wvp-pro.cn:18080',
         changeOrigin: true,
         // pathRewrite: {
         //   '^/static/snap': '/static/snap'

+ 36 - 25
web_src/src/components/dialog/devicePlayer.vue

@@ -1,7 +1,7 @@
 <template>
 <div id="devicePlayer" v-loading="isLoging">
 
-    <el-dialog title="视频播放" top="0" :close-on-click-modal="false" :visible.sync="showVideoDialog" :destroy-on-close="true" @close="close()">
+    <el-dialog title="视频播放" top="0" :close-on-click-modal="false" :visible.sync="showVideoDialog" @close="close()">
         <!-- <LivePlayer v-if="showVideoDialog" ref="videoPlayer" :videoUrl="videoUrl" :error="videoError" :message="videoError" :hasaudio="hasaudio" fluent autoplay live></LivePlayer> -->
       <div style="width: 100%; height: 100%">
         <el-tabs type="card" :stretch="true" v-model="activePlayer" @tab-click="changePlayer" v-if="Object.keys(this.player).length > 1">
@@ -119,6 +119,10 @@
                                   <el-tag >RTC:</el-tag>
                                   <span>{{ streamInfo.rtc }}</span>
                                 </el-dropdown-item>
+                                <el-dropdown-item :command="streamInfo.rtcs">
+                                  <el-tag >RTCS:</el-tag>
+                                  <span>{{ streamInfo.rtcs }}</span>
+                                </el-dropdown-item>
                                 <el-dropdown-item :command="streamInfo.rtmp">
                                   <el-tag >RTMP:</el-tag>
                                   <span>{{ streamInfo.rtmp }}</span>
@@ -875,7 +879,8 @@ export default {
                 }
               });
             }else if (this.broadcastStatus === 1) {
-              this.stopBroadcast()
+                this.broadcastStatus = -1;
+                this.broadcastRtc.close()
             }
         },
         startBroadcast(url){
@@ -890,6 +895,7 @@ export default {
                 message: "获取推流鉴权Key失败",
                 type: "error",
               });
+              this.broadcastStatus = -1;
             }else {
               let pushKey = res.data.data.pushKey;
               // 获取推流鉴权KEY
@@ -923,6 +929,7 @@ export default {
                   message: '不支持webrtc, 无法进行语音对讲',
                   type: 'error'
                 });
+                this.broadcastStatus = -1;
               });
 
               this.broadcastRtc.on(ZLMRTCClient.Events.WEBRTC_ICE_CANDIDATE_ERROR,(e)=>{// ICE 协商出错
@@ -932,6 +939,7 @@ export default {
                   message: 'ICE 协商出错',
                   type: 'error'
                 });
+                this.broadcastStatus = -1;
               });
 
               this.broadcastRtc.on(ZLMRTCClient.Events.WEBRTC_OFFER_ANWSER_EXCHANGE_FAILED,(e)=>{// offer anwser 交换失败
@@ -941,6 +949,7 @@ export default {
                   message: 'offer anwser 交换失败' + e,
                   type: 'error'
                 });
+                this.broadcastStatus = -1;
               });
               this.broadcastRtc.on(ZLMRTCClient.Events.WEBRTC_ON_CONNECTION_STATE_CHANGE,(e)=>{// offer anwser 交换失败
                 console.log('状态改变',e)
@@ -959,36 +968,38 @@ export default {
                   message: '捕获流失败' + e,
                   type: 'error'
                 });
+                this.broadcastStatus = -1;
               });
             }
+          }).catch((e) => {
+            this.$message({
+              showClose: true,
+              message: e,
+              type: 'error'
+            });
+            this.broadcastStatus = -1;
           });
 
 
         },
         stopBroadcast(){
-          if (this.broadcastStatus === -1) {
-            this.broadcastStatus = 1;
-          }else {
-            this.broadcastStatus = -2;
-            this.broadcastRtc = null;
-            this.$axios({
-              method: 'get',
-              url: '/api/play/broadcast/stop/' + this.deviceId + '/' + this.channelId
-            }).then( (res)=> {
-              if (res.data.code == 0) {
-                // this.broadcastStatus = -1;
-                // this.broadcastRtc.close()
-              }else {
-                this.$message({
-                  showClose: true,
-                  message: res.data.msg,
-                  type: "error",
-                });
-              }
-            });
-          }
-
-
+          this.broadcastRtc.close();
+          this.broadcastStatus = -1;
+          this.$axios({
+            method: 'get',
+            url: '/api/play/broadcast/stop/' + this.deviceId + '/' + this.channelId
+          }).then( (res)=> {
+            if (res.data.code == 0) {
+              // this.broadcastStatus = -1;
+              // this.broadcastRtc.close()
+            }else {
+              this.$message({
+                showClose: true,
+                message: res.data.msg,
+                type: "error",
+              });
+            }
+          });
         }
     }
 };