Jelajahi Sumber

Merge branch 'wvp-28181-2.0' into wvp-28181-record

# Conflicts:
#	sql/2.6.9更新.sql
648540858 2 tahun lalu
induk
melakukan
829ff200b8

+ 8 - 0
sql/2.6.9更新.sql

@@ -0,0 +1,8 @@
+alter table wvp_device_channel
+    change stream_id stream_id varying(255)
+
+alter table wvp_platform
+    add auto_push_channel bool default false
+
+alter table wvp_stream_proxy
+    add stream_key character varying(255)

+ 4 - 4
src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java

@@ -89,17 +89,17 @@ public class CatalogSubscribeTask implements ISubscribeTask {
                 ResponseEvent event = (ResponseEvent) eventResult.event;
                 if (event.getResponse().getRawContent() != null) {
                     // 成功
-                    logger.info("[取消目录订阅订阅]成功: {}", device.getDeviceId());
+                    logger.info("[取消目录订阅]成功: {}", device.getDeviceId());
                 }else {
                     // 成功
-                    logger.info("[取消目录订阅订阅]成功: {}", device.getDeviceId());
+                    logger.info("[取消目录订阅]成功: {}", device.getDeviceId());
                 }
             },eventResult -> {
                 // 失败
-                logger.warn("[取消目录订阅订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg);
+                logger.warn("[取消目录订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg);
             });
         } catch (InvalidArgumentException | SipException | ParseException e) {
-            logger.error("[命令发送失败] 取消目录订阅订阅: {}", e.getMessage());
+            logger.error("[命令发送失败] 取消目录订阅: {}", e.getMessage());
         }
     }
 }

+ 0 - 109
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java

@@ -132,7 +132,6 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
 
 						if (CmdType.CATALOG.equals(cmd)) {
 							logger.info("接收到Catalog通知");
-							processNotifyCatalogList(take.getEvt());
 							notifyRequestForCatalogProcessor.process(take.getEvt());
 						} else if (CmdType.ALARM.equals(cmd)) {
 							logger.info("接收到Alarm通知");
@@ -371,114 +370,6 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
 		}
 	}
 
