Micronaut WebSocket 支持

2023-03-07 14:57 更新

Micronaut 專門支持創(chuàng)建 WebSocket 客戶端和服務(wù)器。 io.micronaut.websocket.annotation 包包含用于定義客戶端和服務(wù)器的注釋。

使用@ServerWebSocket

@ServerWebSocket 注釋可以應(yīng)用于應(yīng)映射到 WebSocket URI 的任何類。以下示例是一個簡單的聊天 WebSocket 實現(xiàn):

WebSocket 聊天示例

 Java Groovy  Kotlin 
import io.micronaut.websocket.WebSocketBroadcaster;
import io.micronaut.websocket.WebSocketSession;
import io.micronaut.websocket.annotation.OnClose;
import io.micronaut.websocket.annotation.OnMessage;
import io.micronaut.websocket.annotation.OnOpen;
import io.micronaut.websocket.annotation.ServerWebSocket;

import java.util.function.Predicate;

@ServerWebSocket("/chat/{topic}/{username}") // (1)
public class ChatServerWebSocket {

    private final WebSocketBroadcaster broadcaster;

    public ChatServerWebSocket(WebSocketBroadcaster broadcaster) {
        this.broadcaster = broadcaster;
    }

    @OnOpen // (2)
    public void onOpen(String topic, String username, WebSocketSession session) {
        String msg = "[" + username + "] Joined!";
        broadcaster.broadcastSync(msg, isValid(topic, session));
    }

    @OnMessage // (3)
    public void onMessage(String topic, String username,
                          String message, WebSocketSession session) {
        String msg = "[" + username + "] " + message;
        broadcaster.broadcastSync(msg, isValid(topic, session)); // (4)
    }

    @OnClose // (5)
    public void onClose(String topic, String username, WebSocketSession session) {
        String msg = "[" + username + "] Disconnected!";
        broadcaster.broadcastSync(msg, isValid(topic, session));
    }

    private Predicate<WebSocketSession> isValid(String topic, WebSocketSession session) {
        return s -> s != session &&
                topic.equalsIgnoreCase(s.getUriVariables().get("topic", String.class, null));
    }
}
import io.micronaut.websocket.WebSocketBroadcaster
import io.micronaut.websocket.WebSocketSession
import io.micronaut.websocket.annotation.OnClose
import io.micronaut.websocket.annotation.OnMessage
import io.micronaut.websocket.annotation.OnOpen
import io.micronaut.websocket.annotation.ServerWebSocket

import java.util.function.Predicate

@ServerWebSocket("/chat/{topic}/{username}") // (1)
class ChatServerWebSocket {

    private final WebSocketBroadcaster broadcaster

    ChatServerWebSocket(WebSocketBroadcaster broadcaster) {
        this.broadcaster = broadcaster
    }

    @OnOpen // (2)
    void onOpen(String topic, String username, WebSocketSession session) {
        String msg = "[$username] Joined!"
        broadcaster.broadcastSync(msg, isValid(topic, session))
    }

    @OnMessage // (3)
    void onMessage(String topic, String username,
                   String message, WebSocketSession session) {
        String msg = "[$username] $message"
        broadcaster.broadcastSync(msg, isValid(topic, session)) // (4)
    }

    @OnClose // (5)
    void onClose(String topic, String username, WebSocketSession session) {
        String msg = "[$username] Disconnected!"
        broadcaster.broadcastSync(msg, isValid(topic, session))
    }

    private Predicate<WebSocketSession> isValid(String topic, WebSocketSession session) {
        return { s -> s != session && topic.equalsIgnoreCase(s.uriVariables.get("topic", String, null)) }
    }
}
import io.micronaut.websocket.WebSocketBroadcaster
import io.micronaut.websocket.WebSocketSession
import io.micronaut.websocket.annotation.OnClose
import io.micronaut.websocket.annotation.OnMessage
import io.micronaut.websocket.annotation.OnOpen
import io.micronaut.websocket.annotation.ServerWebSocket

import java.util.function.Predicate

@ServerWebSocket("/chat/{topic}/{username}") // (1)
class ChatServerWebSocket(private val broadcaster: WebSocketBroadcaster) {

    @OnOpen // (2)
    fun onOpen(topic: String, username: String, session: WebSocketSession) {
        val msg = "[$username] Joined!"
        broadcaster.broadcastSync(msg, isValid(topic, session))
    }

    @OnMessage // (3)
    fun onMessage(topic: String, username: String,
                  message: String, session: WebSocketSession) {
        val msg = "[$username] $message"
        broadcaster.broadcastSync(msg, isValid(topic, session)) // (4)
    }

