| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273 |
- package com.narutohuo.xindazhou.socketio
- import android.app.Application
- import androidx.lifecycle.DefaultLifecycleObserver
- import androidx.lifecycle.LifecycleOwner
- import androidx.lifecycle.ProcessLifecycleOwner
- import com.narutohuo.xindazhou.core.log.ILog
- import com.narutohuo.xindazhou.socketio.factory.SocketIORepositoryFactory
- import com.narutohuo.xindazhou.socketio.SocketIOKit
- import com.narutohuo.xindazhou.socketio.model.SocketIOResponse
- import kotlinx.coroutines.CoroutineScope
- import kotlinx.coroutines.Dispatchers
- import kotlinx.coroutines.SupervisorJob
- import kotlinx.coroutines.flow.MutableSharedFlow
- import kotlinx.coroutines.flow.SharedFlow
- import kotlinx.coroutines.flow.asSharedFlow
- import kotlinx.coroutines.launch
- /**
- * SocketIO 管理器(高级封装)
- *
- * 提供统一的 Socket.IO 封装,自动处理连接、重连、Token 刷新等
- * 外部只需要订阅即可,无需关心连接细节
- *
- * 职责:
- * ✅ 自动处理连接(包括 Token 刷新)
- * ✅ 自动处理重连
- * ✅ 提供消息订阅接口(SharedFlow)
- * ✅ 外部只需要订阅即可
- *
- * 使用方式(完全懒加载,自动处理连接):
- * ```kotlin
- * // 在 AppInitializer 中设置回调
- * SocketIOManager.isLoggedInProvider = { AuthManager.isLoggedIn() }
- * SocketIOManager.refreshTokenProvider = { AuthManager.refreshTokenIfNeeded() }
- *
- * // 在 ViewModel 中直接订阅消息(subscribe() 会自动处理连接,无需手动调用 ensureConnected())
- * viewModelScope.launch {
- * SocketIOManager.subscribe("community_message").collect { response ->
- * // 处理社区消息
- * }
- * }
- * ```
- */
- object SocketIOManager : DefaultLifecycleObserver {
-
- private const val TAG = "SocketIOManager"
-
- private val repository = SocketIORepositoryFactory.getInstance()
- private val socketService = SocketIOKit.getInstance()
- private val appScope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
-
- private var isInitialized = false
-
- // 登录状态检查回调(app 层设置)
- var isLoggedInProvider: (() -> Boolean)? = null
-
- // Token 刷新回调(app 层设置)
- var refreshTokenProvider: (suspend () -> String?)? = null
-
- // 连接状态(使用 replay = 1 确保新订阅者能立即获取最新状态)
- private val _connectionState = MutableSharedFlow<Boolean>(extraBufferCapacity = 1, replay = 1)
- val connectionState: SharedFlow<Boolean> = _connectionState.asSharedFlow()
-
- // 消息订阅(使用 SharedFlow)
- private val eventFlows = mutableMapOf<String, MutableSharedFlow<SocketIOResponse>>()
-
- init {
- // 初始化连接状态(同步获取当前状态)
- _connectionState.tryEmit(isConnected())
-
- // 订阅连接状态变化
- observeConnectionState()
- }
-
- /**
- * 确保已初始化(懒加载,第一次使用时自动初始化)
- *
- * 在所有需要使用 SocketIO 功能的方法中调用
- */
- private fun ensureInitialized() {
- if (isInitialized) {
- return
- }
-
- synchronized(this) {
- if (isInitialized) {
- return
- }
-
- ILog.d(TAG, "SocketIOManager 懒加载初始化...")
-
- // 设置回调到底层 Manager
- com.narutohuo.xindazhou.socketio.manager.SocketIOManager.setCallbacks(
- isLoggedInProvider = isLoggedInProvider,
- refreshTokenProvider = refreshTokenProvider
- )
-
- // 注册 App 生命周期监听
- try {
- ProcessLifecycleOwner.get().lifecycle.addObserver(this)
- ILog.d(TAG, "生命周期监听已注册")
- } catch (e: Exception) {
- ILog.w(TAG, "注册生命周期监听失败: ${e.message}")
- }
-
- isInitialized = true
- ILog.d(TAG, "SocketIOManager 懒加载初始化完成")
-
- // 如果设置了回调且用户已登录,自动连接
- isLoggedInProvider?.let { provider ->
- appScope.launch {
- if (provider.invoke()) {
- ILog.d(TAG, "用户已登录,自动连接 Socket.IO...")
- ensureConnected()
- }
- }
- }
- }
- }
-
- /**
- * 确保已连接(如果未连接则自动连接)
- *
- * 在用户登录成功后调用,确保 Socket.IO 已连接
- * 如果已连接,则不做任何操作
- * 如果未连接,会自动处理连接(包括 Token 刷新)
- */
- fun ensureConnected() {
- ensureInitialized() // 懒加载初始化
-
- appScope.launch {
- // 检查是否已登录
- if (isLoggedInProvider?.invoke() != true) {
- ILog.d(TAG, "用户未登录,无法连接")
- _connectionState.emit(false)
- return@launch
- }
-
- // 如果已连接,更新状态并返回
- if (isConnected()) {
- ILog.d(TAG, "已连接,状态正常")
- _connectionState.emit(true)
- return@launch
- }
-
- // 未连接,尝试重连
- ILog.d(TAG, "检测到未连接,开始重连...")
- val result = repository.checkAndReconnect(null, null)
- if (result) {
- _connectionState.emit(true)
- } else {
- _connectionState.emit(false)
- }
- }
- }
-
- /**
- * 断开连接
- */
- fun disconnect() {
- ILog.d(TAG, "断开 Socket.IO 连接")
- appScope.launch {
- repository.disconnect()
- _connectionState.emit(false)
- }
- }
-
- /**
- * 检查连接状态
- */
- fun isConnected(): Boolean {
- return repository.isConnected()
- }
-
- /**
- * 订阅消息(返回 SharedFlow)
- *
- * 各个业务模块使用此方法订阅需要的消息类型
- *
- * 注意:
- * - 如果未连接,会自动尝试连接
- * - 如果用户未登录,会等待登录成功后连接
- *
- * 示例:
- * ```kotlin
- * // CommunityViewModel.kt
- * viewModelScope.launch {
- * SocketIOManager.subscribe("community_message").collect { response ->
- * // 处理社区消息
- * }
- * }
- * ```
- */
- fun subscribe(eventName: String): SharedFlow<SocketIOResponse> {
- ensureInitialized() // 懒加载初始化
-
- return eventFlows.getOrPut(eventName) {
- MutableSharedFlow<SocketIOResponse>(extraBufferCapacity = 1).also { flow ->
- // 订阅 Socket.IO 事件
- socketService.on(eventName) { response ->
- appScope.launch {
- flow.emit(response)
- }
- }
-
- // 如果未连接,尝试连接(使用 ensureConnected 统一处理)
- if (!isConnected()) {
- ensureConnected()
- }
- }
- }.asSharedFlow()
- }
-
- /**
- * 发送消息
- */
- fun emit(eventName: String, data: Any) {
- ensureInitialized() // 懒加载初始化
-
- if (!isConnected()) {
- ILog.w(TAG, "Socket.IO 未连接,无法发送事件: $eventName,尝试重新连接...")
- appScope.launch {
- ensureConnected()
- }
- return
- }
-
- socketService.emit(eventName, data)
- }
-
- // MARK: - 生命周期监听
-
- /**
- * App 进入前台时调用
- *
- * 检查 SocketIO 连接状态,如果断开则自动重连
- * 确保应用回到前台时,Socket.IO 连接状态正常
- */
- override fun onStart(owner: LifecycleOwner) {
- ILog.d(TAG, "App 进入前台,检查 SocketIO 连接状态")
-
- // 使用 ensureConnected() 统一处理连接逻辑
- ensureConnected()
- }
-
- /**
- * App 进入后台时调用
- *
- * 注意:通常不断开 SocketIO 连接,因为:
- * 1. SocketIO 连接是轻量级的
- * 2. 用户可能需要在后台接收消息
- * 3. 系统会在内存不足时自动清理
- */
- override fun onStop(owner: LifecycleOwner) {
- ILog.d(TAG, "App 进入后台")
- // 不断开连接,保持连接以便接收消息
- }
-
- // MARK: - Private Methods
-
- /**
- * 观察连接状态变化
- */
- private fun observeConnectionState() {
- appScope.launch {
- repository.connectionState.collect { isConnected ->
- _connectionState.emit(isConnected)
- }
- }
- }
- }
|