InviteStreamServiceImpl.java 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. package com.genersoft.iot.vmp.service.impl;
  2. import com.alibaba.fastjson2.JSON;
  3. import com.genersoft.iot.vmp.common.InviteInfo;
  4. import com.genersoft.iot.vmp.common.InviteSessionStatus;
  5. import com.genersoft.iot.vmp.common.InviteSessionType;
  6. import com.genersoft.iot.vmp.common.VideoManagerConstants;
  7. import com.genersoft.iot.vmp.service.IInviteStreamService;
  8. import com.genersoft.iot.vmp.service.bean.ErrorCallback;
  9. import com.genersoft.iot.vmp.utils.redis.RedisUtil;
  10. import org.slf4j.Logger;
  11. import org.slf4j.LoggerFactory;
  12. import org.springframework.beans.factory.annotation.Autowired;
  13. import org.springframework.data.redis.core.RedisTemplate;
  14. import org.springframework.stereotype.Service;
  15. import java.util.List;
  16. import java.util.Map;
  17. import java.util.concurrent.ConcurrentHashMap;
  18. import java.util.concurrent.CopyOnWriteArrayList;
  19. @Service
  20. public class InviteStreamServiceImpl implements IInviteStreamService {
  21. private final Logger logger = LoggerFactory.getLogger(InviteStreamServiceImpl.class);
  22. private final Map<String, List<ErrorCallback<Object>>> inviteErrorCallbackMap = new ConcurrentHashMap<>();
  23. @Autowired
  24. private RedisTemplate<Object, Object> redisTemplate;
  25. @Override
  26. public void updateInviteInfo(InviteInfo inviteInfo) {
  27. if (inviteInfo == null || (inviteInfo.getDeviceId() == null || inviteInfo.getChannelId() == null)) {
  28. logger.warn("[更新Invite信息],参数不全: {}", JSON.toJSON(inviteInfo));
  29. return;
  30. }
  31. InviteInfo inviteInfoForUpdate = null;
  32. if (InviteSessionStatus.ready == inviteInfo.getStatus()) {
  33. if (inviteInfo.getDeviceId() == null
  34. || inviteInfo.getChannelId() == null
  35. || inviteInfo.getType() == null
  36. || inviteInfo.getStream() == null
  37. ) {
  38. return;
  39. }
  40. inviteInfoForUpdate = inviteInfo;
  41. } else {
  42. InviteInfo inviteInfoInRedis = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(),
  43. inviteInfo.getChannelId(), inviteInfo.getStream());
  44. if (inviteInfoInRedis == null) {
  45. logger.warn("[更新Invite信息],未从缓存中读取到Invite信息: deviceId: {}, channel: {}, stream: {}",
  46. inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
  47. return;
  48. }
  49. if (inviteInfo.getStreamInfo() != null) {
  50. inviteInfoInRedis.setStreamInfo(inviteInfo.getStreamInfo());
  51. }
  52. if (inviteInfo.getSsrcInfo() != null) {
  53. inviteInfoInRedis.setSsrcInfo(inviteInfo.getSsrcInfo());
  54. }
  55. if (inviteInfo.getStreamMode() != null) {
  56. inviteInfoInRedis.setStreamMode(inviteInfo.getStreamMode());
  57. }
  58. if (inviteInfo.getReceiveIp() != null) {
  59. inviteInfoInRedis.setReceiveIp(inviteInfo.getReceiveIp());
  60. }
  61. if (inviteInfo.getReceivePort() != null) {
  62. inviteInfoInRedis.setReceivePort(inviteInfo.getReceivePort());
  63. }
  64. if (inviteInfo.getStatus() != null) {
  65. inviteInfoInRedis.setStatus(inviteInfo.getStatus());
  66. }
  67. inviteInfoForUpdate = inviteInfoInRedis;
  68. }
  69. String key = VideoManagerConstants.INVITE_PREFIX +
  70. "_" + inviteInfoForUpdate.getType() +
  71. "_" + inviteInfoForUpdate.getDeviceId() +
  72. "_" + inviteInfoForUpdate.getChannelId() +
  73. "_" + inviteInfoForUpdate.getStream();
  74. redisTemplate.opsForValue().set(key, inviteInfoForUpdate);
  75. }
  76. @Override
  77. public InviteInfo updateInviteInfoForStream(InviteInfo inviteInfo, String stream) {
  78. InviteInfo inviteInfoInDb = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
  79. if (inviteInfoInDb == null) {
  80. return null;
  81. }
  82. removeInviteInfo(inviteInfoInDb);
  83. String key = VideoManagerConstants.INVITE_PREFIX +
  84. "_" + inviteInfo.getType() +
  85. "_" + inviteInfo.getDeviceId() +
  86. "_" + inviteInfo.getChannelId() +
  87. "_" + stream;
  88. inviteInfoInDb.setStream(stream);
  89. redisTemplate.opsForValue().set(key, inviteInfoInDb);
  90. return inviteInfoInDb;
  91. }
  92. @Override
  93. public InviteInfo getInviteInfo(InviteSessionType type, String deviceId, String channelId, String stream) {
  94. String key = VideoManagerConstants.INVITE_PREFIX +
  95. "_" + (type != null ? type : "*") +
  96. "_" + (deviceId != null ? deviceId : "*") +
  97. "_" + (channelId != null ? channelId : "*") +
  98. "_" + (stream != null ? stream : "*");
  99. List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
  100. if (scanResult.size() != 1) {
  101. return null;
  102. }
  103. return (InviteInfo) redisTemplate.opsForValue().get(scanResult.get(0));
  104. }
  105. @Override
  106. public InviteInfo getInviteInfoByDeviceAndChannel(InviteSessionType type, String deviceId, String channelId) {
  107. return getInviteInfo(type, deviceId, channelId, null);
  108. }
  109. @Override
  110. public InviteInfo getInviteInfoByStream(InviteSessionType type, String stream) {
  111. return getInviteInfo(type, null, null, stream);
  112. }
  113. @Override
  114. public void removeInviteInfo(InviteSessionType type, String deviceId, String channelId, String stream) {
  115. String scanKey = VideoManagerConstants.INVITE_PREFIX +
  116. "_" + (type != null ? type : "*") +
  117. "_" + (deviceId != null ? deviceId : "*") +
  118. "_" + (channelId != null ? channelId : "*") +
  119. "_" + (stream != null ? stream : "*");
  120. List<Object> scanResult = RedisUtil.scan(redisTemplate, scanKey);
  121. if (scanResult.size() > 0) {
  122. for (Object keyObj : scanResult) {
  123. String key = (String) keyObj;
  124. InviteInfo inviteInfo = (InviteInfo) redisTemplate.opsForValue().get(key);
  125. if (inviteInfo == null) {
  126. continue;
  127. }
  128. redisTemplate.delete(key);
  129. inviteErrorCallbackMap.remove(buildKey(type, deviceId, channelId, inviteInfo.getStream()));
  130. }
  131. }
  132. }
  133. @Override
  134. public void removeInviteInfoByDeviceAndChannel(InviteSessionType inviteSessionType, String deviceId, String channelId) {
  135. removeInviteInfo(inviteSessionType, deviceId, channelId, null);
  136. }
  137. @Override
  138. public void removeInviteInfo(InviteInfo inviteInfo) {
  139. removeInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
  140. }
  141. @Override
  142. public void once(InviteSessionType type, String deviceId, String channelId, String stream, ErrorCallback<Object> callback) {
  143. String key = buildKey(type, deviceId, channelId, stream);
  144. List<ErrorCallback<Object>> callbacks = inviteErrorCallbackMap.get(key);
  145. if (callbacks == null) {
  146. callbacks = new CopyOnWriteArrayList<>();
  147. inviteErrorCallbackMap.put(key, callbacks);
  148. }
  149. callbacks.add(callback);
  150. }
  151. private String buildKey(InviteSessionType type, String deviceId, String channelId, String stream) {
  152. String key = type + "_" + deviceId + "_" + channelId;
  153. // 如果ssrc未null那么可以实现一个通道只能一次操作,ssrc不为null则可以支持一个通道多次invite
  154. if (stream != null) {
  155. key += ("_" + stream);
  156. }
  157. return key;
  158. }
  159. @Override
  160. public void clearInviteInfo(String deviceId) {
  161. removeInviteInfo(null, deviceId, null, null);
  162. }
  163. @Override
  164. public int getStreamInfoCount(String mediaServerId) {
  165. int count = 0;
  166. String key = VideoManagerConstants.INVITE_PREFIX + "_*_*_*_*";
  167. List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
  168. if (scanResult.size() == 0) {
  169. return 0;
  170. }else {
  171. for (Object keyObj : scanResult) {
  172. String keyStr = (String) keyObj;
  173. InviteInfo inviteInfo = (InviteInfo) redisTemplate.opsForValue().get(keyStr);
  174. if (inviteInfo != null && inviteInfo.getStreamInfo() != null && inviteInfo.getStreamInfo().getMediaServerId().equals(mediaServerId)) {
  175. count++;
  176. }
  177. }
  178. }
  179. return count;
  180. }
  181. @Override
  182. public void call(InviteSessionType type, String deviceId, String channelId, String stream, int code, String msg, Object data) {
  183. String key = buildSubStreamKey(type, deviceId, channelId, stream);
  184. List<ErrorCallback<Object>> callbacks = inviteErrorCallbackMap.get(key);
  185. if (callbacks == null) {
  186. return;
  187. }
  188. for (ErrorCallback<Object> callback : callbacks) {
  189. callback.run(code, msg, data);
  190. }
  191. inviteErrorCallbackMap.remove(key);
  192. }
  193. private String buildSubStreamKey(InviteSessionType type, String deviceId, String channelId, String stream) {
  194. String key = type + "_" + "_" + deviceId + "_" + channelId;
  195. // 如果ssrc为null那么可以实现一个通道只能一次操作,ssrc不为null则可以支持一个通道多次invite
  196. if (stream != null) {
  197. key += ("_" + stream);
  198. }
  199. return key;
  200. }
  201. }