    @OnClose // (5)
    fun onClose(topic: String, username: String, session: WebSocketSession) {
        val msg = "[$username] Disconnected!"
        broadcaster.broadcastSync(msg, isValid(topic, session))
    }

    private fun isValid(topic: String, session: WebSocketSession): Predicate<WebSocketSession> {
        return Predicate<WebSocketSession> {
            (it !== session && topic.equals(it.uriVariables.get("topic", String::class.java, null), ignoreCase = true))
        }
    }
}
  1. @ServerWebSocket 注釋定義了 WebSocket 映射的路徑。 URI 可以是 URI 模板。

  2. @OnOpen 注釋聲明了在打開 WebSocket 時要調(diào)用的方法。

  3. @OnMessage 注釋聲明了在收到消息時要調(diào)用的方法。

  4. 您可以使用 WebSocketBroadcaster 向每個 WebSocket 會話廣播消息。您可以使用謂詞過濾要發(fā)送到的會話。此外,您可以使用 WebSocketSession 實例通過 WebSocketSession::send 向它發(fā)送消息。

  5. @OnClose 注釋聲明了在關(guān)閉 WebSocket 時調(diào)用的方法。

可以在 Micronaut 指南中找到 WebSockets 的實際工作示例。

對于綁定,每個 WebSocket 方法的方法參數(shù)可以是:

  • 來自 URI 模板的變量(在上面的示例中,主題和用戶名是 URI 模板變量)

  • WebSocketSession 的實例

@OnClose 方法

@OnClose 方法可以選擇接收 CloseReason。 @OnClose 方法在會話關(guān)閉之前被調(diào)用。

@OnMessage 方法

@OnMessage 方法可以為消息體定義一個參數(shù)。該參數(shù)可以是以下之一:

  • 一個 Netty WebSocketFrame

  • 任何 Java 基本類型或簡單類型(例如 String)。事實上,可以從 ByteBuf 轉(zhuǎn)換的任何類型(您可以注冊額外的 TypeConverter beans 以支持自定義類型)。

  • byte[]、ByteBuf 或 Java NIO ByteBuffer。

  • 一個POJO。在這種情況下,默認(rèn)情況下將使用 JsonMediaTypeCodec 將其解碼為 JSON。您可以注冊自定義編解碼器并使用 @Consumes 注釋定義處理程序的內(nèi)容類型。

  • WebSocketPongMessage。這是一種特殊情況:該方法不會接收常規(guī)消息,而是處理作為對發(fā)送給客戶端的 ping 的答復(fù)到達(dá)的 WebSocket pong。

@OnError 方法

可以添加一個用@OnError 注解的方法來實現(xiàn)自定義錯誤處理。 @OnError 方法可以定義一個參數(shù),接收要處理的異常類型。如果不存在@OnError 處理并且發(fā)生不可恢復(fù)的異常,WebSocket 將自動關(guān)閉。

非阻塞消息處理

前面的示例使用 WebSocketBroadcaster 接口的 broadcastSync 方法,該方法會阻塞直到廣播完成。 WebSocketSession 中存在類似的 sendSync 方法,以阻塞方式向單個接收者發(fā)送消息。但是,您可以通過從每個 WebSocket 處理程序方法返回 Publisher 或 Future 來實現(xiàn)非阻塞 WebSocket 服務(wù)器。例如:

WebSocket 聊天示例

 Java Groovy  Kotlin 
@OnMessage
public Publisher<Message> onMessage(String topic, String username,
                                    Message message, WebSocketSession session) {
    String text = "[" + username + "] " + message.getText();
    Message newMessage = new Message(text);
    return broadcaster.broadcast(newMessage, isValid(topic, session));
}
@OnMessage
Publisher<Message> onMessage(String topic, String username,
                             Message message, WebSocketSession session) {
    String text = "[$username] $message.text"
    Message newMessage = new Message(text)
    broadcaster.broadcast(newMessage, isValid(topic, session))
}
@OnMessage
fun onMessage(topic: String, username: String,
              message: Message, session: WebSocketSession): Publisher<Message> {
    val text = "[" + username + "] " + message.text
    val newMessage = Message(text)
    return broadcaster.broadcast(newMessage, isValid(topic, session))
}

上面的示例使用了廣播,它創(chuàng)建了 Publisher 的實例并將值返回給 Micronaut。 Micronaut 基于 Publisher 接口異步發(fā)送消息。類似的 send 方法通過 Micronaut 返回值異步發(fā)送單個消息。

要在 Micronaut 注釋的處理程序方法之外異步發(fā)送消息,您可以在它們各自的 WebSocketBroadcaster 和 WebSocketSession 接口中使用 broadcastAsync 和 sendAsync 方法。對于阻塞發(fā)送,可以使用 broadcastSync 和 sendSync 方法。

