Micronaut 專門支持創(chuàng)建 WebSocket 客戶端和服務(wù)器。 io.micronaut.websocket.annotation 包包含用于定義客戶端和服務(wù)器的注釋。
使用@ServerWebSocket
@ServerWebSocket 注釋可以應(yīng)用于應(yīng)映射到 WebSocket URI 的任何類。以下示例是一個簡單的聊天 WebSocket 實現(xiàn):
WebSocket 聊天示例
Java |
Groovy |
Kotlin |
import io.micronaut.websocket.WebSocketBroadcaster;
import io.micronaut.websocket.WebSocketSession;
import io.micronaut.websocket.annotation.OnClose;
import io.micronaut.websocket.annotation.OnMessage;
import io.micronaut.websocket.annotation.OnOpen;
import io.micronaut.websocket.annotation.ServerWebSocket;
import java.util.function.Predicate;
@ServerWebSocket("/chat/{topic}/{username}") // (1)
public class ChatServerWebSocket {
private final WebSocketBroadcaster broadcaster;
public ChatServerWebSocket(WebSocketBroadcaster broadcaster) {
this.broadcaster = broadcaster;
}
@OnOpen // (2)
public void onOpen(String topic, String username, WebSocketSession session) {
String msg = "[" + username + "] Joined!";
broadcaster.broadcastSync(msg, isValid(topic, session));
}
@OnMessage // (3)
public void onMessage(String topic, String username,
String message, WebSocketSession session) {
String msg = "[" + username + "] " + message;
broadcaster.broadcastSync(msg, isValid(topic, session)); // (4)
}
@OnClose // (5)
public void onClose(String topic, String username, WebSocketSession session) {
String msg = "[" + username + "] Disconnected!";
broadcaster.broadcastSync(msg, isValid(topic, session));
}
private Predicate<WebSocketSession> isValid(String topic, WebSocketSession session) {
return s -> s != session &&
topic.equalsIgnoreCase(s.getUriVariables().get("topic", String.class, null));
}
}
|
import io.micronaut.websocket.WebSocketBroadcaster
import io.micronaut.websocket.WebSocketSession
import io.micronaut.websocket.annotation.OnClose
import io.micronaut.websocket.annotation.OnMessage
import io.micronaut.websocket.annotation.OnOpen
import io.micronaut.websocket.annotation.ServerWebSocket
import java.util.function.Predicate
@ServerWebSocket("/chat/{topic}/{username}") // (1)
class ChatServerWebSocket {
private final WebSocketBroadcaster broadcaster
ChatServerWebSocket(WebSocketBroadcaster broadcaster) {
this.broadcaster = broadcaster
}
@OnOpen // (2)
void onOpen(String topic, String username, WebSocketSession session) {
String msg = "[$username] Joined!"
broadcaster.broadcastSync(msg, isValid(topic, session))
}
@OnMessage // (3)
void onMessage(String topic, String username,
String message, WebSocketSession session) {
String msg = "[$username] $message"
broadcaster.broadcastSync(msg, isValid(topic, session)) // (4)
}
@OnClose // (5)
void onClose(String topic, String username, WebSocketSession session) {
String msg = "[$username] Disconnected!"
broadcaster.broadcastSync(msg, isValid(topic, session))
}
private Predicate<WebSocketSession> isValid(String topic, WebSocketSession session) {
return { s -> s != session && topic.equalsIgnoreCase(s.uriVariables.get("topic", String, null)) }
}
}
|
import io.micronaut.websocket.WebSocketBroadcaster
import io.micronaut.websocket.WebSocketSession
import io.micronaut.websocket.annotation.OnClose
import io.micronaut.websocket.annotation.OnMessage
import io.micronaut.websocket.annotation.OnOpen
import io.micronaut.websocket.annotation.ServerWebSocket
import java.util.function.Predicate
@ServerWebSocket("/chat/{topic}/{username}") // (1)
class ChatServerWebSocket(private val broadcaster: WebSocketBroadcaster) {
@OnOpen // (2)
fun onOpen(topic: String, username: String, session: WebSocketSession) {
val msg = "[$username] Joined!"
broadcaster.broadcastSync(msg, isValid(topic, session))
}
@OnMessage // (3)
fun onMessage(topic: String, username: String,
message: String, session: WebSocketSession) {
val msg = "[$username] $message"
broadcaster.broadcastSync(msg, isValid(topic, session)) // (4)
}
@OnClose // (5)
fun onClose(topic: String, username: String, session: WebSocketSession) {
val msg = "[$username] Disconnected!"
broadcaster.broadcastSync(msg, isValid(topic, session))
}
private fun isValid(topic: String, session: WebSocketSession): Predicate<WebSocketSession> {
return Predicate<WebSocketSession> {
(it !== session && topic.equals(it.uriVariables.get("topic", String::class.java, null), ignoreCase = true))
}
}
}
|
@ServerWebSocket 注釋定義了 WebSocket 映射的路徑。 URI 可以是 URI 模板。
@OnOpen 注釋聲明了在打開 WebSocket 時要調(diào)用的方法。
@OnMessage 注釋聲明了在收到消息時要調(diào)用的方法。
您可以使用 WebSocketBroadcaster 向每個 WebSocket 會話廣播消息。您可以使用謂詞過濾要發(fā)送到的會話。此外,您可以使用 WebSocketSession 實例通過 WebSocketSession::send 向它發(fā)送消息。
@OnClose 注釋聲明了在關(guān)閉 WebSocket 時調(diào)用的方法。
可以在 Micronaut 指南中找到 WebSockets 的實際工作示例。
對于綁定,每個 WebSocket 方法的方法參數(shù)可以是:
@OnClose 方法
@OnClose 方法可以選擇接收 CloseReason。 @OnClose 方法在會話關(guān)閉之前被調(diào)用。
@OnMessage 方法
@OnMessage 方法可以為消息體定義一個參數(shù)。該參數(shù)可以是以下之一:
一個 Netty WebSocketFrame
任何 Java 基本類型或簡單類型(例如 String)。事實上,可以從 ByteBuf 轉(zhuǎn)換的任何類型(您可以注冊額外的 TypeConverter beans 以支持自定義類型)。
byte[]、ByteBuf 或 Java NIO ByteBuffer。
一個POJO。在這種情況下,默認(rèn)情況下將使用 JsonMediaTypeCodec 將其解碼為 JSON。您可以注冊自定義編解碼器并使用 @Consumes 注釋定義處理程序的內(nèi)容類型。
WebSocketPongMessage。這是一種特殊情況:該方法不會接收常規(guī)消息,而是處理作為對發(fā)送給客戶端的 ping 的答復(fù)到達(dá)的 WebSocket pong。
@OnError 方法
可以添加一個用@OnError 注解的方法來實現(xiàn)自定義錯誤處理。 @OnError 方法可以定義一個參數(shù),接收要處理的異常類型。如果不存在@OnError 處理并且發(fā)生不可恢復(fù)的異常,WebSocket 將自動關(guān)閉。
非阻塞消息處理
前面的示例使用 WebSocketBroadcaster 接口的 broadcastSync 方法,該方法會阻塞直到廣播完成。 WebSocketSession 中存在類似的 sendSync 方法,以阻塞方式向單個接收者發(fā)送消息。但是,您可以通過從每個 WebSocket 處理程序方法返回 Publisher 或 Future 來實現(xiàn)非阻塞 WebSocket 服務(wù)器。例如:
WebSocket 聊天示例
Java |
Groovy |
Kotlin |
@OnMessage
public Publisher<Message> onMessage(String topic, String username,
Message message, WebSocketSession session) {
String text = "[" + username + "] " + message.getText();
Message newMessage = new Message(text);
return broadcaster.broadcast(newMessage, isValid(topic, session));
}
|
@OnMessage
Publisher<Message> onMessage(String topic, String username,
Message message, WebSocketSession session) {
String text = "[$username] $message.text"
Message newMessage = new Message(text)
broadcaster.broadcast(newMessage, isValid(topic, session))
}
|
@OnMessage
fun onMessage(topic: String, username: String,
message: Message, session: WebSocketSession): Publisher<Message> {
val text = "[" + username + "] " + message.text
val newMessage = Message(text)
return broadcaster.broadcast(newMessage, isValid(topic, session))
}
|
上面的示例使用了廣播,它創(chuàng)建了 Publisher 的實例并將值返回給 Micronaut。 Micronaut 基于 Publisher 接口異步發(fā)送消息。類似的 send 方法通過 Micronaut 返回值異步發(fā)送單個消息。
要在 Micronaut 注釋的處理程序方法之外異步發(fā)送消息,您可以在它們各自的 WebSocketBroadcaster 和 WebSocketSession 接口中使用 broadcastAsync 和 sendAsync 方法。對于阻塞發(fā)送,可以使用 broadcastSync 和 sendSync 方法。
@ServerWebSocket 和作用域
默認(rèn)情況下,@ServerWebSocket 實例為所有 WebSocket 連接共享。必須特別注意同步本地狀態(tài)以避免線程安全問題。
如果您希望每個連接都有一個實例,請使用@Prototype 注釋該類。這使您可以從 @OnOpen 處理程序檢索 WebSocketSession 并將其分配給 @ServerWebSocket 實例的字段。
與 HTTP 會話共享會話
WebSocketSession 默認(rèn)由內(nèi)存映射支持。如果添加會話模塊,則可以在 HTTP 服務(wù)器和 WebSocket 服務(wù)器之間共享會話。
當(dāng)會話由 Redis 等持久存儲支持時,在處理每條消息后,會話將更新到后備存儲。
使用 CLI
如果您使用應(yīng)用程序類型 Micronaut 應(yīng)用程序創(chuàng)建項目,則可以將 create-websocket-server 命令與 Micronaut CLI 結(jié)合使用來創(chuàng)建一個使用 ServerWebSocket 注釋的類。
$ mn create-websocket-server MyChat
| Rendered template WebsocketServer.java to destination src/main/java/example/MyChatServer.java
連接超時
默認(rèn)情況下,Micronaut 會在五分鐘后將沒有活動的空閑連接超時。通常這不是問題,因為瀏覽器會自動重新連接 WebSocket 會話,但是您可以通過設(shè)置 micronaut.server.idle-timeout 設(shè)置來控制此行為(負(fù)值不會導(dǎo)致超時):
設(shè)置服務(wù)器的連接超時
Properties |
Yaml |
Toml |
Groovy |
Hocon |
JSON |
micronaut.server.idle-timeout=30m
|
micronaut:
server:
idle-timeout: 30m
|
[micronaut]
[micronaut.server]
idle-timeout="30m"
|
micronaut {
server {
idleTimeout = "30m"
}
}
|
{
micronaut {
server {
idle-timeout = "30m"
}
}
}
|
{
"micronaut": {
"server": {
"idle-timeout": "30m"
}
}
}
|
如果您使用 Micronaut 的 WebSocket 客戶端,您可能還希望在客戶端上設(shè)置超時:
為客戶端設(shè)置連接超時
Properties |
Yaml |
Toml |
Groovy |
Hocon |
JSON |
micronaut.http.client.read-idle-timeout=30m
|
micronaut:
http:
client:
read-idle-timeout: 30m
|
[micronaut]
[micronaut.http]
[micronaut.http.client]
read-idle-timeout="30m"
|
micronaut {
http {
client {
readIdleTimeout = "30m"
}
}
}
|
{
micronaut {
http {
client {
read-idle-timeout = "30m"
}
}
}
}
|
{
"micronaut": {
"http": {
"client": {
"read-idle-timeout": "30m"
}
}
}
}
|
使用@ClientWebSocket
@ClientWebSocket 注釋可以與 WebSocketClient 接口一起使用來定義 WebSocket 客戶端。
您可以使用 @Client 注釋注入對 WebSocketClient 的引用:
@Inject
@Client("http://localhost:8080")
WebSocketClient webSocketClient;
這使您可以為 WebSocket 客戶端使用相同的服務(wù)發(fā)現(xiàn)和負(fù)載平衡功能。
一旦您獲得了對 WebSocketClient 接口的引用,您就可以使用 connect 方法來獲取使用 @ClientWebSocket 注釋的 bean 的連接實例。
例如考慮以下實現(xiàn):
WebSocket 聊天示例
Java |
Groovy |
Kotlin |
import io.micronaut.http.HttpRequest;
import io.micronaut.websocket.WebSocketSession;
import io.micronaut.websocket.annotation.ClientWebSocket;
import io.micronaut.websocket.annotation.OnMessage;
import io.micronaut.websocket.annotation.OnOpen;
import org.reactivestreams.Publisher;
import io.micronaut.core.async.annotation.SingleResult;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
@ClientWebSocket("/chat/{topic}/{username}") // (1)
public abstract class ChatClientWebSocket implements AutoCloseable { // (2)
private WebSocketSession session;
private HttpRequest request;
private String topic;
private String username;
private Collection<String> replies = new ConcurrentLinkedQueue<>();
@OnOpen
public void onOpen(String topic, String username,
WebSocketSession session, HttpRequest request) { // (3)
this.topic = topic;
this.username = username;
this.session = session;
this.request = request;
}
public String getTopic() {
return topic;
}
public String getUsername() {
return username;
}
public Collection<String> getReplies() {
return replies;
}
public WebSocketSession getSession() {
return session;
}
public HttpRequest getRequest() {
return request;
}
@OnMessage
public void onMessage(String message) {
replies.add(message); // (4)
}
|
import io.micronaut.http.HttpRequest
import io.micronaut.websocket.WebSocketSession
import io.micronaut.websocket.annotation.ClientWebSocket
import io.micronaut.websocket.annotation.OnMessage
import io.micronaut.websocket.annotation.OnOpen
import org.reactivestreams.Publisher
import reactor.core.publisher.Mono
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Future
import io.micronaut.core.async.annotation.SingleResult
@ClientWebSocket("/chat/{topic}/{username}") // (1)
abstract class ChatClientWebSocket implements AutoCloseable { // (2)
private WebSocketSession session
private HttpRequest request
private String topic
private String username
private Collection<String> replies = new ConcurrentLinkedQueue<>()
@OnOpen
void onOpen(String topic, String username,
WebSocketSession session, HttpRequest request) { // (3)
this.topic = topic
this.username = username
this.session = session
this.request = request
}
String getTopic() {
topic
}
String getUsername() {
username
}
Collection<String> getReplies() {
replies
}
WebSocketSession getSession() {
session
}
HttpRequest getRequest() {
request
}
@OnMessage
void onMessage(String message) {
replies << message // (4)
}
|
import io.micronaut.http.HttpRequest
import io.micronaut.websocket.WebSocketSession
import io.micronaut.websocket.annotation.ClientWebSocket
import io.micronaut.websocket.annotation.OnMessage
import io.micronaut.websocket.annotation.OnOpen
import reactor.core.publisher.Mono
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Future
@ClientWebSocket("/chat/{topic}/{username}") // (1)
abstract class ChatClientWebSocket : AutoCloseable { // (2)
var session: WebSocketSession? = null
private set
var request: HttpRequest<*>? = null
private set
var topic: String? = null
private set
var username: String? = null
private set
private val replies = ConcurrentLinkedQueue<String>()
@OnOpen
fun onOpen(topic: String, username: String,
session: WebSocketSession, request: HttpRequest<*>) { // (3)
this.topic = topic
this.username = username
this.session = session
this.request = request
}
fun getReplies(): Collection<String> {
return replies
}
@OnMessage
fun onMessage(message: String) {
replies.add(message) // (4)
}
|
該類是抽象的(稍后會詳細(xì)介紹)并使用 @ClientWebSocket 進(jìn)行注釋
客戶端必須實現(xiàn) AutoCloseable 并且您應(yīng)該確保連接在某個時候關(guān)閉。
您可以使用與服務(wù)器上相同的注釋,在本例中為@OnOpen 以獲取對底層會話的引用。
@OnMessage 注釋定義了從服務(wù)器接收響應(yīng)的方法。
您還可以定義以發(fā)送或廣播開頭的抽象方法,這些方法將在編譯時為您實現(xiàn)。例如:
WebSocket 發(fā)送方法
public abstract void send(String message);
請注意,通過返回 void 這會告訴 Micronaut 該方法是阻塞發(fā)送。您可以改為定義返回期貨或發(fā)布者的方法:
WebSocket 發(fā)送方法
public abstract reactor.core.publisher.Mono<String> send(String message);
上面的示例定義了一個返回 Mono 的發(fā)送方法。
WebSocket 發(fā)送方法
public abstract java.util.concurrent.Future<String> sendAsync(String message);
上面的示例定義了一個異步執(zhí)行并返回 Future 以訪問結(jié)果的發(fā)送方法。
一旦定義了客戶端類,就可以連接到客戶端套接字并開始發(fā)送消息:
連接客戶端 WebSocket
ChatClientWebSocket chatClient = webSocketClient
.connect(ChatClientWebSocket.class, "/chat/football/fred")
.blockFirst();
chatClient.send("Hello World!");
出于說明目的,我們使用 blockFirst() 來獲取客戶端。然而,可以結(jié)合連接(它返回一個 Flux)來通過 WebSocket 執(zhí)行非阻塞交互。
使用 CLI
如果您使用 Micronaut CLI 和默認(rèn)(服務(wù))配置文件創(chuàng)建項目,則可以使用 create-websocket-client 命令創(chuàng)建一個帶有 WebSocketClient 的抽象類。
$ mn create-websocket-client MyChat
| Rendered template WebsocketClient.java to destination src/main/java/example/MyChatClient.java
更多建議: