DeviceServiceImpl.java 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. package com.genersoft.iot.vmp.service.impl;
  2. import com.genersoft.iot.vmp.conf.DynamicTask;
  3. import com.genersoft.iot.vmp.gb28181.bean.Device;
  4. import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
  5. import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
  6. import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
  7. import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler;
  8. import com.genersoft.iot.vmp.service.IDeviceService;
  9. import com.genersoft.iot.vmp.gb28181.task.impl.CatalogSubscribeTask;
  10. import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeTask;
  11. import com.genersoft.iot.vmp.gb28181.bean.SyncStatus;
  12. import com.genersoft.iot.vmp.service.IMediaServerService;
  13. import com.genersoft.iot.vmp.service.IMediaService;
  14. import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
  15. import com.genersoft.iot.vmp.storager.dao.DeviceMapper;
  16. import com.genersoft.iot.vmp.utils.DateUtil;
  17. import org.slf4j.Logger;
  18. import org.slf4j.LoggerFactory;
  19. import org.springframework.beans.factory.annotation.Autowired;
  20. import org.springframework.beans.factory.annotation.Qualifier;
  21. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  22. import org.springframework.stereotype.Service;
  23. import javax.sip.DialogState;
  24. import javax.sip.TimeoutEvent;
  25. import java.text.ParseException;
  26. import java.util.Calendar;
  27. import java.util.Date;
  28. import java.util.List;
  29. /**
  30. * 设备业务(目录订阅)
  31. */
  32. @Service
  33. public class DeviceServiceImpl implements IDeviceService {
  34. private final static Logger logger = LoggerFactory.getLogger(DeviceServiceImpl.class);
  35. private final String registerExpireTaskKeyPrefix = "device-register-expire-";
  36. @Autowired
  37. private DynamicTask dynamicTask;
  38. @Autowired
  39. private ISIPCommander sipCommander;
  40. @Autowired
  41. private CatalogResponseMessageHandler catalogResponseMessageHandler;
  42. @Autowired
  43. private IRedisCatchStorage redisCatchStorage;
  44. @Autowired
  45. private DeviceMapper deviceMapper;
  46. @Autowired
  47. private ISIPCommander commander;
  48. @Autowired
  49. private VideoStreamSessionManager streamSession;
  50. @Autowired
  51. private IMediaServerService mediaServerService;
  52. @Override
  53. public void online(Device device) {
  54. logger.info("[设备上线],deviceId:" + device.getDeviceId());
  55. Device deviceInRedis = redisCatchStorage.getDevice(device.getDeviceId());
  56. Device deviceInDb = deviceMapper.getDeviceByDeviceId(device.getDeviceId());
  57. String now = DateUtil.getNow();
  58. if (deviceInRedis != null && deviceInDb == null) {
  59. // redis 存在脏数据
  60. redisCatchStorage.clearCatchByDeviceId(device.getDeviceId());
  61. }
  62. if (device.getCreateTime() == null) {
  63. device.setCreateTime(now);
  64. }
  65. if (device.getRegisterTime() == null) {
  66. device.setRegisterTime(now);
  67. }
  68. if(device.getUpdateTime() == null) {
  69. device.setUpdateTime(now);
  70. }
  71. device.setOnline(1);
  72. // 第一次上线
  73. if (device.getCreateTime() == null) {
  74. logger.info("[设备上线,首次注册]: {},查询设备信息以及通道信息", device.getDeviceId());
  75. commander.deviceInfoQuery(device);
  76. sync(device);
  77. deviceMapper.add(device);
  78. }else {
  79. deviceMapper.update(device);
  80. }
  81. redisCatchStorage.updateDevice(device);
  82. // 上线添加订阅
  83. if (device.getSubscribeCycleForCatalog() > 0) {
  84. // 查询在线设备那些开启了订阅,为设备开启定时的目录订阅
  85. addCatalogSubscribe(device);
  86. }
  87. if (device.getSubscribeCycleForMobilePosition() > 0) {
  88. addMobilePositionSubscribe(device);
  89. }
  90. // 刷新过期任务
  91. String registerExpireTaskKey = registerExpireTaskKeyPrefix + device.getDeviceId();
  92. dynamicTask.stop(registerExpireTaskKey);
  93. dynamicTask.startDelay(registerExpireTaskKey, ()->{
  94. offline(device.getDeviceId());
  95. }, device.getExpires() * 1000);
  96. }
  97. @Override
  98. public void offline(String deviceId) {
  99. Device device = deviceMapper.getDeviceByDeviceId(deviceId);
  100. if (device == null) {
  101. return;
  102. }
  103. String registerExpireTaskKey = registerExpireTaskKeyPrefix + deviceId;
  104. dynamicTask.stop(registerExpireTaskKey);
  105. device.setOnline(0);
  106. redisCatchStorage.updateDevice(device);
  107. deviceMapper.update(device);
  108. // 离线释放所有ssrc
  109. List<SsrcTransaction> ssrcTransactions = streamSession.getSsrcTransactionForAll(deviceId, null, null, null);
  110. if (ssrcTransactions != null && ssrcTransactions.size() > 0) {
  111. for (SsrcTransaction ssrcTransaction : ssrcTransactions) {
  112. mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc());
  113. mediaServerService.closeRTPServer(deviceId, ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
  114. streamSession.remove(deviceId, ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
  115. }
  116. }
  117. // 移除订阅
  118. removeCatalogSubscribe(device);
  119. removeMobilePositionSubscribe(device);
  120. }
  121. @Override
  122. public boolean addCatalogSubscribe(Device device) {
  123. if (device == null || device.getSubscribeCycleForCatalog() < 0) {
  124. return false;
  125. }
  126. logger.info("[添加目录订阅] 设备{}", device.getDeviceId());
  127. // 添加目录订阅
  128. CatalogSubscribeTask catalogSubscribeTask = new CatalogSubscribeTask(device, sipCommander, dynamicTask);
  129. // 提前开始刷新订阅
  130. int subscribeCycleForCatalog = Math.max(device.getSubscribeCycleForCatalog(),30);
  131. // 设置最小值为30
  132. dynamicTask.startCron(device.getDeviceId() + "catalog", catalogSubscribeTask, (subscribeCycleForCatalog -1) * 1000);
  133. return true;
  134. }
  135. @Override
  136. public boolean removeCatalogSubscribe(Device device) {
  137. if (device == null || device.getSubscribeCycleForCatalog() < 0) {
  138. return false;
  139. }
  140. logger.info("移除目录订阅: {}", device.getDeviceId());
  141. dynamicTask.stop(device.getDeviceId() + "catalog");
  142. return true;
  143. }
  144. @Override
  145. public boolean addMobilePositionSubscribe(Device device) {
  146. if (device == null || device.getSubscribeCycleForMobilePosition() < 0) {
  147. return false;
  148. }
  149. logger.info("[添加移动位置订阅] 设备{}", device.getDeviceId());
  150. // 添加目录订阅
  151. MobilePositionSubscribeTask mobilePositionSubscribeTask = new MobilePositionSubscribeTask(device, sipCommander, dynamicTask);
  152. // 设置最小值为30
  153. int subscribeCycleForCatalog = Math.max(device.getSubscribeCycleForMobilePosition(),30);
  154. // 提前开始刷新订阅
  155. dynamicTask.startCron(device.getDeviceId() + "mobile_position" , mobilePositionSubscribeTask, (subscribeCycleForCatalog -1 ) * 1000);
  156. return true;
  157. }
  158. @Override
  159. public boolean removeMobilePositionSubscribe(Device device) {
  160. if (device == null || device.getSubscribeCycleForCatalog() < 0) {
  161. return false;
  162. }
  163. logger.info("移除移动位置订阅: {}", device.getDeviceId());
  164. dynamicTask.stop(device.getDeviceId() + "mobile_position");
  165. return true;
  166. }
  167. @Override
  168. public SyncStatus getChannelSyncStatus(String deviceId) {
  169. return catalogResponseMessageHandler.getChannelSyncProgress(deviceId);
  170. }
  171. @Override
  172. public Boolean isSyncRunning(String deviceId) {
  173. return catalogResponseMessageHandler.isSyncRunning(deviceId);
  174. }
  175. @Override
  176. public void sync(Device device) {
  177. if (catalogResponseMessageHandler.isSyncRunning(device.getDeviceId())) {
  178. logger.info("开启同步时发现同步已经存在");
  179. return;
  180. }
  181. int sn = (int)((Math.random()*9+1)*100000);
  182. catalogResponseMessageHandler.setChannelSyncReady(device, sn);
  183. sipCommander.catalogQuery(device, sn, event -> {
  184. String errorMsg = String.format("同步通道失败,错误码: %s, %s", event.statusCode, event.msg);
  185. catalogResponseMessageHandler.setChannelSyncEnd(device.getDeviceId(), errorMsg);
  186. });
  187. }
  188. @Override
  189. public Device queryDevice(String deviceId) {
  190. return deviceMapper.getDeviceByDeviceId(deviceId);
  191. }
  192. @Override
  193. public List<Device> getAllOnlineDevice() {
  194. return deviceMapper.getOnlineDevices();
  195. }
  196. @Override
  197. public boolean expire(Device device) {
  198. Date registerTimeDate;
  199. try {
  200. registerTimeDate = DateUtil.format.parse(device.getRegisterTime());
  201. } catch (ParseException e) {
  202. logger.error("设备时间格式化失败:{}->{} ", device.getDeviceId(), device.getRegisterTime() );
  203. return false;
  204. }
  205. int expires = device.getExpires();
  206. Calendar calendarForExpire = Calendar.getInstance();
  207. calendarForExpire.setTime(registerTimeDate);
  208. calendarForExpire.set(Calendar.SECOND, calendarForExpire.get(Calendar.SECOND) + expires);
  209. return calendarForExpire.before(DateUtil.getNow());
  210. }
  211. @Override
  212. public void checkDeviceStatus(Device device) {
  213. if (device == null || device.getOnline() == 0) {
  214. return;
  215. }
  216. sipCommander.deviceStatusQuery(device, null);
  217. }
  218. @Override
  219. public Device getDeviceByHostAndPort(String host, int port) {
  220. return deviceMapper.getDeviceByHostAndPort(host, port);
  221. }
  222. }