// StreamPushController.java (13 KB)
  1. package com.genersoft.iot.vmp.vmanager.streamPush;
  2. import com.alibaba.excel.EasyExcel;
  3. import com.alibaba.excel.ExcelReader;
  4. import com.alibaba.excel.read.metadata.ReadSheet;
  5. import com.genersoft.iot.vmp.common.StreamInfo;
  6. import com.genersoft.iot.vmp.conf.UserSetting;
  7. import com.genersoft.iot.vmp.conf.security.SecurityUtils;
  8. import com.genersoft.iot.vmp.conf.security.dto.LoginUser;
  9. import com.genersoft.iot.vmp.gb28181.bean.GbStream;
  10. import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
  11. import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
  12. import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
  13. import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
  14. import com.genersoft.iot.vmp.service.IMediaServerService;
  15. import com.genersoft.iot.vmp.service.IMediaService;
  16. import com.genersoft.iot.vmp.service.IStreamPushService;
  17. import com.genersoft.iot.vmp.service.impl.StreamPushUploadFileHandler;
  18. import com.genersoft.iot.vmp.vmanager.bean.BatchGBStreamParam;
  19. import com.genersoft.iot.vmp.vmanager.bean.StreamPushExcelDto;
  20. import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
  21. import com.github.pagehelper.PageInfo;
  22. import io.swagger.annotations.Api;
  23. import io.swagger.annotations.ApiImplicitParam;
  24. import io.swagger.annotations.ApiImplicitParams;
  25. import io.swagger.annotations.ApiOperation;
  26. import org.apache.poi.sl.usermodel.Sheet;
  27. import org.slf4j.Logger;
  28. import org.slf4j.LoggerFactory;
  29. import org.springframework.beans.factory.annotation.Autowired;
  30. import org.springframework.http.HttpStatus;
  31. import org.springframework.http.ResponseEntity;
  32. import org.springframework.stereotype.Controller;
  33. import org.springframework.util.StringUtils;
  34. import org.springframework.web.bind.annotation.*;
  35. import org.springframework.web.context.request.async.DeferredResult;
  36. import org.springframework.web.multipart.MultipartFile;
  37. import javax.servlet.http.HttpServletRequest;
  38. import java.io.IOException;
  39. import java.io.InputStream;
  40. import java.util.HashMap;
  41. import java.util.List;
  42. import java.util.Map;
  43. import java.util.UUID;
  44. @Api(tags = "推流信息管理")
  45. @Controller
  46. @CrossOrigin
  47. @RequestMapping(value = "/api/push")
  48. public class StreamPushController {
  49. private final static Logger logger = LoggerFactory.getLogger(StreamPushController.class);
  50. @Autowired
  51. private IStreamPushService streamPushService;
  52. @Autowired
  53. private IMediaServerService mediaServerService;
  54. @Autowired
  55. private DeferredResultHolder resultHolder;
  56. @Autowired
  57. private IMediaService mediaService;
  58. @Autowired
  59. private UserSetting userSetting;
  60. @ApiOperation("推流列表查询")
  61. @ApiImplicitParams({
  62. @ApiImplicitParam(name="page", value = "当前页", required = true, dataTypeClass = Integer.class),
  63. @ApiImplicitParam(name="count", value = "每页查询数量", required = true, dataTypeClass = Integer.class),
  64. @ApiImplicitParam(name="query", value = "查询内容", dataTypeClass = String.class),
  65. @ApiImplicitParam(name="pushing", value = "是否正在推流", dataTypeClass = Boolean.class),
  66. @ApiImplicitParam(name="mediaServerId", value = "流媒体ID", dataTypeClass = String.class),
  67. })
  68. @GetMapping(value = "/list")
  69. @ResponseBody
  70. public PageInfo<StreamPushItem> list(@RequestParam(required = false)Integer page,
  71. @RequestParam(required = false)Integer count,
  72. @RequestParam(required = false)String query,
  73. @RequestParam(required = false)Boolean pushing,
  74. @RequestParam(required = false)String mediaServerId ){
  75. if (StringUtils.isEmpty(query)) {
  76. query = null;
  77. }
  78. if (StringUtils.isEmpty(mediaServerId)) {
  79. mediaServerId = null;
  80. }
  81. PageInfo<StreamPushItem> pushList = streamPushService.getPushList(page, count, query, pushing, mediaServerId);
  82. return pushList;
  83. }
  84. @ApiOperation("将推流添加到国标")
  85. @ApiImplicitParams({
  86. @ApiImplicitParam(name = "stream", value = "直播流关联国标平台", dataTypeClass = GbStream.class),
  87. })
  88. @PostMapping(value = "/save_to_gb")
  89. @ResponseBody
  90. public Object saveToGB(@RequestBody GbStream stream){
  91. if (streamPushService.saveToGB(stream)){
  92. return "success";
  93. }else {
  94. return "fail";
  95. }
  96. }
  97. @ApiOperation("将推流移出到国标")
  98. @ApiImplicitParams({
  99. @ApiImplicitParam(name = "stream", value = "直播流关联国标平台", dataTypeClass = GbStream.class),
  100. })
  101. @DeleteMapping(value = "/remove_form_gb")
  102. @ResponseBody
  103. public Object removeFormGB(@RequestBody GbStream stream){
  104. if (streamPushService.removeFromGB(stream)){
  105. return "success";
  106. }else {
  107. return "fail";
  108. }
  109. }
  110. @ApiOperation("中止一个推流")
  111. @ApiImplicitParams({
  112. @ApiImplicitParam(name = "app", value = "应用名", required = true, dataTypeClass = String.class),
  113. @ApiImplicitParam(name = "streamId", value = "流ID", required = true, dataTypeClass = String.class),
  114. })
  115. @PostMapping(value = "/stop")
  116. @ResponseBody
  117. public Object stop(String app, String streamId){
  118. if (streamPushService.stop(app, streamId)){
  119. return "success";
  120. }else {
  121. return "fail";
  122. }
  123. }
  124. @ApiOperation("中止多个推流")
  125. @ApiImplicitParams({
  126. @ApiImplicitParam(name = "app", value = "应用名", required = true, dataTypeClass = String.class),
  127. @ApiImplicitParam(name = "streamId", value = "流ID", required = true, dataTypeClass = String.class),
  128. })
  129. @DeleteMapping(value = "/batchStop")
  130. @ResponseBody
  131. public Object batchStop(@RequestBody BatchGBStreamParam batchGBStreamParam){
  132. if (batchGBStreamParam.getGbStreams().size() == 0) {
  133. return "fail";
  134. }
  135. if (streamPushService.batchStop(batchGBStreamParam.getGbStreams())){
  136. return "success";
  137. }else {
  138. return "fail";
  139. }
  140. }
  141. @PostMapping(value = "upload")
  142. @ResponseBody
  143. public DeferredResult<ResponseEntity<WVPResult<Object>>> uploadChannelFile(@RequestParam(value = "file") MultipartFile file){
  144. // 最多处理文件一个小时
  145. DeferredResult<ResponseEntity<WVPResult<Object>>> result = new DeferredResult<>(60*60*1000L);
  146. // 录像查询以channelId作为deviceId查询
  147. String key = DeferredResultHolder.UPLOAD_FILE_CHANNEL;
  148. String uuid = UUID.randomUUID().toString();
  149. logger.info("通道导入文件类型: {}",file.getContentType() );
  150. if (file.isEmpty()) {
  151. logger.warn("通道导入文件为空");
  152. WVPResult<Object> wvpResult = new WVPResult<>();
  153. wvpResult.setCode(-1);
  154. wvpResult.setMsg("文件为空");
  155. result.setResult(ResponseEntity.status(HttpStatus.BAD_REQUEST).body(wvpResult));
  156. return result;
  157. }
  158. if (file.getContentType() == null) {
  159. WVPResult<Object> wvpResult = new WVPResult<>();
  160. wvpResult.setCode(-1);
  161. wvpResult.setMsg("无法识别文件类型");
  162. result.setResult(ResponseEntity.status(HttpStatus.BAD_REQUEST).body(wvpResult));
  163. return result;
  164. }
  165. // 同时只处理一个文件
  166. if (resultHolder.exist(key, null)) {
  167. logger.warn("已有导入任务正在执行");
  168. WVPResult<Object> wvpResult = new WVPResult<>();
  169. wvpResult.setCode(-1);
  170. wvpResult.setMsg("已有导入任务正在执行");
  171. result.setResult(ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS).body(wvpResult));
  172. return result;
  173. }
  174. resultHolder.put(key, uuid, result);
  175. result.onTimeout(()->{
  176. logger.warn("通道导入超时,可能文件过大");
  177. RequestMessage msg = new RequestMessage();
  178. msg.setKey(key);
  179. WVPResult<Object> wvpResult = new WVPResult<>();
  180. wvpResult.setCode(-1);
  181. wvpResult.setMsg("导入超时,可能文件过大");
  182. msg.setData(wvpResult);
  183. resultHolder.invokeAllResult(msg);
  184. });
  185. //获取文件流
  186. InputStream inputStream = null;
  187. try {
  188. String name = file.getName();
  189. inputStream = file.getInputStream();
  190. } catch (IOException e) {
  191. e.printStackTrace();
  192. }
  193. try {
  194. //传入参数
  195. ExcelReader excelReader = EasyExcel.read(inputStream, StreamPushExcelDto.class,
  196. new StreamPushUploadFileHandler(streamPushService, mediaServerService.getDefaultMediaServer().getId(), (errorStreams, errorGBs)->{
  197. logger.info("通道导入成功,存在重复App+Stream为{}个,存在国标ID为{}个", errorStreams.size(), errorGBs.size());
  198. RequestMessage msg = new RequestMessage();
  199. msg.setKey(key);
  200. WVPResult<Map<String, List<String>>> wvpResult = new WVPResult<>();
  201. if (errorStreams.size() == 0 && errorGBs.size() == 0) {
  202. wvpResult.setCode(0);
  203. wvpResult.setMsg("成功");
  204. }else {
  205. wvpResult.setCode(1);
  206. wvpResult.setMsg("导入成功。但是存在重复数据");
  207. Map<String, List<String>> errorData = new HashMap<>();
  208. errorData.put("gbId", errorGBs);
  209. errorData.put("stream", errorStreams);
  210. wvpResult.setData(errorData);
  211. }
  212. msg.setData(wvpResult);
  213. resultHolder.invokeAllResult(msg);
  214. })).build();
  215. ReadSheet readSheet = EasyExcel.readSheet(0).build();
  216. excelReader.read(readSheet);
  217. excelReader.finish();
  218. }catch (Exception e) {
  219. logger.warn("通道导入失败:", e);
  220. RequestMessage msg = new RequestMessage();
  221. msg.setKey(key);
  222. WVPResult<Object> wvpResult = new WVPResult<>();
  223. wvpResult.setCode(-1);
  224. wvpResult.setMsg("通道导入失败: " + e.getMessage() );
  225. msg.setData(wvpResult);
  226. resultHolder.invokeAllResult(msg);
  227. }
  228. return result;
  229. }
  230. /**
  231. * 获取推流播放地址
  232. * @param app 应用名
  233. * @param stream 流id
  234. * @return
  235. */
  236. @ApiOperation("获取推流播放地址")
  237. @ApiImplicitParams({
  238. @ApiImplicitParam(name = "app", value = "应用名", dataTypeClass = String.class),
  239. @ApiImplicitParam(name = "stream", value = "流id", dataTypeClass = String.class),
  240. @ApiImplicitParam(name = "mediaServerId", value = "媒体服务器id", dataTypeClass = String.class, required = false),
  241. })
  242. @GetMapping(value = "/getPlayUrl")
  243. @ResponseBody
  244. public WVPResult<StreamInfo> getPlayUrl(HttpServletRequest request, @RequestParam String app,
  245. @RequestParam String stream,
  246. @RequestParam(required = false) String mediaServerId){
  247. boolean authority = false;
  248. // 是否登陆用户, 登陆用户返回完整信息
  249. LoginUser userInfo = SecurityUtils.getUserInfo();
  250. if (userInfo!= null) {
  251. authority = true;
  252. }
  253. WVPResult<StreamInfo> result = new WVPResult<>();
  254. StreamPushItem push = streamPushService.getPush(app, stream);
  255. if (!userSetting.getServerId().equals(push.getServerId()) ) {
  256. result.setCode(-1);
  257. result.setMsg("来自其他平台的推流信息");
  258. return result;
  259. }
  260. StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, authority);
  261. if (streamInfo != null){
  262. result.setCode(0);
  263. result.setMsg("scccess");
  264. result.setData(streamInfo);
  265. }else {
  266. result.setCode(-1);
  267. result.setMsg("获取播放地址失败");
  268. }
  269. return result;
  270. }
  271. /**
  272. * 获取推流播放地址
  273. * @param stream 推流信息
  274. * @return
  275. */
  276. @ApiOperation("获取推流播放地址")
  277. @ApiImplicitParams({
  278. @ApiImplicitParam(name = "stream", value = "推流信息", dataTypeClass = StreamPushItem.class),
  279. })
  280. @PostMapping(value = "/add")
  281. @ResponseBody
  282. public WVPResult<StreamInfo> add(@RequestBody StreamPushItem stream){
  283. if (StringUtils.isEmpty(stream.getGbId())) {
  284. return new WVPResult<>(400, "国标ID不可为空", null);
  285. }
  286. if (StringUtils.isEmpty(stream.getApp()) && StringUtils.isEmpty(stream.getStream())) {
  287. return new WVPResult<>(400, "app或stream不可为空", null);
  288. }
  289. stream.setStatus(false);
  290. stream.setPushIng(false);
  291. stream.setAliveSecond(0L);
  292. stream.setTotalReaderCount("0");
  293. boolean result = streamPushService.add(stream);
  294. if (result) {
  295. return new WVPResult<>(0, "success", null);
  296. }else {
  297. return new WVPResult<>(-1, "fail", null);
  298. }
  299. }
  300. }