@ServerWebSocket 和作用域

默認(rèn)情況下,@ServerWebSocket 實例為所有 WebSocket 連接共享。必須特別注意同步本地狀態(tài)以避免線程安全問題。

如果您希望每個連接都有一個實例,請使用@Prototype 注釋該類。這使您可以從 @OnOpen 處理程序檢索 WebSocketSession 并將其分配給 @ServerWebSocket 實例的字段。

與 HTTP 會話共享會話

WebSocketSession 默認(rèn)由內(nèi)存映射支持。如果添加會話模塊,則可以在 HTTP 服務(wù)器和 WebSocket 服務(wù)器之間共享會話。

當(dāng)會話由 Redis 等持久存儲支持時,在處理每條消息后,會話將更新到后備存儲。

使用 CLI

如果您使用應(yīng)用程序類型 Micronaut 應(yīng)用程序創(chuàng)建項目,則可以將 create-websocket-server 命令與 Micronaut CLI 結(jié)合使用來創(chuàng)建一個使用 ServerWebSocket 注釋的類。

$ mn create-websocket-server MyChat
| Rendered template WebsocketServer.java to destination src/main/java/example/MyChatServer.java

連接超時

默認(rèn)情況下,Micronaut 會在五分鐘后將沒有活動的空閑連接超時。通常這不是問題,因為瀏覽器會自動重新連接 WebSocket 會話,但是您可以通過設(shè)置 micronaut.server.idle-timeout 設(shè)置來控制此行為(負(fù)值不會導(dǎo)致超時):

設(shè)置服務(wù)器的連接超時

 Properties Yaml  Toml  Groovy  Hocon  JSON 
micronaut.server.idle-timeout=30m
micronaut:
  server:
    idle-timeout: 30m
[micronaut]
  [micronaut.server]
    idle-timeout="30m"
micronaut {
  server {
    idleTimeout = "30m"
  }
}
{
  micronaut {
    server {
      idle-timeout = "30m"
    }
  }
}
{
  "micronaut": {
    "server": {
      "idle-timeout": "30m"
    }
  }
}

如果您使用 Micronaut 的 WebSocket 客戶端,您可能還希望在客戶端上設(shè)置超時:

為客戶端設(shè)置連接超時

 Properties Yaml  Toml  Groovy  Hocon  JSON 
micronaut.http.client.read-idle-timeout=30m
micronaut:
  http:
    client:
      read-idle-timeout: 30m
[micronaut]
  [micronaut.http]
    [micronaut.http.client]
      read-idle-timeout="30m"
micronaut {
  http {
    client {
      readIdleTimeout = "30m"
    }
  }
}
{
  micronaut {
    http {
      client {
        read-idle-timeout = "30m"
      }
    }
  }
}
{
  "micronaut": {
    "http": {
      "client": {
        "read-idle-timeout": "30m"
      }
    }
  }
}

使用@ClientWebSocket

@ClientWebSocket 注釋可以與 WebSocketClient 接口一起使用來定義 WebSocket 客戶端。

您可以使用 @Client 注釋注入對 WebSocketClient 的引用:

@Inject
@Client("http://localhost:8080")
WebSocketClient webSocketClient;

這使您可以為 WebSocket 客戶端使用相同的服務(wù)發(fā)現(xiàn)和負(fù)載平衡功能。

一旦您獲得了對 WebSocketClient 接口的引用,您就可以使用 connect 方法來獲取使用 @ClientWebSocket 注釋的 bean 的連接實例。

例如考慮以下實現(xiàn):

WebSocket 聊天示例

 Java Groovy  Kotlin 
import io.micronaut.http.HttpRequest;
import io.micronaut.websocket.WebSocketSession;
import io.micronaut.websocket.annotation.ClientWebSocket;
import io.micronaut.websocket.annotation.OnMessage;
import io.micronaut.websocket.annotation.OnOpen;
import org.reactivestreams.Publisher;
import io.micronaut.core.async.annotation.SingleResult;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;

@ClientWebSocket("/chat/{topic}/{username}") // (1)
public abstract class ChatClientWebSocket implements AutoCloseable { // (2)

    private WebSocketSession session;
    private HttpRequest request;
    private String topic;
    private String username;
    private Collection<String> replies = new ConcurrentLinkedQueue<>();

    @OnOpen
    public void onOpen(String topic, String username,
                       WebSocketSession session, HttpRequest request) { // (3)
        this.topic = topic;
        this.username = username;
        this.session = session;
        this.request = request;
    }

    public String getTopic() {
        return topic;
    }

    public String getUsername() {
        return username;
    }

