76人参与 • 2024-08-01 • websocket
websocket 是一种通信协议,通过单个 tcp 连接提供全双工通信通道。它允许客户端和服务器之间进行双向通信、实时交互,比如在线聊天、实时数据展示等。
与传统的 http 协议不同,websocket 连接是持久的,可以在服务器和客户端之间发送实时数据。
websocket的 一些关键特点:
基本工作原理:
主要有两种:
@serverendpoint 注解是 tomcat 7 中新增加的一个注解,用于标识一个类为 websocket 服务端点
websocket 服务端点监听客户端的 websocket 连接,并将连接管理起来,供客户端和服务端进行实时通信
一个标注有 @serverendpoint 的类必须包含一个无参构造函数,并且可以有一个或多个注解为 @onopen、@onclose、@onmessage、@onerror 的方法。
@onopen:当 websocket 连接建立时,会调用标注有 @onopen 的方法
@onclose:当 websocket 连接关闭时,会调用标注有 @onclose 的方法
@onmessage:当收到客户端发送的消息时,会调用标注有 @onmessage 的方法
在 @onmessage 方法中,可以通过参数文本、二进制、pongmessage 等方式来接收客户端发送的消息。
同样可以通过 session 给客户端发送消息,以实现双向通信。
@onerror:当出现错误时,会调用标注有 @onerror 的方法
注意:标注 @serverendpoint 注解的类对象是多例的,即每个连接都会创建一个新的对象
value 参数:必选参数,用于指定 websocket 服务端点的 uri 地址
**decoders **参数:数组类型,指定解码器
包含用于将 websocket 消息解码为 java 对象的解码器(decoder)的类
解码器帮助将原始消息转换为 java 对象
encoders 参数:数组类型,指定编解码器
包含用于将 java 对象编码为 websocket 消息的编码器(encoder)的类。
编码器帮助将 java 对象转换为可以发送到客户端的消息
subprotocols 参数:用于指定一个或多个 websocket 的子协议
configurator 参数:一个类,用于提供配置 websocket 端点的自定义配置器
这允许在 websocket 端点创建和配置过程中进行额外的设置
在一个使用@serverendpoint
注解定义的 websocket 端点类中,可以自动注入以下类型的对象:
javax.websocket.session:websocket 的一个会话对象,代表了 websocket 的一个客户端和服务器的连接。
可以使用 session 对象来获取 websocket 中的各种状态和信息,比如获得客户端的地址、判断客户端是否已经关闭等。
javax.websocket.endpointconfig: 用于在 websocket 端点的生命周期内共享配置信息的对象
其他 spring bean: 如果 websocket 端点类是一个 spring 管理的 bean,可以通过使用 @autowired 注解或构造函数注入其他 spring bean,以便在 websocket 端点中使用它们。
servlet api 对象: 如果 websocket 应用程序与 servlet 容器集成,可以注入 servlet api 的相关对象,例如httpservletrequest
、httpservletresponse
等,以便在 websocket 处理中访问web请求和响应
@autowired
private httpservletrequest request;
@onopen
public void onopen(session session) {
// 使用注入的httpservletrequest对象
}
其他常用对象:
remoteendpoint :websocket 的一个远程端点对象,代表了客户端和服务器的一个连接通道。
通过 remoteendpoint 对象,可以向客户端发送消息、关闭连接等。
remoteendpoint 对象获取方法:通过 session 对象的 basicremote 属性获取
remoteendpoint.basic remote = session.getbasicremote();
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-websocket</artifactid>
</dependency>
import com.blackcrow.common.utils.uuidutil;
import lombok.extern.slf4j.slf4j;
import org.springframework.stereotype.service;
import javax.websocket.*;
import javax.websocket.server.serverendpoint;
import java.util.map;
import java.util.concurrent.concurrenthashmap;
/**
* websocket 服务端点。注意:websocket对象是多例的
*/
@slf4j
@service
//@serverendpoint(value = "/chat")
@serverendpoint(value = "/chat", configurator = websocketserverconfigurator.class)
public class websocketserver {
// 用于存储每个用户客户端对象
public static map<string, websocketserver> onlineusermap = new concurrenthashmap<>();
// 用户id
private string userid;
// 会话
private session session;
@onopen
public void onopen(session session, endpointconfig config){
this.session = session;
this.userid = uuidutil.get4uuid();
log.info("收到来自窗口的连接,userid={}", this.userid);
onlineusermap.put(this.userid, this);
object aaaa = config.getuserproperties().get("aaaa");
log.info("aaaa={}", aaaa);
}
@onmessage
public void onmessage(string message, session session){
log.info("收到来自窗口[{}]的的信息: {}", this.userid, message);
}
@onclose
public void onclose(session session){
onlineusermap.remove(this.userid);
log.info("有一连接[{}]关闭!当前连接数为 {}", this.userid, onlineusermap.size());
}
@onerror
public void onerror(session session, throwable throwable){
log.error("websocketserver 连接发生错误", throwable);
}
/**
* 給session连接推送消息
*/
private void sendmessage(object message) {
try {
this.session.getbasicremote().sendobject(message);
} catch (exception e) {
e.printstacktrace();
log.error("向客户端推送数据发生错误", e);
}
}
/**
* 向所有连接群发消息
*/
public static void sendmessagetoall(object message){
for (websocketserver item : onlineusermap.values()) {
try {
item.sendmessage(message);
} catch (exception e) {
log.error("向客户端推送数据发生错误", e);
}
}
}
}
注:
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import org.springframework.web.socket.server.standard.serverendpointexporter;
@configuration
public class websocketconfig {
/**
* serverendpointexporter 将会扫描所有使用 @serverendpoint 注解标记的类,并将它们注册为 websocket 服务端点
*/
@bean
public serverendpointexporter serverendpointexporter() {
return new serverendpointexporter();
}
}
这个类是一个自定义的 websocket 配置器,它实现了 javax.websocket.server.serverendpointconfig.configurator 接口。它的作用是在 websocket 握手期间修改握手请求和响应。
具体来说,这个类重写了 modifyhandshake 方法,在 websocket 握手期间被调用。
modifyhandshake
方法允许在这个握手阶段进行干预,以便:
modifyhandshake
,可以根据客户端的请求或其他条件来选择要使用的子协议。modifyhandshake
可以配置跨域相关的设置,如允许的来源、凭据模式等。modifyhandshake
来添加任何自定义逻辑,这些逻辑在握手阶段需要执行。modifyhandshake
方法中添加逻辑来检查客户端的请求,并据此决定是否继续握手过程。如果决定拒绝连接,可以抛出一个异常来中断握手。注:在 websocket 的生命周期中,握手是客户端和服务器建立连接的第一步。这一步通常涉及 http 请求和响应,以便双方能够就将要建立的 websocket 连接达成协议。
import javax.websocket.handshakeresponse;
import javax.websocket.server.handshakerequest;
import javax.websocket.server.serverendpointconfig;
import java.util.list;
import java.util.map;
/**
* 握手处理器
*/
public class websocketserverconfigurator extends serverendpointconfig.configurator {
@override
public void modifyhandshake(serverendpointconfig sec, handshakerequest request, handshakeresponse response) {
// 获取客户端发送的http请求头信息
map<string, list<string>> headers = request.getheaders();
// 检查某个特定的请求头是否存在或是否符合要求
list<string> customheadervalues = headers.get("custom-header");
if (customheadervalues == null || customheadervalues.isempty() || !customheadervalues.get(0).equals("expectedvalue")) {
// 如果请求头不符合要求,则拒绝握手
throw new runtimeexception("custom header is missing or invalid");
}
// 如果请求头符合要求,则继续握手过程
super.modifyhandshake(sec, request, response);
}
}
public interface websocketmessage<t> {
/**
* 消息载荷
*/
t getpayload();
/**
* 消息字节长度
*/
int getpayloadlength();
/**
* 当org.springframework.web.socket.websockethandler#supportspartialmessages()配置允许分片消息时,
* 如果当前消息是客户端本次送达消息的最后一部分时,该方法返回true。如果分片消息不可用或是被禁用,放回false
*/
boolean islast();
}
常用方法:
void sendmessage(websocketmessage<?> message);
void close();
public interface websocketsession extends closeable {
/**
* 会话标识
*/
string getid();
/**
* websocket 连接的uri
*/
@nullable
uri geturi();
/**
* 返回握手请求中使用的headers
*/
httpheaders gethandshakeheaders();
/**
*返回websocke会话关联的属性。
*在服务端,可以使用org.springframework.web.socket.server.handshakeinterceptor填充属性
*在客户端,可以使用org.springframework.web.socket.client.websocketclient的握手方法填充属性
*/
map<string, object> getattributes();
/**
* 返回一个包含已验证的用户名称的java.security.principal实例,如果用户没有验证成功返回null
*/
@nullable
principal getprincipal();
/**
* 返回请求接收方的地址
*/
@nullable
inetsocketaddress getlocaladdress();
/**
* 返回客户端的地址
*/
@nullable
inetsocketaddress getremoteaddress();
/**
*返回约定的子协议,如果没有协议或是协议失败返回null
*/
@nullable
string getacceptedprotocol();
/**
* 配置一次接收文本消息最大值
*/
void settextmessagesizelimit(int messagesizelimit);
/**
* 获取一次接收文本消息最大值
*/
int gettextmessagesizelimit();
/**
* 配置一次接收二进制消息最大值
*/
void setbinarymessagesizelimit(int messagesizelimit);
/**
* 获取一次接收二进制消息最大值
*/
int getbinarymessagesizelimit();
/**
* 获取约定的扩展
*/
list<websocketextension> getextensions();
/**
* 发送消息,websocket会话底层协议不支持并发发送消息,因此发送必须是同步的。
* 保证信息发送同步进行,一种方法是使用org.springframework.web.socket.handler.concurrentwebsocketsessiondecorator
* 包装websocketsession
*/
void sendmessage(websocketmessage<?> message) throws ioexception;
/**
* 底层连接是否打开
*/
boolean isopen();
/**
* 使用状态码1000关闭websocket连接
*/
@override
void close() throws ioexception;
/**
* 使用指定状态码websocket连接
*/
void close(closestatus status) throws ioexception;
}
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-websocket</artifactid>
</dependency>
可以实现 websockethandler 接口,也可以继承 abstractwebsockethandler 类来创建 websockethandler 实例
websockethandler 接口提供了五个方法:
afterconnectionestablished:连接成功后调用
handlemessage:处理发送来的消息
handlemessage 方法中有一个 websocketmessage
参数,是一个接口,但一般不直接使用这个接口而是使用它的实现类,它有以下几个实现类:
但是由于 handlemessage
方法的参数是websocketmessage
接口,所以实际使用中可能需要判断一下当前来的消息具体是它的哪个子类,比如这样:
public void handlemessage(websocketsession session, websocketmessage<?> message) throws exception {
if (message instanceof textmessage) {
this.handletextmessage(session, (textmessage)message);
} else if (message instanceof binarymessage) {
this.handlebinarymessage(session, (binarymessage)message);
}
}
可以直接继承 abstractwebsockethandler
类,然后重写想要处理的消息类型,它已经封装了这些重复劳动,
handletransporterror: ws 连接出错时调用
afterconnectionclosed:连接关闭后调用
supportspartialmessages:是否支持分片消息。没什么用,返回 false 就完事了
import org.springframework.web.socket.*;
import java.io.ioexception;
import java.util.map;
import java.util.concurrent.concurrenthashmap;
/**
* websocket核心处理器
*/
public class mywebsockethandler implements websockethandler {
private static final map<string, websocketsession> sessions = new concurrenthashmap<>();
@override
public void afterconnectionestablished(websocketsession session) throws exception {
string username = session.getattributes().get("username").tostring();
sessions.put(username, session);
system.out.println(string.format("成功建立连接~ username: %s", username));
}
@override
public void handlemessage(websocketsession session, websocketmessage<?> message) throws exception {
string msg = message.getpayload().tostring();
system.out.println(msg);
}
@override
public void handletransporterror(websocketsession session, throwable exception) throws exception {
system.out.println("连接出错");
if (session.isopen()) {
session.close();
}
}
@override
public void afterconnectionclosed(websocketsession session, closestatus closestatus) throws exception {
system.out.println("连接已关闭,status:" + closestatus);
}
@override
public boolean supportspartialmessages() {
return false;
}
/**
* 指定发消息
*/
public static void sendmessage(string username, string message) {
websocketsession websocketsession = sessions.get(username);
if (websocketsession == null || !websocketsession.isopen()) return;
try {
websocketsession.sendmessage(new textmessage(message));
} catch (ioexception e) {
e.printstacktrace();
}
}
/**
* 群发消息
*/
public static void fanoutmessage(string message) {
sessions.keyset().foreach(us -> sendmessage(us, message));
}
}
可以配置 websocket 入口,允许访问的域、注册 handler、定义拦截器等
注意 websockethandlerregistry .addhandler(websockethandler websockethandler, string… paths) 的第二个参数是一个字符串类型的参数列表,说明可以为多个端点指定同样配置的 websockethandler 处理
websockethandlerregistry.addhandler() 方法注册 websockethandler 之后会返回 websockethandlerregistration 用于配置 websockethandler
public interface websockethandlerregistration {
// 继续添加消息处理器
websockethandlerregistration addhandler(websockethandler handler, string... paths);
// 添加握手处理器,处理握手事件
websockethandlerregistration sethandshakehandler(handshakehandler handshakehandler);
// 添加握手拦截器,可以在处理握手前和握手后处理一些业务逻辑
websockethandlerregistration addinterceptors(handshakeinterceptor... interceptors);
// 配置允许的浏览器跨源请求类型
websockethandlerregistration setallowedorigins(string... origins);
// 配置允许的浏览器跨源请求类型
websockethandlerregistration setallowedoriginpatterns(string... originpatterns);
// 允许使用sockjs应急选项
sockjsserviceregistration withsockjs();
}
import org.springframework.context.annotation.configuration;
import org.springframework.http.server.serverhttprequest;
import org.springframework.http.server.serverhttpresponse;
import org.springframework.http.server.servletserverhttprequest;
import org.springframework.web.socket.websockethandler;
import org.springframework.web.socket.config.annotation.enablewebsocket;
import org.springframework.web.socket.config.annotation.websocketconfigurer;
import org.springframework.web.socket.config.annotation.websockethandlerregistry;
import org.springframework.web.socket.server.handshakeinterceptor;
import java.util.map;
/**
* websocket 核心配置类
*/
@configuration
@enablewebsocket // 开启注解接收和发送消息
public class websocketconfig implements websocketconfigurer {
/**
* 配置 websocket 入口,允许访问的域、注册 handler、定义拦截器等
* 注;配置注册的处理器和拦截器是单例的,无论多少连接进来,都是用相同的对象处理。
*/
@override
public void registerwebsockethandlers(websockethandlerregistry registry) {
registry.addhandler(new mywebsockethandler(), "/ws") // 设置连接路径和处理
.setallowedorigins("*")
.addinterceptors(new mywebsocketinterceptor()); // 设置拦截器
}
/**
* 自定义拦截器拦截websocket请求
*/
class mywebsocketinterceptor implements handshakeinterceptor {
/**
* 握手前置拦截。一般用来注册用户信息,绑定 websocketsession
*/
@override
public boolean beforehandshake(serverhttprequest request, serverhttpresponse response,
websockethandler wshandler, map<string, object> attributes) throws exception {
system.out.println("握手前置拦截~~");
if (!(request instanceof servletserverhttprequest)) return true;
// httpservletrequest servletrequest = ((servletserverhttprequest) request).getservletrequest();
// string username = (string) servletrequest.getsession().getattribute("username");
string username = "koishipyb";
attributes.put("username", username);
object username1 = attributes.get("username");
return true;
}
/**
* 握手后置拦截
*/
@override
public void afterhandshake(serverhttprequest request, serverhttpresponse response,
websockethandler wshandler, exception exception) {
system.out.println("握手后置拦截~~");
}
}
}
server{
# 监听的端口号
listen 9095;
server_name robotchat.lukeewin.top; # 这里填写的是访问的域名
location / {
proxy_pass http://127.0.0.1:9090; # 这里填写的是代理的路径和端口
proxy_set_header host $host;
proxy_set_header x-real_ip $remote_addr;
proxy_set_header x-forwarded-for $proxy_add_x_forwarded_for;
}
# 以下配置针对websocket
location /ws { # onlinecount为websocket的访问uri
proxy_redirect off;
proxy_pass http://127.0.0.1:9090;
proxy_set_header host $host;
proxy_set_header x-real_ip $remote_addr;
proxy_set_header x-forwarded-for $proxy_add_x_forwarded_for;
proxy_http_version 1.1;
proxy_read_timeout 36000s;
proxy_send_timeout 36000s;
proxy_set_header upgrade $http_upgrade; # 升级协议头 websocket
proxy_set_header connection "upgrade";
}
}
注:
添加如下三行语句,才能在后台中拿到真实的 ip 地址
proxy_set_header host $host;
proxy_set_header x-real_ip $remote_addr;
proxy_set_header x-forwarded-for $proxy_add_x_forwarded_for;
获取真实 ip 地址代码如下:
public string getrealip(httpservletrequest request) {
string ip = request.getheader("x-forwarded-for");
if (stringutils.isnotempty(ip) && !"unknown".equalsignorecase(ip)) {
int index = ip.indexof(",");
if (index != -1) {
return ip.substring(0, index);
} else {
return ip;
}
}
ip = request.getheader("x-real-ip");
if (stringutils.isnotempty(ip) && !"unknown".equalsignorecase(ip)) {
return ip;
}
return request.getremoteaddr();
}
您想发表意见!!点此发布评论
版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。
发表评论