服务器 > 网络 > websocket

websocket (@ServerEndpoint)基本使用指南

76人参与 2024-08-01 websocket

概述

websocket 介绍


java 实现 websocket 服务端的方式

主要有两种:


@serverendpoint 注解方式

@serverendpoint 注解

介绍


@serverendpoint 注解的参数配置


websocket 端点类常用对象

在一个使用@serverendpoint注解定义的 websocket 端点类中,可以自动注入以下类型的对象:


使用注意事项


基本使用

依赖

<dependency>
    <groupid>org.springframework.boot</groupid>
    <artifactid>spring-boot-starter-websocket</artifactid>
</dependency>

核心处理器(@serverendpoint)

import com.blackcrow.common.utils.uuidutil;
import lombok.extern.slf4j.slf4j;
import org.springframework.stereotype.service;

import javax.websocket.*;
import javax.websocket.server.serverendpoint;
import java.util.map;
import java.util.concurrent.concurrenthashmap;

/**
 * websocket 服务端点。注意:websocket对象是多例的
 */
@slf4j
@service
//@serverendpoint(value = "/chat")
@serverendpoint(value = "/chat", configurator = websocketserverconfigurator.class)
public class websocketserver {
    // 用于存储每个用户客户端对象
    public static map<string, websocketserver> onlineusermap = new concurrenthashmap<>();

    // 用户id
    private string userid;
    // 会话
    private session session;

    @onopen
    public void onopen(session session, endpointconfig config){
        this.session = session;
        this.userid = uuidutil.get4uuid();
        log.info("收到来自窗口的连接,userid={}", this.userid);
        onlineusermap.put(this.userid, this);
        object aaaa = config.getuserproperties().get("aaaa");
        log.info("aaaa={}", aaaa);
    }

    @onmessage
    public void onmessage(string message, session session){
        log.info("收到来自窗口[{}]的的信息: {}", this.userid, message);
    }

    @onclose
    public void onclose(session session){
        onlineusermap.remove(this.userid);
        log.info("有一连接[{}]关闭!当前连接数为 {}", this.userid, onlineusermap.size());
    }

    @onerror
    public void onerror(session session, throwable throwable){
        log.error("websocketserver 连接发生错误", throwable);
    }

    /**
     * 給session连接推送消息
     */
    private void sendmessage(object message) {
        try {
            this.session.getbasicremote().sendobject(message);
        } catch (exception e) {
            e.printstacktrace();
            log.error("向客户端推送数据发生错误", e);
        }
    }

    /**
     * 向所有连接群发消息
     */
    public static void sendmessagetoall(object message){
        for (websocketserver item : onlineusermap.values()) {
            try {
                item.sendmessage(message);
            } catch (exception e) {
                log.error("向客户端推送数据发生错误", e);
            }
        }
    }
}

配置类(serverendpointexporter )

注:

import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import org.springframework.web.socket.server.standard.serverendpointexporter;

@configuration
public class websocketconfig {
    /**
     * serverendpointexporter 将会扫描所有使用 @serverendpoint 注解标记的类,并将它们注册为 websocket 服务端点
     */
    @bean
    public serverendpointexporter serverendpointexporter() {
        return new serverendpointexporter();
    }
}

握手连接处理类

import javax.websocket.handshakeresponse;
import javax.websocket.server.handshakerequest;
import javax.websocket.server.serverendpointconfig;
import java.util.list;
import java.util.map;

/**
 * 握手处理器
 */
public class websocketserverconfigurator extends serverendpointconfig.configurator {
    @override
    public void modifyhandshake(serverendpointconfig sec, handshakerequest request, handshakeresponse response) {
        // 获取客户端发送的http请求头信息
        map<string, list<string>> headers = request.getheaders();

        // 检查某个特定的请求头是否存在或是否符合要求
        list<string> customheadervalues = headers.get("custom-header");
        if (customheadervalues == null || customheadervalues.isempty() || !customheadervalues.get(0).equals("expectedvalue")) {
            // 如果请求头不符合要求,则拒绝握手
            throw new runtimeexception("custom header is missing or invalid");
        }

        // 如果请求头符合要求,则继续握手过程
        super.modifyhandshake(sec, request, response);
    }
}

springboot 集成 websocket 方式

websocket 端点类常用对象

websocketmessage