    public Collection<String> getReplies() {
        return replies;
    }

    public WebSocketSession getSession() {
        return session;
    }

    public HttpRequest getRequest() {
        return request;
    }

    @OnMessage
    public void onMessage(String message) {
        replies.add(message); // (4)
    }
import io.micronaut.http.HttpRequest
import io.micronaut.websocket.WebSocketSession
import io.micronaut.websocket.annotation.ClientWebSocket
import io.micronaut.websocket.annotation.OnMessage
import io.micronaut.websocket.annotation.OnOpen
import org.reactivestreams.Publisher
import reactor.core.publisher.Mono
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Future
import io.micronaut.core.async.annotation.SingleResult

@ClientWebSocket("/chat/{topic}/{username}") // (1)
abstract class ChatClientWebSocket implements AutoCloseable { // (2)

    private WebSocketSession session
    private HttpRequest request
    private String topic
    private String username
    private Collection<String> replies = new ConcurrentLinkedQueue<>()

    @OnOpen
    void onOpen(String topic, String username,
                WebSocketSession session, HttpRequest request) { // (3)
        this.topic = topic
        this.username = username
        this.session = session
        this.request = request
    }

    String getTopic() {
        topic
    }

    String getUsername() {
        username
    }

    Collection<String> getReplies() {
        replies
    }

    WebSocketSession getSession() {
        session
    }

    HttpRequest getRequest() {
        request
    }

    @OnMessage
    void onMessage(String message) {
        replies << message // (4)
    }
import io.micronaut.http.HttpRequest
import io.micronaut.websocket.WebSocketSession
import io.micronaut.websocket.annotation.ClientWebSocket
import io.micronaut.websocket.annotation.OnMessage
import io.micronaut.websocket.annotation.OnOpen
import reactor.core.publisher.Mono
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Future

@ClientWebSocket("/chat/{topic}/{username}") // (1)
abstract class ChatClientWebSocket : AutoCloseable { // (2)

    var session: WebSocketSession? = null
        private set
    var request: HttpRequest<*>? = null
        private set
    var topic: String? = null
        private set
    var username: String? = null
        private set
    private val replies = ConcurrentLinkedQueue<String>()

    @OnOpen
    fun onOpen(topic: String, username: String,
               session: WebSocketSession, request: HttpRequest<*>) { // (3)
        this.topic = topic
        this.username = username
        this.session = session
        this.request = request
    }

    fun getReplies(): Collection<String> {
        return replies
    }

    @OnMessage
    fun onMessage(message: String) {
        replies.add(message) // (4)
    }
  1. 該類是抽象的(稍后會詳細(xì)介紹)并使用 @ClientWebSocket 進(jìn)行注釋

  2. 客戶端必須實現(xiàn) AutoCloseable 并且您應(yīng)該確保連接在某個時候關(guān)閉。

  3. 您可以使用與服務(wù)器上相同的注釋,在本例中為@OnOpen 以獲取對底層會話的引用。

  4. @OnMessage 注釋定義了從服務(wù)器接收響應(yīng)的方法。

您還可以定義以發(fā)送或廣播開頭的抽象方法,這些方法將在編譯時為您實現(xiàn)。例如:

WebSocket 發(fā)送方法

public abstract void send(String message);

請注意,通過返回 void 這會告訴 Micronaut 該方法是阻塞發(fā)送。您可以改為定義返回期貨或發(fā)布者的方法:

WebSocket 發(fā)送方法

public abstract reactor.core.publisher.Mono<String> send(String message);

上面的示例定義了一個返回 Mono 的發(fā)送方法。

WebSocket 發(fā)送方法

public abstract java.util.concurrent.Future<String> sendAsync(String message);

上面的示例定義了一個異步執(zhí)行并返回 Future 以訪問結(jié)果的發(fā)送方法。

一旦定義了客戶端類,就可以連接到客戶端套接字并開始發(fā)送消息:

連接客戶端 WebSocket

ChatClientWebSocket chatClient = webSocketClient
    .connect(ChatClientWebSocket.class, "/chat/football/fred")
    .blockFirst();
chatClient.send("Hello World!");

出于說明目的,我們使用 blockFirst() 來獲取客戶端。然而,可以結(jié)合連接(它返回一個 Flux)來通過 WebSocket 執(zhí)行非阻塞交互。

使用 CLI

如果您使用 Micronaut CLI 和默認(rèn)(服務(wù))配置文件創(chuàng)建項目,則可以使用 create-websocket-client 命令創(chuàng)建一個帶有 WebSocketClient 的抽象類。

$ mn create-websocket-client MyChat
| Rendered template WebsocketClient.java to destination src/main/java/example/MyChatClient.java


以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號