-	/***
-	 * 处理catalog设备目录列表Notify
-	 *
-	 * @param evt
-	 */
-	private void processNotifyCatalogList(RequestEvent evt) {
-		try {
-			FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
-			String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
-
-			Device device = redisCatchStorage.getDevice(deviceId);
-			if (device == null || !device.isOnLine()) {
-				logger.warn("[收到目录订阅]:{}, 但是设备已经离线", (device != null ? device.getDeviceId():"" ));
-				return;
-			}
-			Element rootElement = getRootElement(evt, device.getCharset());
-			if (rootElement == null) {
-				logger.warn("[ 收到目录订阅 ] content cannot be null, {}", evt.getRequest());
-				return;
-			}
-			Element deviceListElement = rootElement.element("DeviceList");
-			if (deviceListElement == null) {
-				return;
-			}
-			Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
-			if (deviceListIterator != null) {
-
-				// 遍历DeviceList
-				while (deviceListIterator.hasNext()) {
-					Element itemDevice = deviceListIterator.next();
-					Element channelDeviceElement = itemDevice.element("DeviceID");
-					if (channelDeviceElement == null) {
-						continue;
-					}
-					Element eventElement = itemDevice.element("Event");
-					String event;
-					if (eventElement == null) {
-						logger.warn("[收到目录订阅]:{}, 但是Event为空, 设为默认值 ADD", (device != null ? device.getDeviceId():"" ));
-						event = CatalogEvent.ADD;
-					}else {
-						event = eventElement.getText().toUpperCase();
-					}
-					DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event, civilCodeFileConf);
-					if (channel == null) {
-						logger.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent()));
-						continue;
-					}
-					if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) {
-						channel.setParentId(null);
-					}
-					channel.setDeviceId(device.getDeviceId());
-					logger.info("[收到目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId());
-					switch (event) {
-						case CatalogEvent.ON:
-							// 上线
-							logger.info("[收到通道上线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
-							storager.deviceChannelOnline(deviceId, channel.getChannelId());
-							break;
-						case CatalogEvent.OFF :
-							// 离线
-							logger.info("[收到通道离线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
-							if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
-								storager.deviceChannelOffline(deviceId, channel.getChannelId());
-							}else {
-								logger.info("[收到通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
-							}
-							break;
-						case CatalogEvent.VLOST:
-							// 视频丢失
-							logger.info("[收到通道视频丢失通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
-							if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
-								storager.deviceChannelOffline(deviceId, channel.getChannelId());
-							}else {
-								logger.info("[收到通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
-							}
-							break;
-						case CatalogEvent.DEFECT:
-							// 故障
-							break;
-						case CatalogEvent.ADD:
-							// 增加
-							logger.info("[收到增加通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
-							deviceChannelService.updateChannel(deviceId, channel);
-							break;
-						case CatalogEvent.DEL:
-							// 删除
-							logger.info("[收到删除通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
-							storager.delChannel(deviceId, channel.getChannelId());
-							break;
-						case CatalogEvent.UPDATE:
-							// 更新
-							logger.info("[收到更新通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
-							deviceChannelService.updateChannel(deviceId, channel);
-							break;
-						default:
-							logger.warn("[ NotifyCatalog ] event not found : {}", event );
-
-					}
-					// 转发变化信息
-					eventPublisher.catalogEventPublish(null, channel, event);
-
-				}
-			}
-		} catch (DocumentException e) {
-			logger.error("未处理的异常 ", e);
-		}
-	}
-
 	public void setCmder(SIPCommander cmder) {
 	}
 

+ 15 - 0
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java

@@ -215,6 +215,21 @@ public class ZLMRESTfulUtils {
         }
     }
 
+    public JSONObject isMediaOnline(MediaServerItem mediaServerItem, String app, String stream, String schema){
+        Map<String, Object> param = new HashMap<>();
+        if (app != null) {
+            param.put("app",app);
+        }
+        if (stream != null) {
+            param.put("stream",stream);
+        }
+        if (schema != null) {
+            param.put("schema",schema);
+        }
+        param.put("vhost","__defaultVhost__");
+        return sendPost(mediaServerItem, "isMediaOnline", param, null);
+    }
+
     public JSONObject getMediaList(MediaServerItem mediaServerItem, String app, String stream, String schema, RequestCallback callback){
         Map<String, Object> param = new HashMap<>();
         if (app != null) {

+ 1 - 0
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java

@@ -9,6 +9,7 @@ import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForServerStarted;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
+import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
 import com.genersoft.iot.vmp.service.IMediaServerService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+ 11 - 7
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java

@@ -520,16 +520,18 @@ public class DeviceServiceImpl implements IDeviceService {
 
 
         //  目录订阅相关的信息
-        if (device.getSubscribeCycleForCatalog() > 0) {
-            if (deviceInStore.getSubscribeCycleForCatalog() == 0 || deviceInStore.getSubscribeCycleForCatalog() != device.getSubscribeCycleForCatalog()) {
-                deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog());
+        if (deviceInStore.getSubscribeCycleForCatalog() != device.getSubscribeCycleForCatalog()) {
+            if (device.getSubscribeCycleForCatalog() > 0) {
+                // 若已开启订阅,但订阅周期不同,则先取消
+                if (deviceInStore.getSubscribeCycleForCatalog() != 0) {
+                    removeCatalogSubscribe(deviceInStore);
+                }
                 // 开启订阅
-                addCatalogSubscribe(deviceInStore);
-            }
-        }else if (device.getSubscribeCycleForCatalog() == 0) {
-            if (deviceInStore.getSubscribeCycleForCatalog() != 0) {
                 deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog());
+                addCatalogSubscribe(deviceInStore);
+            }else if (device.getSubscribeCycleForCatalog() == 0) {
                 // 取消订阅
+                deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog());
                 removeCatalogSubscribe(deviceInStore);
             }
         }
@@ -544,6 +546,8 @@ public class DeviceServiceImpl implements IDeviceService {
             }
         }else if (device.getSubscribeCycleForMobilePosition() == 0) {
             if (deviceInStore.getSubscribeCycleForMobilePosition() != 0) {
+                deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval());
+                deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition());
                 // 取消订阅
                 removeMobilePositionSubscribe(deviceInStore);
             }

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java

@@ -257,7 +257,7 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
                 ":" + inviteInfo.getDeviceId() +
                 ":" + inviteInfo.getChannelId() +
                 ":" + inviteInfo.getStream() +
-                ":" + inviteInfo.getSsrcInfo().getSsrc();
+                ":" + ssrc;
         if (inviteInfoInDb.getSsrcInfo() != null) {
             inviteInfoInDb.getSsrcInfo().setSsrc(ssrc);
         }

+ 43 - 0
src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java

@@ -35,15 +35,19 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.jdbc.datasource.DataSourceTransactionManager;
+import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.TransactionDefinition;
 import org.springframework.transaction.TransactionStatus;
+import org.springframework.util.CollectionUtils;
 import org.springframework.util.ObjectUtils;
 
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * 视频代理业务
@@ -560,4 +564,43 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
 
         return new ResourceBaseInfo(total, online);
     }
+
+
+    @Scheduled(cron = "* 0/10 * * * ?")
+    public void asyncCheckStreamProxyStatus() {
+
+        List<MediaServerItem> all = mediaServerService.getAllOnline();
+
+        if (CollectionUtils.isEmpty(all)){
+            return;
+        }
+
+        Map<String, MediaServerItem> serverItemMap = all.stream().collect(Collectors.toMap(MediaServerItem::getId, Function.identity(), (m1, m2) -> m1));
+
+        List<StreamProxyItem> list = videoManagerStorager.getStreamProxyListForEnable(true);
+
+        if (CollectionUtils.isEmpty(list)){
+            return;
+        }
+
+        for (StreamProxyItem streamProxyItem : list) {
+
+            MediaServerItem mediaServerItem = serverItemMap.get(streamProxyItem.getMediaServerId());
+
+            // TODO 支持其他 schema
+            JSONObject mediaInfo = zlmresTfulUtils.isMediaOnline(mediaServerItem, streamProxyItem.getApp(), streamProxyItem.getStream(), "rtsp");
+
+            if (mediaInfo == null){
+                streamProxyItem.setStatus(false);
+            } else {
+                if (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online")) {
+                    streamProxyItem.setStatus(true);
+                } else {
+                    streamProxyItem.setStatus(false);
+                }
+            }
+
+            updateStreamProxy(streamProxyItem);
+        }
+    }
 }

+ 3 - 0
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java

@@ -506,6 +506,9 @@ public class StreamPushServiceImpl implements IStreamPushService {
         stream.setUpdateTime(DateUtil.getNow());
         stream.setCreateTime(DateUtil.getNow());
         stream.setServerId(userSetting.getServerId());
+        stream.setMediaServerId(mediaConfig.getId());
+        stream.setSelf(true);
+        stream.setPushIng(true);
 
         // 放在事务内执行
         boolean result = false;