package com.narutohuo.xindazhou.ble.util import android.annotation.SuppressLint import android.bluetooth.BluetoothGatt import android.bluetooth.BluetoothGattCharacteristic import com.narutohuo.xindazhou.ble.model.FragmentBuffer import com.narutohuo.xindazhou.ble.model.Packet import com.narutohuo.xindazhou.ble.util.parseCommand import com.narutohuo.xindazhou.ble.util.parsePacket import com.narutohuo.xindazhou.ble.util.toHexString import org.json.JSONObject import com.narutohuo.xindazhou.core.log.ILog import kotlinx.coroutines.* import java.util.LinkedList import java.util.Queue import java.util.UUID import kotlin.random.Random /** * BLE数据包发送器 * * 负责 BLE 写入、超时、重试以及队列管理 */ class BlePacketSender( private val gatt: BluetoothGatt, private var writeChar: BluetoothGattCharacteristic, private val splitter: BLEPacketSplitter = BLEPacketSplitter(), private val maxPayloadPerPacket: Int = 20, private val ackTimeoutMs: Long = 1000L, private val maxRetry: Int = 3, private val onPairing: ((Int) -> Unit)? = null, private val onHandshakeRequest: ((Int) -> Unit)? = null ) { private val tag = "BlePacketSender" // 发送队列 private val pendingPackets: Queue = LinkedList() // 用来标记当前正在等待的包序号(-1 表示空闲) @Volatile private var waitingSeq = -1 // 分包接收缓存 private val fragmentCache = mutableMapOf() // 业务消息处理回调 private var onBusinessMessage: ((String, String, Map, String?) -> Unit)? = null /** * 把完整指令数据放入发送队列,随后自动开始发送 * * @param rawData 完整指令数据(不包含分包协议头尾) * @param businessId 业务标识(4字节) * @param isRequest true→请求,false→响应 */ fun enqueueAndStart( rawData: ByteArray, businessId: ByteArray, isRequest: Boolean = true ) { // 1️⃣ 分包 val packets = splitter.split(rawData, businessId, isRequest) // 2️⃣ 入队 pendingPackets.addAll(packets) // 3️⃣ 启动发送(如果当前没有正在发送的任务) if (waitingSeq == -1) { sendNextPacket() } } /** * 发送队列中的下一个包(如果有的话) */ private fun sendNextPacket() { val packet = pendingPackets.peek() ?: run { // 队列为空 → 发送完毕 onAllPacketsSent() return } // 取出包序号(第 7-8 个字节,大端序) waitingSeq = extractSeqFromPacket(packet) sendPacketWithRetry(packet, retryCount = 0) } /** * 实际写入并处理超时/重试 */ @OptIn(DelicateCoroutinesApi::class) @SuppressLint("MissingPermission") private fun sendPacketWithRetry(packet: ByteArray, retryCount: Int) { // 1️⃣ 写入(WRITE_TYPE_NO_RESPONSE) writeChar.writeType = BluetoothGattCharacteristic.WRITE_TYPE_NO_RESPONSE writeChar.value = packet val writeResult = gatt.writeCharacteristic(writeChar) ILog.d(tag, "发送数据: result=$writeResult, packet=${packet.toHexString()}") // 注意:即使 writeResult 为 false,也启动超时计时器 // 因为在 Android BLE 中,使用 WRITE_TYPE_NO_RESPONSE 时,writeCharacteristic 可能返回 false // 但写入操作可能在后续完成,并通过 onCharacteristicWrite 回调确认 // 如果 writeResult 为 false 且后续没有 onCharacteristicWrite 回调,超时后会重试 // 2️⃣ 启动超时计时 CoroutineScope(Dispatchers.Main).launch { try { withTimeout(ackTimeoutMs) { // 这里会被 onCharacteristicWrite 回调的 `waitingSeq` 置为 -1 打断 while (waitingSeq != -1) { delay(50) // 轮询等待 ACK } } // 超时未抛异常 → 收到 ACK onPacketSuccess() } catch (e: TimeoutCancellationException) { // 超时未收到 ACK handleWriteFailure(packet, retryCount, "ACK 超时") } } } /** * 处理一次写入失败或超时后的重试/跳过逻辑 */ private fun handleWriteFailure(packet: ByteArray, retryCount: Int, reason: String) { if (retryCount < maxRetry - 1) { // 还有重试次数 → 再次发送 ILog.w(tag, "包 seq=$waitingSeq 发送失败($reason),第 ${retryCount + 1} 次重试") sendPacketWithRetry(packet, retryCount + 1) } else { // 已经重试 maxRetry 次,直接放弃该包,进入下一个 ILog.e(tag, "包 seq=$waitingSeq 重试 $maxRetry 次仍失败,跳过该包") pendingPackets.poll() // 移除当前包 waitingSeq = -1 sendNextPacket() } } /** * 当前包发送成功后调用 */ private fun onPacketSuccess() { ILog.d(tag, "包 seq=$waitingSeq 发送成功") pendingPackets.poll() // 移除已成功的包 waitingSeq = -1 sendNextPacket() } /** * 所有队列中的包都已发送完毕后调用 */ private fun onAllPacketsSent() { ILog.d(tag, "全部分包发送完毕") } /** * 必须在外部的 BluetoothGattCallback 中调用此方法 * 当收到从设备的写响应(onCharacteristicWrite)时触发 */ fun onCharacteristicWrite(characteristic: BluetoothGattCharacteristic, status: Int) { ILog.d(tag, "onCharacteristicWrite: status=$status") if (characteristic.uuid != writeChar.uuid) return if (status == android.bluetooth.BluetoothGatt.GATT_SUCCESS) { ILog.d(tag, "设备返回写成功") // 从设备确认收到该包 → 结束等待 waitingSeq = -1 } else { ILog.w(tag, "设备返回写错误 status=$status,视为发送失败") waitingSeq = -1 } } /** * 必须在外部的 BluetoothGattCallback 中调用此方法 * 当收到从设备的通知(onCharacteristicChanged)时触发 */ @SuppressLint("MissingPermission") fun onCharacteristicNotify(characteristic: BluetoothGattCharacteristic) { val value = characteristic.value ?: return ILog.d(tag, "收到通知: ${value.toHexString()}") try { val packet = value.parsePacket() if (!packet.isValid()) { ILog.e(tag, "收到非法数据包,校验失败") return } if (packet.messageType() == 0x01.toByte()) { // 请求 // 1. 自动发送 ACK 响应 val ackPacket = packet.buildAckResponse() pendingPackets.add(ackPacket) if (waitingSeq == -1) sendNextPacket() // 2. 业务层分片缓存(去掉最低字节,只保留高 24 位) val cacheKey = packet.businessId and 0xFFFFFF00u val buffer = fragmentCache.getOrPut(cacheKey) { FragmentBuffer(totalPackets = packet.totalPackets.toInt()) } // 3. 保存当前分片 buffer.received[packet.packetSeq.toInt()] = packet.data // 4. 检查是否已收齐 if (buffer.isComplete()) { val fullData = buffer.assemble() try { val command = fullData.parseCommand() ILog.d(tag, "完整指令解析成功,数据长度=${command.data.size}") // 5. 处理业务消息(JSON格式) handleBusinessMessage(command.data) } catch (e: Exception) { ILog.e(tag, "指令解析失败", e) } // 清理缓存 fragmentCache.remove(cacheKey) } else { ILog.d(tag, "业务ID 0x${cacheKey.toString(16)} 已收到 ${buffer.received.size}/${buffer.totalPackets} 包") } } else if (packet.messageType() == 0x02.toByte()) { // 响应 // 处理响应包(如果需要) ILog.d(tag, "收到响应包,seq=${packet.packetSeq}") } } catch (e: Exception) { ILog.e(tag, "解析通知数据失败", e) } } /** * 处理业务消息(JSON格式) */ private fun handleBusinessMessage(data: ByteArray) { try { val jsonString = String(data, Charsets.UTF_8) ILog.d(tag, "收到业务消息: $jsonString") // 解析JSON val jsonObject = JSONObject(jsonString) val ty = jsonObject.optInt("TY", 0) val `in` = jsonObject.optInt("IN", 0) // 处理握手和配对 if (ty == 3 && `in` == 3) { // 握手请求 handleHandshakeRequest(jsonObject) } else if (ty == 3 && `in` == 4) { // 配对请求 handlePairingRequest(jsonObject) } // 通知业务层 val extras = mutableMapOf() jsonObject.keys().forEach { key -> extras[key] = jsonObject.optString(key, "") } onBusinessMessage?.invoke( "业务消息", jsonString, extras, jsonObject.optString("ID", null) ) } catch (e: Exception) { ILog.e(tag, "解析业务消息失败", e) } } /** * 处理握手请求 */ private fun handleHandshakeRequest(message: JSONObject) { try { val c = message.getJSONObject("C") val d = c.getJSONObject("D") val random = d.getInt("Random") ILog.d(tag, "收到握手请求,随机数: $random") // 通知业务层处理握手响应 onHandshakeRequest?.invoke(random) } catch (e: Exception) { ILog.e(tag, "处理握手请求失败", e) } } /** * 处理配对请求 */ private fun handlePairingRequest(message: JSONObject) { try { val c = message.getJSONObject("C") val d = c.getJSONObject("D") val pairing = d.getInt("Pairing") ILog.d(tag, "收到配对请求,配对状态: $pairing") // 通知业务层 onPairing?.invoke(pairing) } catch (e: Exception) { ILog.e(tag, "处理配对请求失败", e) } } /** * 设置业务消息监听器 */ fun setBusinessMessageListener(listener: (String, String, Map, String?) -> Unit) { this.onBusinessMessage = listener } /** * 从包中提取序号 */ private fun extractSeqFromPacket(packet: ByteArray): Int { if (packet.size < 9) return -1 val high = packet[7].toInt() and 0xFF val low = packet[8].toInt() and 0xFF return (high shl 8) or low } /** * 生成业务ID */ fun generateBusinessId(): ByteArray { val timestamp = System.currentTimeMillis() val timePart = timestamp and 0xFFFFL val randomPart: Int = Random.nextInt(0xFF + 1) val combined = ((timePart shl 8) or randomPart.toLong()).toInt() val idBytes = ByteArray(4) idBytes[0] = (combined shr 16).toByte() // 高8位 idBytes[1] = (combined shr 8).toByte() // 中8位 idBytes[2] = combined.toByte() // 低8位 idBytes[3] = 0x01.toByte() // 请求类型(会在split时设置) return idBytes } }