Forráskód Böngészése

Merge branch 'main' of https://git.flowbb.top/humuyu/xdz-hd

hmy 3 hete
szülő
commit
2a504c82c2

+ 18 - 3
xdz-dependencies/pom.xml

@@ -73,7 +73,7 @@
         <bizlog-sdk.version>3.0.6</bizlog-sdk.version>
         <reflections.version>0.10.2</reflections.version>
         <netty.version>4.2.9.Final</netty.version>
-        <netty-socketio.version>2.0.13</netty-socketio.version>
+        <netty-socketio.version>3.0.1</netty-socketio.version>
         <java-jwt.version>4.4.0</java-jwt.version>
         <mqtt.version>1.2.5</mqtt.version>
         <vertx.version>4.5.22</vertx.version>
@@ -706,9 +706,10 @@
             </dependency>
 
             <!-- Netty SocketIO -->
+            <!-- 使用 socketio4j 3.0.1(稳定版本),支持 Socket.IO 1.x-4.x (Engine.IO v1-v3),与 iOS/Android 客户端兼容 -->
             <dependency>
-                <groupId>com.corundumstudio.socketio</groupId>
-                <artifactId>netty-socketio</artifactId>
+                <groupId>com.socketio4j</groupId>
+                <artifactId>netty-socketio-core</artifactId>
                 <version>${netty-socketio.version}</version>
             </dependency>
 
@@ -752,4 +753,18 @@
         </plugins>
     </build>
 
+    <!-- 添加 snapshot 仓库以支持 socketio4j 4.0.0-SNAPSHOT -->
+    <repositories>
+        <repository>
+            <id>sonatype-snapshots</id>
+            <url>https://oss.sonatype.org/content/repositories/snapshots</url>
+            <releases>
+                <enabled>false</enabled>
+            </releases>
+            <snapshots>
+                <enabled>true</enabled>
+            </snapshots>
+        </repository>
+    </repositories>
+
 </project>

+ 4 - 2
xdz-module-message/xdz-module-message-server/pom.xml

@@ -59,9 +59,11 @@
             <artifactId>xdz-spring-boot-starter-monitor</artifactId>
         </dependency>
         <!-- Netty SocketIO -->
+        <!-- 使用 socketio4j 3.0.1(稳定版本,支持 Engine.IO v1-v3) -->
         <dependency>
-            <groupId>com.corundumstudio.socketio</groupId>
-            <artifactId>netty-socketio</artifactId>
+            <groupId>com.socketio4j</groupId>
+            <artifactId>netty-socketio-core</artifactId>
+            <version>3.0.1</version>
         </dependency>
         <!-- 车辆服务 API(包含消息类) -->
         <dependency>

+ 72 - 5
xdz-module-message/xdz-module-message-server/src/main/java/com/xindazhou/message/config/SocketIOConfig.java

@@ -2,6 +2,7 @@ package com.xindazhou.message.config;
 
 import com.corundumstudio.socketio.Configuration;
 import com.corundumstudio.socketio.SocketIOServer;
+import com.corundumstudio.socketio.Transport;
 import com.corundumstudio.socketio.store.RedissonStoreFactory;
 import lombok.extern.slf4j.Slf4j;
 import org.redisson.api.RedissonClient;
@@ -12,6 +13,9 @@ import org.springframework.context.annotation.Bean;
  * SocketIO 配置类
  * 
  * 支持分布式部署:使用 Redis 作为存储后端,实现多服务器节点之间的消息同步
+ * 支持 Engine.IO v3 协议(客户端使用)
+ * 
+ * 参考官方文档:https://www.socketio4j.org/server-api
  *
  * @author xindazhou
  */
