Parcourir la source

完成向上级联->点播--002

panlinlin il y a 4 ans
Parent
commit
1b44ba3367

+ 6 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java

@@ -15,6 +15,7 @@ import com.alibaba.fastjson.JSON;
 import com.genersoft.iot.vmp.gb28181.transmit.response.impl.*;
 import com.genersoft.iot.vmp.gb28181.transmit.response.impl.*;
 import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
+import com.genersoft.iot.vmp.vmanager.service.IPlayService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -103,6 +104,9 @@ public class SIPProcessorFactory {
 	@Autowired
 	private OtherResponseProcessor otherResponseProcessor;
 
+	@Autowired
+	private IPlayService playService;
+
 
 	// 注:这里使用注解会导致循环依赖注入,暂用springBean
 	private SipProvider tcpSipProvider;
@@ -120,7 +124,9 @@ public class SIPProcessorFactory {
 			processor.setTcpSipProvider(getTcpSipProvider());
 			processor.setUdpSipProvider(getUdpSipProvider());
 
+			processor.setCmder(cmder);
 			processor.setCmderFroPlatform(cmderFroPlatform);
+			processor.setPlayService(playService);
 			processor.setStorager(storager);
 			return processor;
 		} else if (Request.REGISTER.equals(method)) {

+ 72 - 8
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/InviteRequestProcessor.java

@@ -10,19 +10,26 @@ import javax.sip.header.SubjectHeader;
 import javax.sip.message.Request;
 import javax.sip.message.Response;
 
+import com.alibaba.fastjson.JSONObject;
+import com.genersoft.iot.vmp.gb28181.bean.Device;
 import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
 import com.genersoft.iot.vmp.gb28181.sdp.Codec;
 import com.genersoft.iot.vmp.gb28181.sdp.MediaDescription;
 import com.genersoft.iot.vmp.gb28181.sdp.SdpParser;
 import com.genersoft.iot.vmp.gb28181.sdp.SessionDescription;
+import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
+import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
 import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcessor;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
+import com.genersoft.iot.vmp.vmanager.play.bean.PlayResult;
+import com.genersoft.iot.vmp.vmanager.service.IPlayService;
 import gov.nist.javax.sip.address.AddressImpl;
 import gov.nist.javax.sip.address.SipUri;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 
 import java.io.IOException;
 import java.text.ParseException;
@@ -41,6 +48,10 @@ public class InviteRequestProcessor extends SIPRequestAbstractProcessor {
 
 	private IVideoManagerStorager storager;
 
+	private SIPCommander cmder;
+
+	private IPlayService playService;
+
 	/**
 	 * 处理invite请求
 	 * 
@@ -119,7 +130,30 @@ public class InviteRequestProcessor extends SIPRequestAbstractProcessor {
 
 
 			String ssrc = sdp.getSsrc();
+
+			Device device = storager.queryVideoDeviceByPlatformIdAndChannelId(platformId, channelId);
+			if (device == null) {
+				logger.warn("点播平台{}的通道{}时未找到设备信息", platformId, channel);
+				response500Ack(evt);
+				return;
+			}
+
 			// 通知下级推流,
+			PlayResult playResult = playService.play(device.getDeviceId(), channelId, (response)->{
+				// 收到推流, 回复200OK
+
+			},(event -> {
+				// 未知错误。直接转发设备点播的错误
+				Response response = null;
+				try {
+					response = getMessageFactory().createResponse(event.getResponse().getStatusCode(), evt.getRequest());
+					getServerTransaction(evt).sendResponse(response);
+
+				} catch (ParseException | SipException | InvalidArgumentException e) {
+					e.printStackTrace();
+				}
+			}));
+			playResult.getResult();
 			// 查找合适的端口推流,
 			// 发送 200ok
 			// 收到ack后调用推流接口
@@ -149,14 +183,16 @@ public class InviteRequestProcessor extends SIPRequestAbstractProcessor {
 	}
 
 	/***
-	 * 回复404
+	 * 回复200 OK
 	 * @param evt
 	 * @throws SipException
 	 * @throws InvalidArgumentException
 	 * @throws ParseException
 	 */
-	private void response404Ack(RequestEvent evt) throws SipException, InvalidArgumentException, ParseException {
-		Response response = getMessageFactory().createResponse(Response.NOT_FOUND, evt.getRequest());
+	private void responseAck(RequestEvent evt, String sdp) throws SipException, InvalidArgumentException, ParseException {
+		Response response = getMessageFactory().createResponse(Response.OK, evt.getRequest());
+		ContentTypeHeader contentTypeHeader = getHeaderFactory().createContentTypeHeader("APPLICATION", "SDP");
+		response.setContent(sdp, contentTypeHeader);
 		getServerTransaction(evt).sendResponse(response);
 	}
 
@@ -172,6 +208,18 @@ public class InviteRequestProcessor extends SIPRequestAbstractProcessor {
 		getServerTransaction(evt).sendResponse(response);
 	}
 
+	/***
+	 * 回复404
+	 * @param evt
+	 * @throws SipException
+	 * @throws InvalidArgumentException
+	 * @throws ParseException
+	 */
+	private void response404Ack(RequestEvent evt) throws SipException, InvalidArgumentException, ParseException {
+		Response response = getMessageFactory().createResponse(Response.NOT_FOUND, evt.getRequest());
+		getServerTransaction(evt).sendResponse(response);
+	}
+
 	/***
 	 * 回复488
 	 * @param evt
@@ -185,16 +233,14 @@ public class InviteRequestProcessor extends SIPRequestAbstractProcessor {
 	}
 
 	/***
-	 * 回复200 OK
+	 * 回复500
 	 * @param evt
 	 * @throws SipException
 	 * @throws InvalidArgumentException
 	 * @throws ParseException
 	 */
-	private void responseAck(RequestEvent evt, String sdp) throws SipException, InvalidArgumentException, ParseException {
-		Response response = getMessageFactory().createResponse(Response.OK, evt.getRequest());
-		ContentTypeHeader contentTypeHeader = getHeaderFactory().createContentTypeHeader("APPLICATION", "SDP");
-		response.setContent(sdp, contentTypeHeader);
+	private void response500Ack(RequestEvent evt) throws SipException, InvalidArgumentException, ParseException {
+		Response response = getMessageFactory().createResponse(Response.SERVER_INTERNAL_ERROR, evt.getRequest());
 		getServerTransaction(evt).sendResponse(response);
 	}
 
@@ -207,6 +253,8 @@ public class InviteRequestProcessor extends SIPRequestAbstractProcessor {
 
 
 
+
+
 	public SIPCommanderFroPlatform getCmderFroPlatform() {
 		return cmderFroPlatform;
 	}
@@ -222,4 +270,20 @@ public class InviteRequestProcessor extends SIPRequestAbstractProcessor {
 	public void setStorager(IVideoManagerStorager storager) {
 		this.storager = storager;
 	}
+
+	public SIPCommander getCmder() {
+		return cmder;
+	}
+
+	public void setCmder(SIPCommander cmder) {
+		this.cmder = cmder;
+	}
+
+	public IPlayService getPlayService() {
+		return playService;
+	}
+
+	public void setPlayService(IPlayService playService) {
+		this.playService = playService;
+	}
 }

+ 2 - 0
src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java

@@ -234,4 +234,6 @@ public interface IVideoManagerStorager {
 
 
     DeviceChannel queryChannelInParentPlatform(String platformId, String channelId);
+
+    Device queryVideoDeviceByPlatformIdAndChannelId(String platformId, String channelId);
 }

+ 5 - 0
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java

@@ -335,4 +335,9 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
 		DeviceChannel channel = patformChannelMapper.queryChannelInParentPlatform(platformId, channelId);
 		return channel;
 	}
+
+	@Override
+	public Device queryVideoDeviceByPlatformIdAndChannelId(String platformId, String channelId) {
+		return null;
+	}
 }

+ 6 - 48
src/main/java/com/genersoft/iot/vmp/vmanager/play/PlayController.java

@@ -10,6 +10,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
 import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
 import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
+import com.genersoft.iot.vmp.vmanager.play.bean.PlayResult;
 import com.genersoft.iot.vmp.vmanager.service.IPlayService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,62 +65,19 @@ public class PlayController {
 													   @PathVariable String channelId) {
 
 
-		Device device = storager.queryVideoDevice(deviceId);
-		StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
-
-		UUID uuid = UUID.randomUUID();
-		DeferredResult<ResponseEntity<String>> result = new DeferredResult<ResponseEntity<String>>();
-
-		// 录像查询以channelId作为deviceId查询
-		resultHolder.put(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid, result);
-
-		if (streamInfo == null) {
-			// 发送点播消息
-			cmder.playStreamCmd(device, channelId, (JSONObject response) -> {
-				logger.info("收到订阅消息: " + response.toJSONString());
-				playService.onPublishHandlerForPlay(response, deviceId, channelId, uuid.toString());
-			}, event -> {
-				RequestMessage msg = new RequestMessage();
-				msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
-				Response response = event.getResponse();
-				msg.setData(String.format("点播失败, 错误码: %s, %s", response.getStatusCode(), response.getReasonPhrase()));
-				resultHolder.invokeResult(msg);
-			});
-		} else {
-			String streamId = streamInfo.getStreamId();
-			JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(streamId);
-			if (rtpInfo.getBoolean("exist")) {
-				RequestMessage msg = new RequestMessage();
-				msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
-				msg.setData(JSON.toJSONString(streamInfo));
-				resultHolder.invokeResult(msg);
-			} else {
-				redisCatchStorage.stopPlay(streamInfo);
-				storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
-				cmder.playStreamCmd(device, channelId, (JSONObject response) -> {
-					logger.info("收到订阅消息: " + response.toJSONString());
-					playService.onPublishHandlerForPlay(response, deviceId, channelId, uuid.toString());
-				}, event -> {
-					RequestMessage msg = new RequestMessage();
-					msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
-					Response response = event.getResponse();
-					msg.setData(String.format("点播失败, 错误码: %s, %s", response.getStatusCode(), response.getReasonPhrase()));
-					resultHolder.invokeResult(msg);
-				});
-			}
-		}
+		PlayResult playResult = playService.play(deviceId, channelId, null, null);
 
 		// 超时处理
-		result.onTimeout(()->{
+		playResult.getResult().onTimeout(()->{
 			logger.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", deviceId, channelId));
 			// 释放rtpserver
-			cmder.closeRTPServer(device, channelId);
+			cmder.closeRTPServer(playResult.getDevice(), channelId);
 			RequestMessage msg = new RequestMessage();
-			msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
+			msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + playResult.getUuid());
 			msg.setData("Timeout");
 			resultHolder.invokeResult(msg);
 		});
-		return result;
+		return playResult.getResult();
 	}
 
 	@PostMapping("/play/{streamId}/stop")

+ 37 - 0
src/main/java/com/genersoft/iot/vmp/vmanager/play/bean/PlayResult.java

@@ -0,0 +1,37 @@
+package com.genersoft.iot.vmp.vmanager.play.bean;
+
+import com.genersoft.iot.vmp.gb28181.bean.Device;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.context.request.async.DeferredResult;
+
+public class PlayResult {
+
+    private DeferredResult<ResponseEntity<String>> result;
+    private String uuid;
+
+    private Device device;
+
+    public DeferredResult<ResponseEntity<String>> getResult() {
+        return result;
+    }
+
+    public void setResult(DeferredResult<ResponseEntity<String>> result) {
+        this.result = result;
+    }
+
+    public String getUuid() {
+        return uuid;
+    }
+
+    public void setUuid(String uuid) {
+        this.uuid = uuid;
+    }
+
+    public Device getDevice() {
+        return device;
+    }
+
+    public void setDevice(Device device) {
+        this.device = device;
+    }
+}

+ 7 - 0
src/main/java/com/genersoft/iot/vmp/vmanager/service/IPlayService.java

@@ -2,6 +2,11 @@ package com.genersoft.iot.vmp.vmanager.service;
 
 import com.alibaba.fastjson.JSONObject;
 import com.genersoft.iot.vmp.common.StreamInfo;
+import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
+import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
+import com.genersoft.iot.vmp.vmanager.play.bean.PlayResult;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.context.request.async.DeferredResult;
 
 /**
  * 点播处理
@@ -10,4 +15,6 @@ public interface IPlayService {
 
     void onPublishHandlerForPlayBack(JSONObject resonse, String deviceId, String channelId, String uuid);
     void onPublishHandlerForPlay(JSONObject resonse, String deviceId, String channelId, String uuid);
+
+    PlayResult play(String deviceId, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent);
 }

+ 75 - 0
src/main/java/com/genersoft/iot/vmp/vmanager/service/impl/PlayServiceImpl.java

@@ -4,19 +4,29 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import com.genersoft.iot.vmp.common.StreamInfo;
 import com.genersoft.iot.vmp.conf.MediaServerConfig;
+import com.genersoft.iot.vmp.gb28181.bean.Device;
 import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
+import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
 import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
 import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
+import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
+import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
+import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
 import com.genersoft.iot.vmp.vmanager.play.PlayController;
+import com.genersoft.iot.vmp.vmanager.play.bean.PlayResult;
 import com.genersoft.iot.vmp.vmanager.service.IPlayService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Service;
+import org.springframework.web.context.request.async.DeferredResult;
 
+import javax.sip.message.Response;
 import java.text.DecimalFormat;
+import java.util.UUID;
 
 @Service
 public class PlayServiceImpl implements IPlayService {
@@ -26,12 +36,77 @@ public class PlayServiceImpl implements IPlayService {
     @Autowired
     private IVideoManagerStorager storager;
 
+    @Autowired
+    private SIPCommander cmder;
+
     @Autowired
     private IRedisCatchStorage redisCatchStorage;
 
     @Autowired
     private DeferredResultHolder resultHolder;
 
+    @Autowired
+    private ZLMRESTfulUtils zlmresTfulUtils;
+
+
+    @Override
+    public PlayResult play(String deviceId, String channelId, ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent) {
+        PlayResult playResult = new PlayResult();
+        Device device = storager.queryVideoDevice(deviceId);
+        StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
+        playResult.setDevice(device);
+        UUID uuid = UUID.randomUUID();
+        playResult.setUuid(uuid.toString());
+        DeferredResult<ResponseEntity<String>> result = new DeferredResult<ResponseEntity<String>>();
+        playResult.setResult(result);
+        // 录像查询以channelId作为deviceId查询
+        resultHolder.put(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid, result);
+
+        if (streamInfo == null) {
+            // 发送点播消息
+            cmder.playStreamCmd(device, channelId, (JSONObject response) -> {
+                logger.info("收到订阅消息: " + response.toJSONString());
+                onPublishHandlerForPlay(response, deviceId, channelId, uuid.toString());
+                if (hookEvent != null) {
+                    hookEvent.response(response);
+                }
+            }, event -> {
+                RequestMessage msg = new RequestMessage();
+                msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
+                Response response = event.getResponse();
+                msg.setData(String.format("点播失败, 错误码: %s, %s", response.getStatusCode(), response.getReasonPhrase()));
+                resultHolder.invokeResult(msg);
+                if (errorEvent != null) {
+                    errorEvent.response(event);
+                }
+            });
+        } else {
+            String streamId = streamInfo.getStreamId();
+            JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(streamId);
+            if (rtpInfo.getBoolean("exist")) {
+                RequestMessage msg = new RequestMessage();
+                msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
+                msg.setData(JSON.toJSONString(streamInfo));
+                resultHolder.invokeResult(msg);
+            } else {
+                redisCatchStorage.stopPlay(streamInfo);
+                storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
+                cmder.playStreamCmd(device, channelId, (JSONObject response) -> {
+                    logger.info("收到订阅消息: " + response.toJSONString());
+                    onPublishHandlerForPlay(response, deviceId, channelId, uuid.toString());
+                }, event -> {
+                    RequestMessage msg = new RequestMessage();
+                    msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
+                    Response response = event.getResponse();
+                    msg.setData(String.format("点播失败, 错误码: %s, %s", response.getStatusCode(), response.getReasonPhrase()));
+                    resultHolder.invokeResult(msg);
+                });
+            }
+        }
+
+        return playResult;
+    }
+
     @Override
     public void onPublishHandlerForPlay(JSONObject resonse, String deviceId, String channelId, String uuid) {
         RequestMessage msg = new RequestMessage();