AppSocketServer.java 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. package me.zhengjie.base.websocket;
  2. import java.io.IOException;
  3. import java.util.Iterator;
  4. import java.util.concurrent.CopyOnWriteArraySet;
  5. import javax.websocket.OnClose;
  6. import javax.websocket.OnError;
  7. import javax.websocket.OnMessage;
  8. import javax.websocket.OnOpen;
  9. import javax.websocket.Session;
  10. import javax.websocket.server.PathParam;
  11. import javax.websocket.server.ServerEndpoint;
  12. import org.slf4j.Logger;
  13. import org.slf4j.LoggerFactory;
  14. import org.springframework.beans.factory.annotation.Autowired;
  15. import org.springframework.stereotype.Component;
  16. import me.zhengjie.security.security.TokenProvider;
  17. @ServerEndpoint("/appSocketServer/{sname}")
  18. @Component
  19. public class AppSocketServer {
  20. private static final Logger log = LoggerFactory.getLogger(AppSocketServer.class);
  21. private static int onlineCount = 0;
  22. private static CopyOnWriteArraySet<AppSocketServer> webSocketSet = new CopyOnWriteArraySet<>();
  23. private Session session;
  24. private String sname = "";
  25. @Autowired
  26. TokenProvider tokenProvider;
  27. public AppSocketServer() {
  28. // com.gangquan360.smartadmin.module.employee.EmployeeController.query dd;
  29. }
  30. @OnOpen
  31. public void onOpen(Session session, @PathParam("sname") String sname) {
  32. this.session = session;
  33. // 如果存在就先删除一个,防止重复推送消息
  34. for (AppSocketServer webSocket : webSocketSet) {
  35. if (webSocket.sname.equals(sname)) {
  36. webSocketSet.remove(webSocket);
  37. }
  38. }
  39. webSocketSet.add(this);
  40. log.info("有新窗口开始监听:" + sname + "当前人数:" + webSocketSet.size());
  41. this.sname = sname;
  42. try {
  43. sendMessage("连接成功");
  44. } catch (IOException e) {
  45. log.error("websocket IO异常:" + e.getMessage());
  46. }
  47. // this.session = session;
  48. // webSocketSet.add(this);
  49. // addOnlineCount();
  50. // log.info("有新窗口开始监听:" + sname + ",当前在线人数为" + getOnlineCount());
  51. // this.sname = sname;
  52. //
  53. // try {
  54. // this.sendMessage("连接成功");
  55. // } catch (IOException e) {
  56. // log.error("websocket IO异常");
  57. // }
  58. }
  59. @OnClose
  60. public void onClose() {
  61. webSocketSet.remove(this);
  62. log.info("有一连接关闭!当前在线人数为" + webSocketSet.size());
  63. }
  64. @OnMessage
  65. public void onMessage(String message, Session session) {
  66. String[] msgs = null;
  67. String heartbeat = message;
  68. if (message.contains(",")) {
  69. msgs = message.split(",");
  70. heartbeat = msgs[0];
  71. }
  72. log.info("收到来自窗口" + this.sname + "的信息:" + message);
  73. Iterator<AppSocketServer> iterator = webSocketSet.iterator();
  74. while (iterator.hasNext()) {
  75. AppSocketServer item = (AppSocketServer) iterator.next();
  76. try {
  77. // 判断token是否存在,不存在则删除
  78. if (item.sname.equals(this.sname)) {
  79. if (msgs != null && msgs[1] != null) {
  80. Object obj = tokenProvider.getToken(msgs[1]);
  81. if (obj == null) {
  82. item.sendMessage("close");
  83. continue;
  84. }
  85. }
  86. item.sendMessage(heartbeat);
  87. }
  88. } catch (IOException e) {
  89. e.printStackTrace();
  90. }
  91. }
  92. }
  93. @OnError
  94. public void onError(Session session, Throwable error) {
  95. log.error("发生错误");
  96. error.printStackTrace();
  97. webSocketSet.remove(this);
  98. }
  99. public void sendMessage(String message) throws IOException {
  100. this.session.getBasicRemote().sendText(message);
  101. }
  102. public static boolean sendInfo(AppSocketServer item, String message, @PathParam("sname") String sname) {
  103. log.info("推送消息到窗口" + sname + ",推送内容:" + message);
  104. try {
  105. //
  106. item.sendMessage(message);
  107. } catch (IOException ioException) {
  108. ioException.printStackTrace();
  109. }
  110. return false;
  111. }
  112. public String getSname() {
  113. return sname;
  114. }
  115. public static CopyOnWriteArraySet<AppSocketServer> getWebSocketSet() {
  116. return webSocketSet;
  117. }
  118. }