|  | @@ -7,10 +7,7 @@ import com.genersoft.iot.vmp.service.IStreamPushService;
 | 
	
		
			
				|  |  |  import com.genersoft.iot.vmp.vmanager.bean.StreamPushExcelDto;
 | 
	
		
			
				|  |  |  import org.springframework.util.StringUtils;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -import java.util.ArrayList;
 | 
	
		
			
				|  |  | -import java.util.HashSet;
 | 
	
		
			
				|  |  | -import java.util.List;
 | 
	
		
			
				|  |  | -import java.util.Set;
 | 
	
		
			
				|  |  | +import java.util.*;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  public class StreamPushUploadFileHandler extends AnalysisEventListener<StreamPushExcelDto> {
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -18,10 +15,13 @@ public class StreamPushUploadFileHandler extends AnalysisEventListener<StreamPus
 | 
	
		
			
				|  |  |      private IStreamPushService pushService;
 | 
	
		
			
				|  |  |      private String defaultMediaServerId;
 | 
	
		
			
				|  |  |      private List<StreamPushItem> streamPushItems = new ArrayList<>();
 | 
	
		
			
				|  |  | +    private Map<String, UploadData> streamPushItemsForPlatform = new HashMap<>();
 | 
	
		
			
				|  |  |      private Set<String> streamPushStreamSet = new HashSet<>();
 | 
	
		
			
				|  |  |      private Set<String> streamPushGBSet = new HashSet<>();
 | 
	
		
			
				|  |  |      private List<String> errorStreamList = new ArrayList<>();
 | 
	
		
			
				|  |  |      private List<String> errorGBList = new ArrayList<>();
 | 
	
		
			
				|  |  | +    // 读取数量计数器
 | 
	
		
			
				|  |  | +    private int loadedSize = 0;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      public StreamPushUploadFileHandler(IStreamPushService pushService, String defaultMediaServerId, ErrorDataHandler errorDataHandler) {
 | 
	
		
			
				|  |  |          this.pushService = pushService;
 | 
	
	
		
			
				|  | @@ -33,6 +33,16 @@ public class StreamPushUploadFileHandler extends AnalysisEventListener<StreamPus
 | 
	
		
			
				|  |  |          void handle(List<String> streams, List<String> gbId);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +    private class UploadData{
 | 
	
		
			
				|  |  | +        public String platformId;
 | 
	
		
			
				|  |  | +        public Map<String, List<StreamPushItem>> catalogData = new HashMap<>();
 | 
	
		
			
				|  |  | +        public List<StreamPushItem> streamPushItems = new ArrayList<>();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        public UploadData(String platformId) {
 | 
	
		
			
				|  |  | +            this.platformId = platformId;
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      @Override
 | 
	
		
			
				|  |  |      public void invoke(StreamPushExcelDto streamPushExcelDto, AnalysisContext analysisContext) {
 | 
	
		
			
				|  |  |          if (StringUtils.isEmpty(streamPushExcelDto.getApp())
 | 
	
	
		
			
				|  | @@ -43,10 +53,10 @@ public class StreamPushUploadFileHandler extends AnalysisEventListener<StreamPus
 | 
	
		
			
				|  |  |          if (streamPushGBSet.contains(streamPushExcelDto.getGbId())) {
 | 
	
		
			
				|  |  |              errorGBList.add(streamPushExcelDto.getGbId());
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  | -        if (streamPushStreamSet.contains(streamPushExcelDto.getApp() + streamPushExcelDto.getStream())) {
 | 
	
		
			
				|  |  | +        if (streamPushStreamSet.contains(streamPushExcelDto.getApp() + streamPushExcelDto.getStream() + streamPushExcelDto.getPlatformId())) {
 | 
	
		
			
				|  |  |              errorStreamList.add(streamPushExcelDto.getApp() + "/" + streamPushExcelDto.getStream());
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  | -        if (streamPushGBSet.contains(streamPushExcelDto.getGbId()) || streamPushStreamSet.contains(streamPushExcelDto.getApp() + streamPushExcelDto.getStream())) {
 | 
	
		
			
				|  |  | +        if (streamPushGBSet.contains(streamPushExcelDto.getGbId()) || streamPushStreamSet.contains(streamPushExcelDto.getApp() + streamPushExcelDto.getStream() + streamPushExcelDto.getPlatformId())) {
 | 
	
		
			
				|  |  |              return;
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -62,24 +72,69 @@ public class StreamPushUploadFileHandler extends AnalysisEventListener<StreamPus
 | 
	
		
			
				|  |  |          streamPushItem.setOriginType(2);
 | 
	
		
			
				|  |  |          streamPushItem.setOriginTypeStr("rtsp_push");
 | 
	
		
			
				|  |  |          streamPushItem.setTotalReaderCount("0");
 | 
	
		
			
				|  |  | -        streamPushItems.add(streamPushItem);
 | 
	
		
			
				|  |  | +        streamPushItem.setPlatformId(streamPushExcelDto.getPlatformId());
 | 
	
		
			
				|  |  | +        streamPushItem.setCatalogId(streamPushExcelDto.getCatalogId());
 | 
	
		
			
				|  |  | +        if (StringUtils.isEmpty(streamPushExcelDto.getPlatformId())) {
 | 
	
		
			
				|  |  | +            streamPushItems.add(streamPushItem);
 | 
	
		
			
				|  |  | +        }else {
 | 
	
		
			
				|  |  | +            UploadData uploadData = streamPushItemsForPlatform.get(streamPushExcelDto.getPlatformId());
 | 
	
		
			
				|  |  | +            if (uploadData == null) {
 | 
	
		
			
				|  |  | +                uploadData = new UploadData(streamPushExcelDto.getPlatformId());
 | 
	
		
			
				|  |  | +                streamPushItemsForPlatform.put(streamPushExcelDto.getPlatformId(), uploadData);
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +            if (!StringUtils.isEmpty(streamPushExcelDto.getCatalogId())) {
 | 
	
		
			
				|  |  | +                List<StreamPushItem> streamPushItems = uploadData.catalogData.get(streamPushExcelDto.getCatalogId());
 | 
	
		
			
				|  |  | +                if (streamPushItems == null) {
 | 
	
		
			
				|  |  | +                    streamPushItems = new ArrayList<>();
 | 
	
		
			
				|  |  | +                    uploadData.catalogData.put(streamPushExcelDto.getCatalogId(), streamPushItems);
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +                streamPushItems.add(streamPushItem);
 | 
	
		
			
				|  |  | +            }else {
 | 
	
		
			
				|  |  | +                uploadData.streamPushItems.add(streamPushItem);
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |          streamPushGBSet.add(streamPushExcelDto.getGbId());
 | 
	
		
			
				|  |  |          streamPushStreamSet.add(streamPushExcelDto.getApp()+streamPushExcelDto.getStream());
 | 
	
		
			
				|  |  | -        if (streamPushItems.size() > 300) {
 | 
	
		
			
				|  |  | -            pushService.batchAdd(streamPushItems);
 | 
	
		
			
				|  |  | -            // 存储完成清理 list
 | 
	
		
			
				|  |  | +        loadedSize ++;
 | 
	
		
			
				|  |  | +        if (loadedSize > 1000) {
 | 
	
		
			
				|  |  | +            saveData();
 | 
	
		
			
				|  |  |              streamPushItems.clear();
 | 
	
		
			
				|  |  | +            streamPushItemsForPlatform.clear();
 | 
	
		
			
				|  |  | +            loadedSize = 0;
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      @Override
 | 
	
		
			
				|  |  |      public void doAfterAllAnalysed(AnalysisContext analysisContext) {
 | 
	
		
			
				|  |  |          // 这里也要保存数据,确保最后遗留的数据也存储到数据库
 | 
	
		
			
				|  |  | -        if (streamPushItems.size() > 0) {
 | 
	
		
			
				|  |  | -            pushService.batchAdd(streamPushItems);
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | +        saveData();
 | 
	
		
			
				|  |  |          streamPushGBSet.clear();
 | 
	
		
			
				|  |  |          streamPushStreamSet.clear();
 | 
	
		
			
				|  |  |          errorDataHandler.handle(errorStreamList, errorGBList);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    private void saveData(){
 | 
	
		
			
				|  |  | +        if (streamPushItems.size() > 0) {
 | 
	
		
			
				|  |  | +            pushService.batchAddForUpload(null, null, streamPushItems);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        // 处理已分配到平台的流
 | 
	
		
			
				|  |  | +        if (streamPushItemsForPlatform.size() > 0){
 | 
	
		
			
				|  |  | +            for (String platformId : streamPushItemsForPlatform.keySet()) {
 | 
	
		
			
				|  |  | +                UploadData uploadData = streamPushItemsForPlatform.get(platformId);
 | 
	
		
			
				|  |  | +                if (uploadData.streamPushItems.size() > 0) {
 | 
	
		
			
				|  |  | +                    pushService.batchAddForUpload(platformId, null, uploadData.streamPushItems);
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +                if (uploadData.catalogData.size() > 0) {
 | 
	
		
			
				|  |  | +                    for (String catalogId : uploadData.catalogData.keySet()) {
 | 
	
		
			
				|  |  | +                        if (uploadData.catalogData.get(catalogId).size() > 0) {
 | 
	
		
			
				|  |  | +                            pushService.batchAddForUpload(platformId, catalogId, uploadData.catalogData.get(catalogId));
 | 
	
		
			
				|  |  | +                        }
 | 
	
		
			
				|  |  | +                    }
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  |  }
 |