ny
23 小时以前 b6f169fe43a2b13f351aefc152374fc7f0bc8cb7
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package jnpf.message.util;
 
import cn.dev33.satoken.stp.StpUtil;
import com.alibaba.fastjson.JSONObject;
import jnpf.consts.AuthConsts;
import jnpf.util.UserProvider;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ObjectUtils;
 
import jakarta.websocket.Session;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
 
import static jnpf.consts.AuthConsts.TOKEN_PREFIX;
 
/**
 *
 * @author JNPF开发平台组
 * @version V3.1.0
 * @copyright 引迈信息技术有限公司
 * @date 2021/3/16 10:51
 */
@Slf4j
public class OnlineUserProvider {
 
    /**
     * 在线用户
     */
    private static final List<OnlineUserModel> onlineUserList = new ArrayList<>();
 
    public static List<OnlineUserModel> getOnlineUserList() {
        return OnlineUserProvider.onlineUserList;
    }
 
 
    public static void addModel(OnlineUserModel model){
        synchronized (onlineUserList) {
            OnlineUserProvider.onlineUserList.add(model);
        }
    }
 
    public static void removeModel(OnlineUserModel onlineUserModel){
        synchronized (onlineUserList) {
            onlineUserList.remove(onlineUserModel);
        }
    }
 
 
 
    // =================== Websocket相关操作 ===================
 
    /**
     * 根据Token精准推送Websocket 登出消息
     * @param token
     */
    public static void removeWebSocketByToken(String... token) {
        List<String> tokens = Arrays.stream(token).map(t -> t.contains(AuthConsts.TOKEN_PREFIX) ? t : TOKEN_PREFIX + " " + t).collect(Collectors.toList());
        //清除websocket登录状态
        List<OnlineUserModel> users = OnlineUserProvider.getOnlineUserList().stream().filter(t -> tokens.contains(t.getToken())).collect(Collectors.toList());
        if (!ObjectUtils.isEmpty(users)) {
            for (OnlineUserModel user : users) {
                OnlineUserProvider.logoutWS(user, null);
                //先移除对象, 并推送下线信息, 避免网络原因导致就用户未断开 新用户连不上WebSocket
                OnlineUserProvider.removeModel(user);
                //通知所有在线,有用户离线
                //功能已删除
//                for (OnlineUserModel item : OnlineUserProvider.getOnlineUserList().stream().filter(t -> !Objects.equals(user.getUserId(), t.getUserId()) && Objects.equals(user.getTenantId(),t.getTenantId())).collect(Collectors.toList())) {
//                    if (!item.getUserId().equals(user.getUserId())) {
//                        JSONObject obj = new JSONObject();
//                        obj.put("method", "Offline");
//                        //推送给前端
//                        OnlineUserProvider.sendMessage(item, obj);
//
//                    }
//                }
            }
        }
    }
 
    /**
     * 根据用户ID 推送全部Websocket 登出消息
     * @param userId
     */
    public static void removeWebSocketByUser(String userId) {
        List<String> tokens = StpUtil.getTokenValueListByLoginId(UserProvider.splicingLoginId(userId));
        removeWebSocketByToken(tokens.toArray(new String[tokens.size()]));
    }
 
    /**
     * 发送用户退出消息
     * @param session
     */
    public static void logoutWS(OnlineUserModel onlineUserModel, Session session) {
        JSONObject obj = new JSONObject();
        obj.put("method", "logout");
        if(onlineUserModel != null) {
            sendMessage(onlineUserModel, obj);
        }else{
            sendMessage(session, obj);
        }
    }
 
 
    /**
     * 发送关闭WebSocket消息, 前端不在重连
     * @param session
     */
    public static void closeFrontWs(OnlineUserModel onlineUserModel, Session session) {
        JSONObject obj = new JSONObject();
        obj.put("method", "closeSocket");
        if(onlineUserModel != null) {
            sendMessage(onlineUserModel, obj);
        }else{
            sendMessage(session, obj);
        }
    }
 
 
    public static void sendMessage(OnlineUserModel onlineUserModel, Object message){
        Session session = onlineUserModel.getWebSocket();
        synchronized (session) {
 
            try {
                if (session.isOpen()) {
                    session.getAsyncRemote().sendText(JSONObject.toJSONString(message));
                }else{
                    log.debug("WS未打开: {}, {}, {}, {}, {}", onlineUserModel.getTenantId(), session.getId(), onlineUserModel.getUserId(), onlineUserModel.getToken(), message);
                    try{
                        session.close();
                    }catch (Exception ee){
                    }
                    finally {
                        OnlineUserProvider.removeModel(onlineUserModel);
                    }
                }
            }catch (Exception e){
                log.debug(String.format("WS消息发送失败: %s, %s, %s, %s, %s", onlineUserModel.getTenantId(), session.getId(), onlineUserModel.getUserId(), onlineUserModel.getToken(), message), e);
            }
        }
    }
 
    public static void sendMessage(Session session, Object message){
        OnlineUserModel onlineUserModel = OnlineUserProvider.getOnlineUserList().stream().filter(t -> t.getConnectionId().equals(session.getId())).findFirst().orElse(null);
        if(onlineUserModel == null){
            onlineUserModel = new OnlineUserModel();
            onlineUserModel.setWebSocket(session);
        }
        synchronized (session) {
            sendMessage(onlineUserModel, message);
        }
    }
 
}