SocketIOManager.kt 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273
  1. package com.narutohuo.xindazhou.socketio
  2. import android.app.Application
  3. import androidx.lifecycle.DefaultLifecycleObserver
  4. import androidx.lifecycle.LifecycleOwner
  5. import androidx.lifecycle.ProcessLifecycleOwner
  6. import com.narutohuo.xindazhou.core.log.ILog
  7. import com.narutohuo.xindazhou.socketio.factory.SocketIORepositoryFactory
  8. import com.narutohuo.xindazhou.socketio.SocketIOKit
  9. import com.narutohuo.xindazhou.socketio.model.SocketIOResponse
  10. import kotlinx.coroutines.CoroutineScope
  11. import kotlinx.coroutines.Dispatchers
  12. import kotlinx.coroutines.SupervisorJob
  13. import kotlinx.coroutines.flow.MutableSharedFlow
  14. import kotlinx.coroutines.flow.SharedFlow
  15. import kotlinx.coroutines.flow.asSharedFlow
  16. import kotlinx.coroutines.launch
  17. /**
  18. * SocketIO 管理器(高级封装)
  19. *
  20. * 提供统一的 Socket.IO 封装,自动处理连接、重连、Token 刷新等
  21. * 外部只需要订阅即可,无需关心连接细节
  22. *
  23. * 职责:
  24. * ✅ 自动处理连接(包括 Token 刷新)
  25. * ✅ 自动处理重连
  26. * ✅ 提供消息订阅接口(SharedFlow)
  27. * ✅ 外部只需要订阅即可
  28. *
  29. * 使用方式(完全懒加载,自动处理连接):
  30. * ```kotlin
  31. * // 在 AppInitializer 中设置回调
  32. * SocketIOManager.isLoggedInProvider = { AuthManager.isLoggedIn() }
  33. * SocketIOManager.refreshTokenProvider = { AuthManager.refreshTokenIfNeeded() }
  34. *
  35. * // 在 ViewModel 中直接订阅消息(subscribe() 会自动处理连接,无需手动调用 ensureConnected())
  36. * viewModelScope.launch {
  37. * SocketIOManager.subscribe("community_message").collect { response ->
  38. * // 处理社区消息
  39. * }
  40. * }
  41. * ```
  42. */
  43. object SocketIOManager : DefaultLifecycleObserver {
  44. private const val TAG = "SocketIOManager"
  45. private val repository = SocketIORepositoryFactory.getInstance()
  46. private val socketService = SocketIOKit.getInstance()
  47. private val appScope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
  48. private var isInitialized = false
  49. // 登录状态检查回调(app 层设置)
  50. var isLoggedInProvider: (() -> Boolean)? = null
  51. // Token 刷新回调(app 层设置)
  52. var refreshTokenProvider: (suspend () -> String?)? = null
  53. // 连接状态(使用 replay = 1 确保新订阅者能立即获取最新状态)
  54. private val _connectionState = MutableSharedFlow<Boolean>(extraBufferCapacity = 1, replay = 1)
  55. val connectionState: SharedFlow<Boolean> = _connectionState.asSharedFlow()
  56. // 消息订阅(使用 SharedFlow)
  57. private val eventFlows = mutableMapOf<String, MutableSharedFlow<SocketIOResponse>>()
  58. init {
  59. // 初始化连接状态(同步获取当前状态)
  60. _connectionState.tryEmit(isConnected())
  61. // 订阅连接状态变化
  62. observeConnectionState()
  63. }
  64. /**
  65. * 确保已初始化(懒加载,第一次使用时自动初始化)
  66. *
  67. * 在所有需要使用 SocketIO 功能的方法中调用
  68. */
  69. private fun ensureInitialized() {
  70. if (isInitialized) {
  71. return
  72. }
  73. synchronized(this) {
  74. if (isInitialized) {
  75. return
  76. }
  77. ILog.d(TAG, "SocketIOManager 懒加载初始化...")
  78. // 设置回调到底层 Manager
  79. com.narutohuo.xindazhou.socketio.manager.SocketIOManager.setCallbacks(
  80. isLoggedInProvider = isLoggedInProvider,
  81. refreshTokenProvider = refreshTokenProvider
  82. )
  83. // 注册 App 生命周期监听
  84. try {
  85. ProcessLifecycleOwner.get().lifecycle.addObserver(this)
  86. ILog.d(TAG, "生命周期监听已注册")
  87. } catch (e: Exception) {
  88. ILog.w(TAG, "注册生命周期监听失败: ${e.message}")
  89. }
  90. isInitialized = true
  91. ILog.d(TAG, "SocketIOManager 懒加载初始化完成")
  92. // 如果设置了回调且用户已登录,自动连接
  93. isLoggedInProvider?.let { provider ->
  94. appScope.launch {
  95. if (provider.invoke()) {
  96. ILog.d(TAG, "用户已登录,自动连接 Socket.IO...")
  97. ensureConnected()
  98. }
  99. }
  100. }
  101. }
  102. }
  103. /**
  104. * 确保已连接(如果未连接则自动连接)
  105. *
  106. * 在用户登录成功后调用,确保 Socket.IO 已连接
  107. * 如果已连接,则不做任何操作
  108. * 如果未连接,会自动处理连接(包括 Token 刷新)
  109. */
  110. fun ensureConnected() {
  111. ensureInitialized() // 懒加载初始化
  112. appScope.launch {
  113. // 检查是否已登录
  114. if (isLoggedInProvider?.invoke() != true) {
  115. ILog.d(TAG, "用户未登录,无法连接")
  116. _connectionState.emit(false)
  117. return@launch
  118. }
  119. // 如果已连接,更新状态并返回
  120. if (isConnected()) {
  121. ILog.d(TAG, "已连接,状态正常")
  122. _connectionState.emit(true)
  123. return@launch
  124. }
  125. // 未连接,尝试重连
  126. ILog.d(TAG, "检测到未连接,开始重连...")
  127. val result = repository.checkAndReconnect(null, null)
  128. if (result) {
  129. _connectionState.emit(true)
  130. } else {
  131. _connectionState.emit(false)
  132. }
  133. }
  134. }
  135. /**
  136. * 断开连接
  137. */
  138. fun disconnect() {
  139. ILog.d(TAG, "断开 Socket.IO 连接")
  140. appScope.launch {
  141. repository.disconnect()
  142. _connectionState.emit(false)
  143. }
  144. }
  145. /**
  146. * 检查连接状态
  147. */
  148. fun isConnected(): Boolean {
  149. return repository.isConnected()
  150. }
  151. /**
  152. * 订阅消息(返回 SharedFlow)
  153. *
  154. * 各个业务模块使用此方法订阅需要的消息类型
  155. *
  156. * 注意:
  157. * - 如果未连接,会自动尝试连接
  158. * - 如果用户未登录,会等待登录成功后连接
  159. *
  160. * 示例:
  161. * ```kotlin
  162. * // CommunityViewModel.kt
  163. * viewModelScope.launch {
  164. * SocketIOManager.subscribe("community_message").collect { response ->
  165. * // 处理社区消息
  166. * }
  167. * }
  168. * ```
  169. */
  170. fun subscribe(eventName: String): SharedFlow<SocketIOResponse> {
  171. ensureInitialized() // 懒加载初始化
  172. return eventFlows.getOrPut(eventName) {
  173. MutableSharedFlow<SocketIOResponse>(extraBufferCapacity = 1).also { flow ->
  174. // 订阅 Socket.IO 事件
  175. socketService.on(eventName) { response ->
  176. appScope.launch {
  177. flow.emit(response)
  178. }
  179. }
  180. // 如果未连接,尝试连接(使用 ensureConnected 统一处理)
  181. if (!isConnected()) {
  182. ensureConnected()
  183. }
  184. }
  185. }.asSharedFlow()
  186. }
  187. /**
  188. * 发送消息
  189. */
  190. fun emit(eventName: String, data: Any) {
  191. ensureInitialized() // 懒加载初始化
  192. if (!isConnected()) {
  193. ILog.w(TAG, "Socket.IO 未连接,无法发送事件: $eventName,尝试重新连接...")
  194. appScope.launch {
  195. ensureConnected()
  196. }
  197. return
  198. }
  199. socketService.emit(eventName, data)
  200. }
  201. // MARK: - 生命周期监听
  202. /**
  203. * App 进入前台时调用
  204. *
  205. * 检查 SocketIO 连接状态,如果断开则自动重连
  206. * 确保应用回到前台时,Socket.IO 连接状态正常
  207. */
  208. override fun onStart(owner: LifecycleOwner) {
  209. ILog.d(TAG, "App 进入前台,检查 SocketIO 连接状态")
  210. // 使用 ensureConnected() 统一处理连接逻辑
  211. ensureConnected()
  212. }
  213. /**
  214. * App 进入后台时调用
  215. *
  216. * 注意:通常不断开 SocketIO 连接,因为:
  217. * 1. SocketIO 连接是轻量级的
  218. * 2. 用户可能需要在后台接收消息
  219. * 3. 系统会在内存不足时自动清理
  220. */
  221. override fun onStop(owner: LifecycleOwner) {
  222. ILog.d(TAG, "App 进入后台")
  223. // 不断开连接,保持连接以便接收消息
  224. }
  225. // MARK: - Private Methods
  226. /**
  227. * 观察连接状态变化
  228. */
  229. private fun observeConnectionState() {
  230. appScope.launch {
  231. repository.connectionState.collect { isConnected ->
  232. _connectionState.emit(isConnected)
  233. }
  234. }
  235. }
  236. }