public interface websocketmessage<t> {
	/**
	 * 消息载荷
	 */
	t getpayload();
	/**
	 * 消息字节长度
	 */
	int getpayloadlength();
	/**
	 * 当org.springframework.web.socket.websockethandler#supportspartialmessages()配置允许分片消息时,
	 * 如果当前消息是客户端本次送达消息的最后一部分时,该方法返回true。如果分片消息不可用或是被禁用,放回false
	 */
	boolean islast();
}

websocketsession

常用方法:

public interface websocketsession extends closeable {

	/**
	 * 会话标识
	 */
	string getid();

	/**
	 * websocket 连接的uri
	 */
	@nullable
	uri geturi();

	/**
	 * 返回握手请求中使用的headers
	 */
	httpheaders gethandshakeheaders();

	/**
	 *返回websocke会话关联的属性。
	 *在服务端,可以使用org.springframework.web.socket.server.handshakeinterceptor填充属性
	 *在客户端,可以使用org.springframework.web.socket.client.websocketclient的握手方法填充属性
	 */
	map<string, object> getattributes();

	/**
	 * 返回一个包含已验证的用户名称的java.security.principal实例,如果用户没有验证成功返回null
	 */
	@nullable
	principal getprincipal();

	/**
	 * 返回请求接收方的地址
	 */
	@nullable
	inetsocketaddress getlocaladdress();

	/**
	 * 返回客户端的地址
	 */
	@nullable
	inetsocketaddress getremoteaddress();

	/**
	 *返回约定的子协议,如果没有协议或是协议失败返回null
	 */
	@nullable
	string getacceptedprotocol();

	/**
	 * 配置一次接收文本消息最大值
	 */
	void settextmessagesizelimit(int messagesizelimit);

	/**
	 * 获取一次接收文本消息最大值
	 */
	int gettextmessagesizelimit();

	/**
	 * 配置一次接收二进制消息最大值
	 */
	void setbinarymessagesizelimit(int messagesizelimit);

	/**
	 * 获取一次接收二进制消息最大值
	 */
	int getbinarymessagesizelimit();

	/**
	 * 获取约定的扩展
	 */
	list<websocketextension> getextensions();

	/**
	 * 发送消息,websocket会话底层协议不支持并发发送消息,因此发送必须是同步的。
	 * 保证信息发送同步进行,一种方法是使用org.springframework.web.socket.handler.concurrentwebsocketsessiondecorator
	 * 包装websocketsession
	 */
	void sendmessage(websocketmessage<?> message) throws ioexception;

	/**
	 * 底层连接是否打开
	 */
	boolean isopen();

	/**
	 * 使用状态码1000关闭websocket连接
	 */
	@override
	void close() throws ioexception;

	/**
	 * 使用指定状态码websocket连接
	 */
	void close(closestatus status) throws ioexception;
}

基本使用

依赖

<dependency>
    <groupid>org.springframework.boot</groupid>
    <artifactid>spring-boot-starter-websocket</artifactid>
</dependency>

核心处理器(websockethandler)

import org.springframework.web.socket.*;
import java.io.ioexception;
import java.util.map;
import java.util.concurrent.concurrenthashmap;

/**
 * websocket核心处理器
 */
public class mywebsockethandler implements websockethandler {
    private static final map<string, websocketsession> sessions = new concurrenthashmap<>();
    @override
    public void afterconnectionestablished(websocketsession session) throws exception {
        string username = session.getattributes().get("username").tostring();
        sessions.put(username, session);
        system.out.println(string.format("成功建立连接~ username: %s", username));
    }
    @override
    public void handlemessage(websocketsession session, websocketmessage<?> message) throws exception {
        string msg = message.getpayload().tostring();
        system.out.println(msg);
    }
    @override
    public void handletransporterror(websocketsession session, throwable exception) throws exception {
        system.out.println("连接出错");
        if (session.isopen()) {
            session.close();
        }
    }
    @override
    public void afterconnectionclosed(websocketsession session, closestatus closestatus) throws exception {
        system.out.println("连接已关闭,status:" + closestatus);
    }
    @override
    public boolean supportspartialmessages() {
        return false;
    }

