package me.zhengjie.base.websocket; import java.io.IOException; import java.util.Iterator; import java.util.concurrent.CopyOnWriteArraySet; import javax.websocket.OnClose; import javax.websocket.OnError; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.Session; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import me.zhengjie.security.security.TokenProvider; @ServerEndpoint("/appSocketServer/{sname}") @Component public class AppSocketServer { private static final Logger log = LoggerFactory.getLogger(AppSocketServer.class); private static int onlineCount = 0; private static CopyOnWriteArraySet webSocketSet = new CopyOnWriteArraySet<>(); private Session session; private String sname = ""; @Autowired TokenProvider tokenProvider; public AppSocketServer() { // com.gangquan360.smartadmin.module.employee.EmployeeController.query dd; } @OnOpen public void onOpen(Session session, @PathParam("sname") String sname) { this.session = session; // 如果存在就先删除一个,防止重复推送消息 for (AppSocketServer webSocket : webSocketSet) { if (webSocket.sname.equals(sname)) { webSocketSet.remove(webSocket); } } webSocketSet.add(this); log.info("有新窗口开始监听:" + sname + "当前人数:" + webSocketSet.size()); this.sname = sname; try { sendMessage("连接成功"); } catch (IOException e) { log.error("websocket IO异常:" + e.getMessage()); } // this.session = session; // webSocketSet.add(this); // addOnlineCount(); // log.info("有新窗口开始监听:" + sname + ",当前在线人数为" + getOnlineCount()); // this.sname = sname; // // try { // this.sendMessage("连接成功"); // } catch (IOException e) { // log.error("websocket IO异常"); // } } @OnClose public void onClose() { webSocketSet.remove(this); log.info("有一连接关闭!当前在线人数为" + webSocketSet.size()); } @OnMessage public void onMessage(String message, Session session) { String[] msgs = null; String heartbeat = message; if (message.contains(",")) { msgs = message.split(","); heartbeat = msgs[0]; } log.info("收到来自窗口" + this.sname + "的信息:" + message); Iterator iterator = webSocketSet.iterator(); while (iterator.hasNext()) { AppSocketServer item = (AppSocketServer) iterator.next(); try { // 判断token是否存在,不存在则删除 if (item.sname.equals(this.sname)) { if (msgs != null && msgs[1] != null) { Object obj = tokenProvider.getToken(msgs[1]); if (obj == null) { item.sendMessage("close"); continue; } } item.sendMessage(heartbeat); } } catch (IOException e) { e.printStackTrace(); } } } @OnError public void onError(Session session, Throwable error) { log.error("发生错误"); error.printStackTrace(); webSocketSet.remove(this); } public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); } public static boolean sendInfo(AppSocketServer item, String message, @PathParam("sname") String sname) { log.info("推送消息到窗口" + sname + ",推送内容:" + message); try { // item.sendMessage(message); } catch (IOException ioException) { ioException.printStackTrace(); } return false; } public String getSname() { return sname; } public static CopyOnWriteArraySet getWebSocketSet() { return webSocketSet; } }