StreamPushUploadFileHandler.java 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. package com.genersoft.iot.vmp.service.impl;
  2. import com.alibaba.excel.context.AnalysisContext;
  3. import com.alibaba.excel.event.AnalysisEventListener;
  4. import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
  5. import com.genersoft.iot.vmp.service.IStreamPushService;
  6. import com.genersoft.iot.vmp.utils.DateUtil;
  7. import com.genersoft.iot.vmp.vmanager.bean.StreamPushExcelDto;
  8. import com.google.common.collect.BiMap;
  9. import com.google.common.collect.HashBiMap;
  10. import org.springframework.util.StringUtils;
  11. import java.util.*;
  12. public class StreamPushUploadFileHandler extends AnalysisEventListener<StreamPushExcelDto> {
  13. /**
  14. * 错误数据的回调,用于将错误数据发送给页面
  15. */
  16. private ErrorDataHandler errorDataHandler;
  17. /**
  18. * 推流的业务类用于存储数据
  19. */
  20. private IStreamPushService pushService;
  21. /**
  22. * 默认流媒体节点ID
  23. */
  24. private String defaultMediaServerId;
  25. /**
  26. * 用于存储不加过滤的所有数据
  27. */
  28. private List<StreamPushItem> streamPushItems = new ArrayList<>();
  29. /**
  30. * 用于存储更具APP+Stream过滤后的数据,可以直接存入stream_push表与gb_stream表
  31. */
  32. private Map<String,StreamPushItem> streamPushItemForSave = new HashMap<>();
  33. /**
  34. * 用于存储按照APP+Stream为KEY, 平台ID+目录Id 为value的数据,用于存储到gb_stream表后获取app+Stream对应的平台与目录信息,然后存入关联表
  35. */
  36. private Map<String, List<String[]>> streamPushItemsForPlatform = new HashMap<>();
  37. /**
  38. * 用于判断文件是否存在重复的app+Stream+平台ID
  39. */
  40. private Set<String> streamPushStreamSet = new HashSet<>();
  41. /**
  42. * 用于存储APP+Stream->国标ID 的数据结构, 数据一一对应,全局判断APP+Stream->国标ID是否存在不对应
  43. */
  44. private BiMap<String,String> gBMap = HashBiMap.create();
  45. /**
  46. * 记录错误的APP+Stream
  47. */
  48. private List<String> errorStreamList = new ArrayList<>();
  49. /**
  50. * 记录错误的国标ID
  51. */
  52. private List<String> errorGBList = new ArrayList<>();
  53. /**
  54. * 读取数量计数器
  55. */
  56. private int loadedSize = 0;
  57. public StreamPushUploadFileHandler(IStreamPushService pushService, String defaultMediaServerId, ErrorDataHandler errorDataHandler) {
  58. this.pushService = pushService;
  59. this.defaultMediaServerId = defaultMediaServerId;
  60. this.errorDataHandler = errorDataHandler;
  61. }
  62. public interface ErrorDataHandler{
  63. void handle(List<String> streams, List<String> gbId);
  64. }
  65. @Override
  66. public void invoke(StreamPushExcelDto streamPushExcelDto, AnalysisContext analysisContext) {
  67. if (StringUtils.isEmpty(streamPushExcelDto.getApp())
  68. || StringUtils.isEmpty(streamPushExcelDto.getStream())
  69. || StringUtils.isEmpty(streamPushExcelDto.getGbId())) {
  70. return;
  71. }
  72. if (gBMap.get(streamPushExcelDto.getApp() + streamPushExcelDto.getStream()) == null) {
  73. try {
  74. gBMap.put(streamPushExcelDto.getApp() + streamPushExcelDto.getStream(), streamPushExcelDto.getGbId());
  75. }catch (IllegalArgumentException e) {
  76. e.printStackTrace();
  77. errorGBList.add(streamPushExcelDto.getGbId() + "(不同的app+stream使用了相同的国标ID)");
  78. return;
  79. }
  80. }else {
  81. if (!gBMap.get(streamPushExcelDto.getApp() + streamPushExcelDto.getStream()).equals(streamPushExcelDto.getGbId())) {
  82. errorGBList.add(streamPushExcelDto.getGbId() + "(同一组app+stream使用了不同的国标ID)");
  83. return;
  84. }
  85. }
  86. if (streamPushStreamSet.contains(streamPushExcelDto.getApp() + streamPushExcelDto.getStream() + streamPushExcelDto.getPlatformId())) {
  87. errorStreamList.add(streamPushExcelDto.getApp() + "/" + streamPushExcelDto.getStream()+ "/" +
  88. streamPushExcelDto.getPlatformId() + "(同一组app+stream添加在了同一个平台下)");
  89. return;
  90. }else {
  91. streamPushStreamSet.add(streamPushExcelDto.getApp()+streamPushExcelDto.getStream() + streamPushExcelDto.getPlatformId());
  92. }
  93. StreamPushItem streamPushItem = new StreamPushItem();
  94. streamPushItem.setApp(streamPushExcelDto.getApp());
  95. streamPushItem.setStream(streamPushExcelDto.getStream());
  96. streamPushItem.setGbId(streamPushExcelDto.getGbId());
  97. streamPushItem.setStatus(false);
  98. streamPushItem.setStreamType("push");
  99. streamPushItem.setCreateTime(DateUtil.getNow());
  100. streamPushItem.setMediaServerId(defaultMediaServerId);
  101. streamPushItem.setName(streamPushExcelDto.getName());
  102. streamPushItem.setOriginType(2);
  103. streamPushItem.setOriginTypeStr("rtsp_push");
  104. streamPushItem.setTotalReaderCount("0");
  105. streamPushItem.setPlatformId(streamPushExcelDto.getPlatformId());
  106. streamPushItem.setCatalogId(streamPushExcelDto.getCatalogId());
  107. // 存入所有的通道信息
  108. streamPushItems.add(streamPushItem);
  109. streamPushItemForSave.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem);
  110. if (!StringUtils.isEmpty(streamPushExcelDto.getPlatformId())) {
  111. List<String[]> platformList = streamPushItemsForPlatform.get(streamPushItem.getApp() + streamPushItem.getStream());
  112. if (platformList == null) {
  113. platformList = new ArrayList<>();
  114. streamPushItemsForPlatform.put(streamPushItem.getApp() + streamPushItem.getStream(), platformList);
  115. }
  116. String platformId = streamPushExcelDto.getPlatformId();
  117. String catalogId = streamPushExcelDto.getCatalogId();
  118. if (StringUtils.isEmpty(streamPushExcelDto.getCatalogId())) {
  119. catalogId = null;
  120. }
  121. String[] platFormInfoArray = new String[]{platformId, catalogId};
  122. platformList.add(platFormInfoArray);
  123. }
  124. loadedSize ++;
  125. if (loadedSize > 1000) {
  126. saveData();
  127. streamPushItems.clear();
  128. streamPushItemForSave.clear();
  129. streamPushItemsForPlatform.clear();
  130. loadedSize = 0;
  131. }
  132. }
  133. @Override
  134. public void doAfterAllAnalysed(AnalysisContext analysisContext) {
  135. // 这里也要保存数据,确保最后遗留的数据也存储到数据库
  136. saveData();
  137. streamPushItems.clear();
  138. streamPushItemForSave.clear();
  139. gBMap.clear();
  140. streamPushStreamSet.clear();
  141. streamPushItemsForPlatform.clear();
  142. errorDataHandler.handle(errorStreamList, errorGBList);
  143. }
  144. private void saveData(){
  145. if (streamPushItemForSave.size() > 0) {
  146. // 向数据库查询是否存在重复的app
  147. pushService.batchAddForUpload(new ArrayList<>(streamPushItemForSave.values()), streamPushItemsForPlatform);
  148. }
  149. }
  150. }