    /**
     * 指定发消息
     */
    public static void sendmessage(string username, string message) {
        websocketsession websocketsession = sessions.get(username);
        if (websocketsession == null || !websocketsession.isopen()) return;
        try {
            websocketsession.sendmessage(new textmessage(message));
        } catch (ioexception e) {
            e.printstacktrace();
        }
    }
    /**
     * 群发消息
     */
    public static void fanoutmessage(string message) {
        sessions.keyset().foreach(us -> sendmessage(us, message));
    }
}

核心配置类(websocketconfigurer)

import org.springframework.context.annotation.configuration;
import org.springframework.http.server.serverhttprequest;
import org.springframework.http.server.serverhttpresponse;
import org.springframework.http.server.servletserverhttprequest;
import org.springframework.web.socket.websockethandler;
import org.springframework.web.socket.config.annotation.enablewebsocket;
import org.springframework.web.socket.config.annotation.websocketconfigurer;
import org.springframework.web.socket.config.annotation.websockethandlerregistry;
import org.springframework.web.socket.server.handshakeinterceptor;
import java.util.map;

/**
 * websocket 核心配置类
 */
@configuration
@enablewebsocket  // 开启注解接收和发送消息
public class websocketconfig implements websocketconfigurer {
    /**
     * 配置 websocket 入口,允许访问的域、注册 handler、定义拦截器等
     * 注;配置注册的处理器和拦截器是单例的,无论多少连接进来,都是用相同的对象处理。
     */
    @override
    public void registerwebsockethandlers(websockethandlerregistry registry) {
        registry.addhandler(new mywebsockethandler(), "/ws")    // 设置连接路径和处理
            .setallowedorigins("*")
            .addinterceptors(new mywebsocketinterceptor());     // 设置拦截器
    }
    
    /**
     * 自定义拦截器拦截websocket请求
     */
    class mywebsocketinterceptor implements handshakeinterceptor {
        /**
         * 握手前置拦截。一般用来注册用户信息,绑定 websocketsession
         */
        @override
        public boolean beforehandshake(serverhttprequest request, serverhttpresponse response,
                                       websockethandler wshandler, map<string, object> attributes) throws exception {
            system.out.println("握手前置拦截~~");
            if (!(request instanceof servletserverhttprequest)) return true;
//            httpservletrequest servletrequest = ((servletserverhttprequest) request).getservletrequest();
//            string username = (string) servletrequest.getsession().getattribute("username");
            string username = "koishipyb";
            attributes.put("username", username);
            object username1 = attributes.get("username");
            return true;
        }

        /**
         * 握手后置拦截
         */
        @override
        public void afterhandshake(serverhttprequest request, serverhttpresponse response,
                                   websockethandler wshandler, exception exception) {
            system.out.println("握手后置拦截~~");
        }
    }
}

nginx 配置 websocket

server{
	   # 监听的端口号
       listen      9095;
       server_name robotchat.lukeewin.top; # 这里填写的是访问的域名
       location / {
           proxy_pass http://127.0.0.1:9090; # 这里填写的是代理的路径和端口
           proxy_set_header host $host;
           proxy_set_header x-real_ip $remote_addr;
           proxy_set_header x-forwarded-for $proxy_add_x_forwarded_for;
       }
       
       # 以下配置针对websocket
       location /ws { # onlinecount为websocket的访问uri
           proxy_redirect off;
           proxy_pass http://127.0.0.1:9090;
           proxy_set_header host $host;
           proxy_set_header x-real_ip $remote_addr;
           proxy_set_header x-forwarded-for $proxy_add_x_forwarded_for;
           proxy_http_version 1.1;
           proxy_read_timeout 36000s;
           proxy_send_timeout 36000s;
           proxy_set_header upgrade $http_upgrade;   # 升级协议头 websocket
           proxy_set_header connection "upgrade";
       }
}

注:


参考

(0)
打赏 微信扫一扫 微信扫一扫

您想发表意见!!点此发布评论

推荐阅读

Django3框架-(3)-[使用websocket]:使用channels实现websocket功能;简化的配置和实际使用方式

08-03

前端如何使用WebSocket发送消息

08-03

Nginx 配置 WebSocket 代理

08-06

websocket获取实时数据的几种常见链接方式

08-06

nginx代理webSocket链接,webSocket频繁断开重连方式

09-20

Nginx配置WebSocket代理的示例代码

10-14

猜你喜欢

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论