StreamPushController.java 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. package com.genersoft.iot.vmp.streamPush.controller;
  2. import com.alibaba.excel.EasyExcel;
  3. import com.alibaba.excel.ExcelReader;
  4. import com.alibaba.excel.read.metadata.ReadSheet;
  5. import com.genersoft.iot.vmp.common.StreamInfo;
  6. import com.genersoft.iot.vmp.conf.UserSetting;
  7. import com.genersoft.iot.vmp.conf.exception.ControllerException;
  8. import com.genersoft.iot.vmp.conf.security.JwtUtils;
  9. import com.genersoft.iot.vmp.conf.security.SecurityUtils;
  10. import com.genersoft.iot.vmp.conf.security.dto.LoginUser;
  11. import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
  12. import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
  13. import com.genersoft.iot.vmp.media.service.IMediaServerService;
  14. import com.genersoft.iot.vmp.service.IMediaService;
  15. import com.genersoft.iot.vmp.streamPush.bean.StreamPush;
  16. import com.genersoft.iot.vmp.streamPush.bean.StreamPushExcelDto;
  17. import com.genersoft.iot.vmp.streamPush.enent.StreamPushUploadFileHandler;
  18. import com.genersoft.iot.vmp.streamPush.service.IStreamPushService;
  19. import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
  20. import com.genersoft.iot.vmp.vmanager.bean.StreamContent;
  21. import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
  22. import com.github.pagehelper.PageInfo;
  23. import io.swagger.v3.oas.annotations.Operation;
  24. import io.swagger.v3.oas.annotations.Parameter;
  25. import io.swagger.v3.oas.annotations.security.SecurityRequirement;
  26. import io.swagger.v3.oas.annotations.tags.Tag;
  27. import lombok.extern.slf4j.Slf4j;
  28. import org.springframework.beans.factory.annotation.Autowired;
  29. import org.springframework.http.HttpStatus;
  30. import org.springframework.http.ResponseEntity;
  31. import org.springframework.stereotype.Controller;
  32. import org.springframework.util.ObjectUtils;
  33. import org.springframework.web.bind.annotation.*;
  34. import org.springframework.web.context.request.async.DeferredResult;
  35. import org.springframework.web.multipart.MultipartFile;
  36. import java.io.IOException;
  37. import java.io.InputStream;
  38. import java.util.HashMap;
  39. import java.util.List;
  40. import java.util.Map;
  41. import java.util.UUID;
  42. @Tag(name = "推流信息管理")
  43. @Controller
  44. @Slf4j
  45. @RequestMapping(value = "/api/push")
  46. public class StreamPushController {
  47. @Autowired
  48. private IStreamPushService streamPushService;
  49. @Autowired
  50. private IMediaServerService mediaServerService;
  51. @Autowired
  52. private DeferredResultHolder resultHolder;
  53. @Autowired
  54. private IMediaService mediaService;
  55. @Autowired
  56. private UserSetting userSetting;
  57. @GetMapping(value = "/list")
  58. @ResponseBody
  59. @Operation(summary = "推流列表查询", security = @SecurityRequirement(name = JwtUtils.HEADER))
  60. @Parameter(name = "page", description = "当前页")
  61. @Parameter(name = "count", description = "每页查询数量")
  62. @Parameter(name = "query", description = "查询内容")
  63. @Parameter(name = "pushing", description = "是否正在推流")
  64. @Parameter(name = "mediaServerId", description = "流媒体ID")
  65. public PageInfo<StreamPush> list(@RequestParam(required = false)Integer page,
  66. @RequestParam(required = false)Integer count,
  67. @RequestParam(required = false)String query,
  68. @RequestParam(required = false)Boolean pushing,
  69. @RequestParam(required = false)String mediaServerId ){
  70. if (ObjectUtils.isEmpty(query)) {
  71. query = null;
  72. }
  73. if (ObjectUtils.isEmpty(mediaServerId)) {
  74. mediaServerId = null;
  75. }
  76. PageInfo<StreamPush> pushList = streamPushService.getPushList(page, count, query, pushing, mediaServerId);
  77. return pushList;
  78. }
  79. @PostMapping(value = "/stop")
  80. @ResponseBody
  81. @Operation(summary = "中止一个推流", security = @SecurityRequirement(name = JwtUtils.HEADER))
  82. @Parameter(name = "app", description = "应用名", required = true)
  83. @Parameter(name = "stream", description = "流id", required = true)
  84. public void stop(String app, String stream){
  85. if (!streamPushService.stopByAppAndStream(app, stream)){
  86. throw new ControllerException(ErrorCode.ERROR100);
  87. }
  88. }
  89. @PostMapping(value = "upload")
  90. @ResponseBody
  91. public DeferredResult<ResponseEntity<WVPResult<Object>>> uploadChannelFile(@RequestParam(value = "file") MultipartFile file){
  92. // 最多处理文件一个小时
  93. DeferredResult<ResponseEntity<WVPResult<Object>>> result = new DeferredResult<>(60*60*1000L);
  94. // 录像查询以channelId作为deviceId查询
  95. String key = DeferredResultHolder.UPLOAD_FILE_CHANNEL;
  96. String uuid = UUID.randomUUID().toString();
  97. log.info("通道导入文件类型: {}",file.getContentType() );
  98. if (file.isEmpty()) {
  99. log.warn("通道导入文件为空");
  100. WVPResult<Object> wvpResult = new WVPResult<>();
  101. wvpResult.setCode(-1);
  102. wvpResult.setMsg("文件为空");
  103. result.setResult(ResponseEntity.status(HttpStatus.BAD_REQUEST).body(wvpResult));
  104. return result;
  105. }
  106. if (file.getContentType() == null) {
  107. WVPResult<Object> wvpResult = new WVPResult<>();
  108. wvpResult.setCode(-1);
  109. wvpResult.setMsg("无法识别文件类型");
  110. result.setResult(ResponseEntity.status(HttpStatus.BAD_REQUEST).body(wvpResult));
  111. return result;
  112. }
  113. // 同时只处理一个文件
  114. if (resultHolder.exist(key, null)) {
  115. log.warn("已有导入任务正在执行");
  116. WVPResult<Object> wvpResult = new WVPResult<>();
  117. wvpResult.setCode(-1);
  118. wvpResult.setMsg("已有导入任务正在执行");
  119. result.setResult(ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS).body(wvpResult));
  120. return result;
  121. }
  122. resultHolder.put(key, uuid, result);
  123. result.onTimeout(()->{
  124. log.warn("通道导入超时,可能文件过大");
  125. RequestMessage msg = new RequestMessage();
  126. msg.setKey(key);
  127. WVPResult<Object> wvpResult = new WVPResult<>();
  128. wvpResult.setCode(-1);
  129. wvpResult.setMsg("导入超时,可能文件过大");
  130. msg.setData(wvpResult);
  131. resultHolder.invokeAllResult(msg);
  132. });
  133. //获取文件流
  134. InputStream inputStream = null;
  135. try {
  136. String name = file.getName();
  137. inputStream = file.getInputStream();
  138. } catch (IOException e) {
  139. log.error("未处理的异常 ", e);
  140. }
  141. try {
  142. //传入参数
  143. ExcelReader excelReader = EasyExcel.read(inputStream, StreamPushExcelDto.class,
  144. new StreamPushUploadFileHandler(streamPushService, mediaServerService.getDefaultMediaServer().getId(), (errorStreams, errorGBs)->{
  145. log.info("通道导入成功,存在重复App+Stream为{}个,存在国标ID为{}个", errorStreams.size(), errorGBs.size());
  146. RequestMessage msg = new RequestMessage();
  147. msg.setKey(key);
  148. WVPResult<Map<String, List<String>>> wvpResult = new WVPResult<>();
  149. if (errorStreams.isEmpty() && errorGBs.isEmpty()) {
  150. wvpResult.setCode(0);
  151. wvpResult.setMsg("成功");
  152. }else {
  153. wvpResult.setCode(1);
  154. wvpResult.setMsg("导入成功。但是存在重复数据");
  155. Map<String, List<String>> errorData = new HashMap<>();
  156. errorData.put("gbId", errorGBs);
  157. errorData.put("stream", errorStreams);
  158. wvpResult.setData(errorData);
  159. }
  160. msg.setData(wvpResult);
  161. resultHolder.invokeAllResult(msg);
  162. })).build();
  163. ReadSheet readSheet = EasyExcel.readSheet(0).build();
  164. excelReader.read(readSheet);
  165. excelReader.finish();
  166. }catch (Exception e) {
  167. log.warn("通道导入失败:", e);
  168. RequestMessage msg = new RequestMessage();
  169. msg.setKey(key);
  170. WVPResult<Object> wvpResult = new WVPResult<>();
  171. wvpResult.setCode(-1);
  172. wvpResult.setMsg("通道导入失败: " + e.getMessage() );
  173. msg.setData(wvpResult);
  174. resultHolder.invokeAllResult(msg);
  175. }
  176. return result;
  177. }
  178. /**
  179. * 获取推流播放地址
  180. * @param app 应用名
  181. * @param stream 流id
  182. * @return
  183. */
  184. @GetMapping(value = "/getPlayUrl")
  185. @ResponseBody
  186. @Operation(summary = "获取推流播放地址", security = @SecurityRequirement(name = JwtUtils.HEADER))
  187. @Parameter(name = "app", description = "应用名", required = true)
  188. @Parameter(name = "stream", description = "流id", required = true)
  189. @Parameter(name = "mediaServerId", description = "媒体服务器id")
  190. public StreamContent getPlayUrl(@RequestParam String app, @RequestParam String stream,
  191. @RequestParam(required = false) String mediaServerId){
  192. boolean authority = false;
  193. // 是否登陆用户, 登陆用户返回完整信息
  194. LoginUser userInfo = SecurityUtils.getUserInfo();
  195. if (userInfo!= null) {
  196. authority = true;
  197. }
  198. StreamPush push = streamPushService.getPush(app, stream);
  199. if (push != null && !push.isSelf()) {
  200. throw new ControllerException(ErrorCode.ERROR100.getCode(), "来自其他平台的推流信息");
  201. }
  202. StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, authority);
  203. if (streamInfo == null){
  204. throw new ControllerException(ErrorCode.ERROR100.getCode(), "获取播放地址失败");
  205. }
  206. return new StreamContent(streamInfo);
  207. }
  208. /**
  209. * 添加推流信息
  210. * @param stream 推流信息
  211. * @return
  212. */
  213. @PostMapping(value = "/add")
  214. @ResponseBody
  215. @Operation(summary = "添加推流信息", security = @SecurityRequirement(name = JwtUtils.HEADER))
  216. public void add(@RequestBody StreamPush stream){
  217. if (ObjectUtils.isEmpty(stream.getGbId())) {
  218. throw new ControllerException(ErrorCode.ERROR400.getCode(), "国标ID不可为空");
  219. }
  220. if (ObjectUtils.isEmpty(stream.getApp()) && ObjectUtils.isEmpty(stream.getStream())) {
  221. throw new ControllerException(ErrorCode.ERROR400.getCode(), "app或stream不可为空");
  222. }
  223. stream.setGbStatus("OFF");
  224. stream.setPushIng(false);
  225. if (!streamPushService.add(stream)) {
  226. throw new ControllerException(ErrorCode.ERROR100);
  227. }
  228. }
  229. }