@@ -25,6 +29,9 @@ public class SocketIOConfig {
     @Value("${socketio.port:9090}")
     private Integer port;
 
+    @Value("${socketio.context:/socket.io}")
+    private String context;
+
     @Value("${socketio.max-frame-payload-length:1048576}")
     private int maxFramePayloadLength;
 
@@ -37,31 +44,91 @@ public class SocketIOConfig {
     @Value("${socketio.ping-interval:25000}")
     private int pingInterval;
 
+    @Value("${socketio.upgrade-timeout:10000}")
+    private int upgradeTimeout;
+
     /**
      * SocketIO 服务器配置
      * 
+     * 关键配置点(根据 socketio4j 官方文档):
+     * 1. context 必须设置为 "/socket.io"(默认值,但明确设置更安全)
+     * 2. transports 默认支持 POLLING 和 WEBSOCKET(Engine.IO v3 需要)
+     * 3. upgradeTimeout 用于 Polling → WebSocket 升级超时(默认 10 秒)
+     * 4. CORS 必须启用,允许跨域请求
+     * 
      * 如果项目中配置了 RedissonClient(通过 redisson-spring-boot-starter),
      * 则自动启用 Redis 存储,实现分布式消息同步
      */
     @Bean
     public SocketIOServer socketIOServer(RedissonClient redissonClient) {
         Configuration config = new Configuration();
+        
+        // ========== 网络和绑定配置 ==========
         config.setHostname(host);
         config.setPort(port);
+        
+        // 关键:必须设置 context 路径为 "/socket.io"
+        // Engine.IO v3 客户端默认连接到此路径
+        config.setContext(context);
+        
+        // ========== 传输协议配置(关键!)==========
+        // 根据官方文档,必须明确设置传输方式:POLLING 和 WEBSOCKET
+        // Engine.IO v3 客户端需要这两种传输方式:
+        // 1. 先使用 HTTP 长轮询(POLLING)进行连接
+        // 2. 然后升级到 WebSocket(WEBSOCKET)以获得更好的性能
+        try {
+            java.lang.reflect.Method setTransportsMethod = config.getClass()
+                    .getMethod("setTransports", Transport[].class);
+            setTransportsMethod.invoke(config, (Object) new Transport[]{Transport.POLLING, Transport.WEBSOCKET});
+            log.info("已设置传输方式: POLLING, WEBSOCKET");
+        } catch (Exception e) {
+            // 如果方法不存在,使用默认值(通常默认已支持 POLLING 和 WEBSOCKET)
+            log.debug("setTransports 方法不存在,使用默认传输方式");
+        }
+        
+        // 关键:设置升级超时时间(Polling → WebSocket 升级)
+        // 确保客户端能从 HTTP 长轮询成功升级到 WebSocket
+        // 默认值是 10000ms(10秒),根据官方文档建议
+        try {
+            java.lang.reflect.Method upgradeTimeoutMethod = config.getClass()
+                    .getMethod("setUpgradeTimeout", int.class);
+            upgradeTimeoutMethod.invoke(config, upgradeTimeout);
+            log.info("已设置 upgradeTimeout: {}ms", upgradeTimeout);
+        } catch (Exception e) {
+            // 如果方法不存在,使用默认值(通常是 10000ms)
+            log.debug("setUpgradeTimeout 方法不存在,使用默认值 10000ms");
+        }
+        
+        // ========== 负载和帧大小配置 ==========
         config.setMaxFramePayloadLength(maxFramePayloadLength);
         config.setMaxHttpContentLength(maxHttpContentLength);
+        
+        // ========== 心跳和超时配置 ==========
         config.setPingTimeout(pingTimeout);
         config.setPingInterval(pingInterval);
-        // 允许跨域
+        
+        // ========== CORS 配置(关键!)==========
+        // 确保 CORS 已启用,允许跨域请求
+        // Engine.IO v3 客户端可能需要跨域访问
+        try {
+            java.lang.reflect.Method enableCorsMethod = config.getClass()
+                    .getMethod("setEnableCors", boolean.class);
+            enableCorsMethod.invoke(config, true);
+            log.info("已启用 CORS");
+        } catch (Exception e) {
+            log.debug("setEnableCors 方法不存在,可能默认已启用");
+        }
         config.setOrigin("*");
         
-        // 配置 Redis 存储工厂(实现分布式消息同步)
-        // RedissonStoreFactory 是 netty-socketio 官方提供的 Redis 适配器
+        // ========== Redis 存储工厂(分布式支持)==========
         config.setStoreFactory(new RedissonStoreFactory(redissonClient));
         log.info("SocketIO Server 已启用 Redis 存储(分布式支持)");
         
-        log.info("SocketIO Server 配置完成: host={}, port={}, redis-enabled=true", host, port);
-        return new SocketIOServer(config);
+        log.info("SocketIO Server 配置完成: host={}, port={}, context={}, pingInterval={}ms, pingTimeout={}ms, redis-enabled=true", 
+                host, port, context, pingInterval, pingTimeout);
+        
+        SocketIOServer server = new SocketIOServer(config);
+        return server;
     }
 
 }

+ 100 - 12
xdz-module-message/xdz-module-message-server/src/main/java/com/xindazhou/message/event/VehicleControlEventHandler.java

@@ -39,7 +39,10 @@ public class VehicleControlEventHandler {
     @OnConnect
     public void onConnect(SocketIOClient client) {
         String sessionId = client.getSessionId().toString();
-        log.info("客户端连接: sessionId={}, remoteAddress={}", sessionId, client.getRemoteAddress());
+        log.info("客户端连接: sessionId={}, remoteAddress={}, engineIOVersion={}, handshakeData={}", 
+                sessionId, client.getRemoteAddress(), 
+                client.getEngineIOVersion() != null ? client.getEngineIOVersion().toString() : "null",
+                client.getHandshakeData() != null ? client.getHandshakeData().getUrlParams().toString() : "null");
         
         // 1. OAuth2 JWT Token 校验
         String token = getTokenFromClient(client);
@@ -48,27 +51,63 @@ public class VehicleControlEventHandler {
                 token != null && token.length() > 20 ? token.substring(0, 20) + "..." : token);
         
         if (StrUtil.isEmpty(token)) {
-            log.warn("客户端连接失败: 未提供 Token, sessionId={}", sessionId);
-            log.debug("Handshake Headers: {}", client.getHandshakeData().getHttpHeaders());
-            log.debug("Handshake URL Params: {}", client.getHandshakeData().getUrlParams());
+            log.error("客户端连接失败: 未提供 Token, sessionId={}", sessionId);
+            log.error("Handshake Headers: {}", client.getHandshakeData().getHttpHeaders());
+            log.error("Handshake URL Params: {}", client.getHandshakeData().getUrlParams());
+            log.error("Handshake URL: {}", client.getHandshakeData().getUrl());
+            // 尝试发送错误事件(如果连接已建立)
+            try {
+                client.sendEvent("connect_error", "Token is required");
+                Thread.sleep(100); // 确保事件发送完成
+            } catch (Exception e) {
+                log.debug("发送错误事件失败(连接可能未完全建立): {}", e.getMessage());
+            }
             client.disconnect();
             return;
         }
         
         // 2. 本地验证 JWT(验证签名 + 过期时间)
         DecodedJWT jwt = null;
+        String errorMessage = null;
         try {
             jwt = OAuth2JwtUtil.verifyToken(token);
             if (jwt == null) {
                 log.warn("客户端连接失败: OAuth2 JWT Token 验证失败(返回 null), sessionId={}, tokenLength={}", 
                         sessionId, token.length());
                 // 注意:OAuth2JwtUtil.verifyToken 内部已经记录了详细的验证失败原因
+                // 尝试判断是否是过期(通过解析 Token 的 exp 字段,但注意不能完全验证签名)
+                errorMessage = "Token expired or invalid";
+                try {
+                    // 尝试解析 Token 判断是否过期(不验证签名)
+                    com.auth0.jwt.interfaces.DecodedJWT decodedWithoutVerify = com.auth0.jwt.JWT.decode(token);
+                    if (decodedWithoutVerify.getExpiresAt() != null && decodedWithoutVerify.getExpiresAt().before(new java.util.Date())) {
+                        errorMessage = "Token expired";
+                    }
+                } catch (Exception e) {
+                    // 解析失败,使用默认错误信息
+                    log.debug("无法解析 Token 判断过期时间: {}", e.getMessage());
+                }
+                // 尝试发送错误事件(如果连接已建立)
+                try {
+                    client.sendEvent("connect_error", errorMessage);
+                    Thread.sleep(100); // 确保事件发送完成
+                } catch (Exception e) {
+                    log.debug("发送错误事件失败(连接可能未完全建立): {}", e.getMessage());
+                }
                 client.disconnect();
                 return;
             }
             log.info("Token 验证成功: sessionId={}, userId={}", sessionId, OAuth2JwtUtil.getUserId(jwt));
         } catch (Exception e) {
             log.error("客户端连接失败: OAuth2 JWT Token 验证异常, sessionId={}, error={}", sessionId, e.getMessage(), e);
+            errorMessage = "Token verification failed: " + e.getMessage();
+            // 尝试发送错误事件(如果连接已建立)
+            try {
+                client.sendEvent("connect_error", errorMessage);
+                Thread.sleep(100); // 确保事件发送完成
+            } catch (Exception ex) {
+                log.debug("发送错误事件失败(连接可能未完全建立): {}", ex.getMessage());
+            }
             client.disconnect();
             return;
         }
@@ -79,6 +118,14 @@ public class VehicleControlEventHandler {
         
         if (userId == null) {
             log.warn("客户端连接失败: 无法从 Token 中获取用户ID, sessionId={}", sessionId);
+            errorMessage = "Invalid token: user ID not found";
+            // 尝试发送错误事件(如果连接已建立)
+            try {
+                client.sendEvent("connect_error", errorMessage);
+                Thread.sleep(100); // 确保事件发送完成
+            } catch (Exception e) {
+                log.debug("发送错误事件失败(连接可能未完全建立): {}", e.getMessage());
+            }
             client.disconnect();
             return;
         }
@@ -99,29 +146,70 @@ public class VehicleControlEventHandler {
     /**
      * 从客户端获取 Token
      * SocketIO 连接时,Token 可以通过以下方式传递:
-     * 1. URL 参数:?token=xxx
-     * 2. Handshake 数据
+     * 1. URL 参数:?token=xxx 或 Authorization=Bearer xxx
+     * 2. HTTP Headers:Authorization: Bearer xxx
+     * 
+     * 注意:Engine.IO v1 客户端通常通过 URL 参数传递 token
      */
     private String getTokenFromClient(SocketIOClient client) {
-        // 方式1:从 Handshake 数据中获取(HTTP Headers)
+        String sessionId = client.getSessionId().toString();
+        
+        // 方式1:从 HTTP Headers 中获取(Authorization header)
         String authHeader = client.getHandshakeData().getHttpHeaders().get("Authorization");
-        if (StrUtil.isNotEmpty(authHeader) && authHeader.startsWith("Bearer ")) {
-            return authHeader.substring(7);
+        if (StrUtil.isNotEmpty(authHeader)) {
+            log.debug("从 HTTP Headers 获取 Authorization: sessionId={}, header={}", sessionId, authHeader.substring(0, Math.min(50, authHeader.length())));
+            if (authHeader.startsWith("Bearer ")) {
+                String token = authHeader.substring(7);
+                if (StrUtil.isNotEmpty(token)) {
+                    log.debug("从 Authorization Header 获取到 Token: sessionId={}, tokenLength={}", sessionId, token.length());
+                    return token;
+                }
+            }
         }
         
         // 方式2:从 URL 参数中获取(URL Params 返回的是 List<String>)
         Map<String, java.util.List<String>> urlParams = client.getHandshakeData().getUrlParams();
+        if (urlParams != null && !urlParams.isEmpty()) {
+            log.debug("URL Params: sessionId={}, params={}", sessionId, urlParams.keySet());
+            
+            // 尝试获取 token 参数
         java.util.List<String> tokenList = urlParams.get("token");
         if (tokenList != null && !tokenList.isEmpty()) {
             String token = tokenList.get(0);
+                if (StrUtil.isNotEmpty(token)) {
+                    log.debug("从 URL Params[token] 获取到 Token: sessionId={}, tokenLength={}", sessionId, token.length());
+                    return token;
+                }
+            }
+            
+            // 尝试获取 Authorization 参数(客户端可能通过 URL 参数传递 Authorization)
+            java.util.List<String> authList = urlParams.get("Authorization");
+            if (authList != null && !authList.isEmpty()) {
+                String auth = authList.get(0);
+                if (StrUtil.isNotEmpty(auth) && auth.startsWith("Bearer ")) {
+                    String token = auth.substring(7);
+                    if (StrUtil.isNotEmpty(token)) {
+                        log.debug("从 URL Params[Authorization] 获取到 Token: sessionId={}, tokenLength={}", sessionId, token.length());
+                        return token;
+                    }
+                }
+            }
+        }
+        
+        // 方式3:从 Single URL 参数中获取(直接调用 API)
+        try {
+            String token = client.getHandshakeData().getSingleUrlParam("token");
             if (StrUtil.isNotEmpty(token)) {
+                log.debug("从 getSingleUrlParam(token) 获取到 Token: sessionId={}, tokenLength={}", sessionId, token.length());
                 return token;
             }
+        } catch (Exception e) {
+            log.debug("getSingleUrlParam 异常: sessionId={}, error={}", sessionId, e.getMessage());
         }
         
-        // 方式3:从 Single 参数中获取
-        String token = client.getHandshakeData().getSingleUrlParam("token");
-        return token;
+        // 方式4:尝试从所有 URL 参数中查找(可能参数名不同)
+        log.warn("未找到 Token: sessionId={}, 请检查客户端是否正确传递 token", sessionId);
+        return null;
     }
 
     /**

+ 9 - 1
xdz-module-message/xdz-module-message-server/src/main/resources/application.yaml

@@ -10,13 +10,21 @@ server:
   #   context-path: /message  # 已移除 context-path,统一配置
 
 # SocketIO 配置
+# 参考官方文档:https://www.socketio4j.org/server-api
+# 关键配置说明:
+# - context: 必须设置为 "/socket.io",Engine.IO v3 客户端默认连接此路径
+# - upgrade-timeout: Polling → WebSocket 升级超时时间,默认 10 秒
+# - ping-interval: 心跳间隔,建议设置为小于 NAT 超时时间(通常 30-60 秒)
+# - ping-timeout: 心跳超时,如果此时间内未收到 pong,则认为连接断开
 socketio:
   host: 0.0.0.0
   port: 9090
+  context: /socket.io  # 关键:Engine.IO v3 客户端连接路径
   max-frame-payload-length: 1048576  # 1MB
   max-http-content-length: 1048576    # 1MB
   ping-timeout: 60000                # 60秒
-  ping-interval: 25000               # 25秒
+  ping-interval: 25000               # 25秒(建议小于 NAT 超时时间)
+  upgrade-timeout: 10000             # 关键:Polling 升级到 WebSocket 的超时时间,10秒
 
 # JWT 配置
 jwt: