|
|
@@ -17,6 +17,7 @@ import com.genersoft.iot.vmp.utils.DateUtil;
|
|
|
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
|
|
|
import com.github.pagehelper.PageHelper;
|
|
|
import com.github.pagehelper.PageInfo;
|
|
|
+import com.google.common.base.Joiner;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.context.event.EventListener;
|
|
|
@@ -27,6 +28,7 @@ import org.springframework.transaction.annotation.Transactional;
|
|
|
|
|
|
import java.time.LocalDateTime;
|
|
|
import java.util.*;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
@Service
|
|
|
@Slf4j
|
|
|
@@ -53,63 +55,94 @@ public class RecordPlanServiceImpl implements IRecordPlanService {
|
|
|
@EventListener
|
|
|
public void onApplicationEvent(MediaDepartureEvent event) {
|
|
|
// 流断开,检查是否还处于录像状态, 如果是则继续录像
|
|
|
- if (recording(event.getApp(), event.getStream())) {
|
|
|
- // 重新拉起
|
|
|
-
|
|
|
+ Integer channelId = recording(event.getApp(), event.getStream());
|
|
|
+ if(channelId == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ // 重新拉起
|
|
|
+ CommonGBChannel channel = channelMapper.queryById(channelId);
|
|
|
+ if (channel == null) {
|
|
|
+ log.warn("[录制计划] 流离开时拉起需要录像的流时, 发现通道不存在, id: {}", channelId);
|
|
|
+ return;
|
|
|
}
|
|
|
+ // 开启点播,
|
|
|
+ channelPlayService.play(channel, null, ((code, msg, streamInfo) -> {
|
|
|
+ if (code == InviteErrorCode.SUCCESS.getCode() && streamInfo != null) {
|
|
|
+ log.info("[录像] 流离开时拉起需要录像的流, 开启成功, 通道ID: {}", channel.getGbId());
|
|
|
+ recordStreamMap.put(channel.getGbId(), streamInfo);
|
|
|
+ } else {
|
|
|
+ recordStreamMap.remove(channelId);
|
|
|
+ log.info("[录像] 流离开时拉起需要录像的流, 开启失败, 十分钟后重试, 通道ID: {}", channel.getGbId());
|
|
|
+ }
|
|
|
+ }));
|
|
|
}
|
|
|
|
|
|
Map<Integer, StreamInfo> recordStreamMap = new HashMap<>();
|
|
|
|
|
|
- @Scheduled(cron = "0 */30 * * * *")
|
|
|
+// @Scheduled(cron = "0 */30 * * * *")
|
|
|
+ @Scheduled(fixedRate = 10, timeUnit = TimeUnit.MINUTES)
|
|
|
public void execution() {
|
|
|
- // 执行计划
|
|
|
+ log.info("[录制计划] 执行");
|
|
|
+ // 查询现在需要录像的通道Id
|
|
|
+ List<Integer> startChannelIdList = queryCurrentChannelRecord();
|
|
|
|
|
|
- // 获取当前时间在一周内的序号
|
|
|
- LocalDateTime now = LocalDateTime.now();
|
|
|
- int week = now.getDayOfWeek().getValue();
|
|
|
- int index = now.getHour() * 2 + (now.getMinute() > 30?1:0);
|
|
|
- // 查询startTime等于现在的, 开始录像
|
|
|
- List<Integer> startPlanList = recordPlanMapper.queryStart(week, index);
|
|
|
-
|
|
|
- Map<Integer, StreamInfo> channelMapWithoutRecord = new HashMap<>();
|
|
|
- if (startPlanList.isEmpty()) {
|
|
|
- // 停止所有正在录像的
|
|
|
- if(recordStreamMap.isEmpty()) {
|
|
|
- // 暂无录像任务
|
|
|
- return;
|
|
|
- }else {
|
|
|
- channelMapWithoutRecord.putAll(recordStreamMap);
|
|
|
+ if (startChannelIdList.isEmpty()) {
|
|
|
+ // 当前没有录像任务, 如果存在旧的正在录像的就移除
|
|
|
+ if(!recordStreamMap.isEmpty()) {
|
|
|
+ stopStreams(recordStreamMap.keySet(), recordStreamMap);
|
|
|
recordStreamMap.clear();
|
|
|
}
|
|
|
}else {
|
|
|
- channelMapWithoutRecord.putAll(recordStreamMap);
|
|
|
- // 获取所有的关联的通道
|
|
|
- List<CommonGBChannel> channelList = channelMapper.queryForRecordPlan(startPlanList);
|
|
|
- if (channelList.isEmpty()) {
|
|
|
- recordStreamMap.clear();
|
|
|
- }else {
|
|
|
- // 查找是否已经开启录像, 如果没有则开启录像
|
|
|
- for (CommonGBChannel channel : channelList) {
|
|
|
- if (recordStreamMap.get(channel.getGbId()) != null) {
|
|
|
- channelMapWithoutRecord.remove(channel.getGbId());
|
|
|
- }else {
|
|
|
+ // 当前存在录像任务, 获取正在录像中存在但是当前录制列表不存在的内容,进行停止; 获取正在录像中没有但是当前需录制的列表中存在的进行开启.
|
|
|
+ Set<Integer> recordStreamSet = new HashSet<>(recordStreamMap.keySet());
|
|
|
+ startChannelIdList.forEach(recordStreamSet::remove);
|
|
|
+ if (!recordStreamSet.isEmpty()) {
|
|
|
+ // 正在录像中存在但是当前录制列表不存在的内容,进行停止;
|
|
|
+ stopStreams(recordStreamSet, recordStreamMap);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 移除startChannelIdList中已经在录像的部分, 剩下的都是需要新添加的(正在录像中没有但是当前需录制的列表中存在的进行开启)
|
|
|
+ recordStreamMap.keySet().forEach(startChannelIdList::remove);
|
|
|
+ if (!startChannelIdList.isEmpty()) {
|
|
|
+ // 获取所有的关联的通道
|
|
|
+ List<CommonGBChannel> channelList = channelMapper.queryByIds(startChannelIdList);
|
|
|
+ if (!channelList.isEmpty()) {
|
|
|
+ // 查找是否已经开启录像, 如果没有则开启录像
|
|
|
+ for (CommonGBChannel channel : channelList) {
|
|
|
// 开启点播,
|
|
|
channelPlayService.play(channel, null, ((code, msg, streamInfo) -> {
|
|
|
if (code == InviteErrorCode.SUCCESS.getCode() && streamInfo != null) {
|
|
|
log.info("[录像] 开启成功, 通道ID: {}", channel.getGbId());
|
|
|
recordStreamMap.put(channel.getGbId(), streamInfo);
|
|
|
- channelMapWithoutRecord.remove(channel.getGbId(), streamInfo);
|
|
|
+ } else {
|
|
|
+ log.info("[录像] 开启失败, 十分钟后重试, 通道ID: {}", channel.getGbId());
|
|
|
}
|
|
|
}));
|
|
|
}
|
|
|
+ } else {
|
|
|
+ log.error("[录制计划] 数据异常, 这些关联的通道已经不存在了: {}", Joiner.on(",").join(startChannelIdList));
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- // 结束录像
|
|
|
- if(!channelMapWithoutRecord.isEmpty()) {
|
|
|
- for (Integer channelId : channelMapWithoutRecord.keySet()) {
|
|
|
- StreamInfo streamInfo = channelMapWithoutRecord.get(channelId);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取当前时间段应该录像的通道Id列表
|
|
|
+ */
|
|
|
+ private List<Integer> queryCurrentChannelRecord(){
|
|
|
+ // 获取当前时间在一周内的序号, 数据库存储的从第几个30分钟开始, 0-47, 包括首尾
|
|
|
+ LocalDateTime now = LocalDateTime.now();
|
|
|
+ int week = now.getDayOfWeek().getValue();
|
|
|
+ int index = now.getHour() * 2 + (now.getMinute() > 30?1:0);
|
|
|
+
|
|
|
+ // 查询现在需要录像的通道Id
|
|
|
+ return recordPlanMapper.queryRecordIng(week, index);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void stopStreams(Collection<Integer> channelIds, Map<Integer, StreamInfo> recordStreamMap) {
|
|
|
+ for (Integer channelId : channelIds) {
|
|
|
+ try {
|
|
|
+ StreamInfo streamInfo = recordStreamMap.get(channelId);
|
|
|
if (streamInfo == null) {
|
|
|
continue;
|
|
|
}
|
|
|
@@ -117,23 +150,25 @@ public class RecordPlanServiceImpl implements IRecordPlanService {
|
|
|
MediaInfo mediaInfo = mediaServerService.getMediaInfo(streamInfo.getMediaServer(), streamInfo.getApp(), streamInfo.getStream());
|
|
|
if (mediaInfo.getReaderCount() == null || mediaInfo.getReaderCount() == 0) {
|
|
|
mediaServerService.closeStreams(streamInfo.getMediaServer(), streamInfo.getApp(), streamInfo.getStream());
|
|
|
- log.info("[录像] 停止, 通道ID: {}", channelId);
|
|
|
+ log.info("[录制计划] 停止, 通道ID: {}", channelId);
|
|
|
}
|
|
|
+ }catch (Exception e) {
|
|
|
+ log.error("[录制计划] 停止时异常", e);
|
|
|
+ }finally {
|
|
|
+ recordStreamMap.remove(channelId);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // 系统启动时
|
|
|
-
|
|
|
-
|
|
|
@Override
|
|
|
- public boolean recording(String app, String stream) {
|
|
|
- for (StreamInfo streamInfo : recordStreamMap.values()) {
|
|
|
- if (streamInfo.getApp().equals(app) && streamInfo.getStream().equals(stream)) {
|
|
|
- return true;
|
|
|
+ public Integer recording(String app, String stream) {
|
|
|
+ for (Integer channelId : recordStreamMap.keySet()) {
|
|
|
+ StreamInfo streamInfo = recordStreamMap.get(channelId);
|
|
|
+ if (streamInfo != null && streamInfo.getApp().equals(app) && streamInfo.getStream().equals(stream)) {
|
|
|
+ return channelId;
|
|
|
}
|
|
|
}
|
|
|
- return false;
|
|
|
+ return null;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
@@ -226,7 +261,12 @@ public class RecordPlanServiceImpl implements IRecordPlanService {
|
|
|
}else {
|
|
|
channelMapper.addRecordPlan(channelIds, planId);
|
|
|
}
|
|
|
- // TODO 更新录像队列
|
|
|
+ // 查看当前的待录制列表是否变化,如果变化,则调用录制计划马上开始录制
|
|
|
+ List<Integer> currentChannelRecord = queryCurrentChannelRecord();
|
|
|
+ recordStreamMap.keySet().forEach(currentChannelRecord::remove);
|
|
|
+ if (!currentChannelRecord.isEmpty()) {
|
|
|
+ execution();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|