HttpClient 接口構(gòu)成了低級(jí) API 的基礎(chǔ)。此接口聲明有助于簡(jiǎn)化執(zhí)行 HTTP 請(qǐng)求和接收響應(yīng)的方法。
HttpClient 接口中的大多數(shù)方法都返回 Reactive Streams Publisher 實(shí)例,這并不總是最有用的接口。
Micronaut 的 Reactor HTTP Client 依賴項(xiàng)附帶一個(gè)名為 ReactorHttpClient 的子接口。它提供了返回 Project Reactor Flux 類型的 HttpClient 接口的變體。
發(fā)送您的第一個(gè) HTTP 請(qǐng)求
獲取 HttpClient
有幾種方法可以獲取對(duì) HttpClient 的引用。最常見(jiàn)的是使用 Client 注釋。例如:
注入 HTTP 客戶端
@Client("https://api.twitter.com/1.1") @Inject HttpClient httpClient;
上面的示例注入了一個(gè)以 Twitter API 為目標(biāo)的客戶端。
@field:Client("\${myapp.api.twitter.url}") @Inject lateinit var httpClient: HttpClient
上面的 Kotlin 示例使用配置路徑注入了一個(gè)以 Twitter API 為目標(biāo)的客戶端。請(qǐng)注意“\${path.to.config}”上所需的轉(zhuǎn)義(反斜杠),這是由于 Kotlin 字符串插值所必需的。
Client 注釋也是一個(gè)自定義范圍,用于管理 HttpClient 實(shí)例的創(chuàng)建并確保它們?cè)趹?yīng)用程序關(guān)閉時(shí)停止。
您傳遞給 Client 注釋的值可以是以下之一:
另一種創(chuàng)建 HttpClient 的方法是使用 HttpClient 的靜態(tài)創(chuàng)建方法,但是不推薦使用這種方法,因?yàn)槟仨毚_保手動(dòng)關(guān)閉客戶端,當(dāng)然創(chuàng)建的客戶端不會(huì)發(fā)生依賴注入。
執(zhí)行 HTTP GET
使用 HttpClient 時(shí),通常有兩種感興趣的方法。第一個(gè)是 retrieve,它執(zhí)行一個(gè) HTTP 請(qǐng)求并以您作為 Publisher 請(qǐng)求的任何類型(默認(rèn)為 String)返回正文。
retrieve 方法接受一個(gè) HttpRequest 或一個(gè)字符串 URI 到您希望請(qǐng)求的端點(diǎn)。
以下示例顯示如何使用 retrieve 執(zhí)行 HTTP GET 并將響應(yīng)主體作為字符串接收:
使用檢索
Java |
Groovy |
Kotlin |
String uri = UriBuilder.of("/hello/{name}")
.expand(Collections.singletonMap("name", "John"))
.toString();
assertEquals("/hello/John", uri);
String result = client.toBlocking().retrieve(uri);
assertEquals("Hello John", result);
|
when:
String uri = UriBuilder.of("/hello/{name}")
.expand(name: "John")
then:
"/hello/John" == uri
when:
String result = client.toBlocking().retrieve(uri)
then:
"Hello John" == result
|
val uri = UriBuilder.of("/hello/{name}")
.expand(Collections.singletonMap("name", "John"))
.toString()
uri shouldBe "/hello/John"
val result = client.toBlocking().retrieve(uri)
result shouldBe "Hello John"
|
請(qǐng)注意,在此示例中,出于說(shuō)明目的,我們調(diào)用 toBlocking() 以返回客戶端的阻塞版本。但是,在生產(chǎn)代碼中,您不應(yīng)該這樣做,而應(yīng)該依賴 Micronaut HTTP 服務(wù)器的非阻塞特性。
例如,以下 @Controller 方法以非阻塞方式調(diào)用另一個(gè)端點(diǎn):
不阻塞地使用 HTTP 客戶端
Java |
Groovy |
Kotlin |
import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.annotation.Post;
import io.micronaut.http.annotation.Status;
import io.micronaut.http.client.HttpClient;
import io.micronaut.http.client.annotation.Client;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import io.micronaut.core.async.annotation.SingleResult;
import static io.micronaut.http.HttpRequest.GET;
import static io.micronaut.http.HttpStatus.CREATED;
import static io.micronaut.http.MediaType.TEXT_PLAIN;
@Get("/hello/{name}")
@SingleResult
Publisher<String> hello(String name) { // (1)
return Mono.from(httpClient.retrieve(GET("/hello/" + name))); // (2)
}
|
import io.micronaut.http.annotation.Body
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.http.annotation.Post
import io.micronaut.http.annotation.Status
import io.micronaut.http.client.HttpClient
import io.micronaut.http.client.annotation.Client
import org.reactivestreams.Publisher
import io.micronaut.core.async.annotation.SingleResult
import reactor.core.publisher.Mono
import static io.micronaut.http.HttpRequest.GET
import static io.micronaut.http.HttpStatus.CREATED
import static io.micronaut.http.MediaType.TEXT_PLAIN
@Get("/hello/{name}")
@SingleResult
Publisher<String> hello(String name) { // (1)
Mono.from(httpClient.retrieve( GET("/hello/" + name))) // (2)
}
|
import io.micronaut.http.HttpRequest.GET
import io.micronaut.http.HttpStatus.CREATED
import io.micronaut.http.MediaType.TEXT_PLAIN
import io.micronaut.http.annotation.Body
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.http.annotation.Post
import io.micronaut.http.annotation.Status
import io.micronaut.http.client.HttpClient
import io.micronaut.http.client.annotation.Client
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import io.micronaut.core.async.annotation.SingleResult
@Get("/hello/{name}")
@SingleResult
internal fun hello(name: String): Publisher<String> { // (1)
return Flux.from(httpClient.retrieve(GET<Any>("/hello/$name")))
.next() // (2)
}
|
hello 方法返回一個(gè) Mono,它可能會(huì)或可能不會(huì)發(fā)出一個(gè)項(xiàng)目。如果未發(fā)出某個(gè)項(xiàng)目,則返回 404。
檢索方法被調(diào)用,它返回一個(gè) Flux。這有一個(gè) firstElement 方法返回第一個(gè)發(fā)出的項(xiàng)目或什么都不返回
使用 Reactor(如果您愿意,也可以使用 RxJava),您可以輕松高效地編寫(xiě)多個(gè) HTTP 客戶端調(diào)用,而不會(huì)阻塞(這會(huì)限制您的應(yīng)用程序的吞吐量和可擴(kuò)展性)。
調(diào)試/跟蹤 HTTP 客戶端
要調(diào)試從 HTTP 客戶端發(fā)送和接收的請(qǐng)求,您可以通過(guò) logback.xml 文件啟用跟蹤日志記錄:
logback.xml
<logger name="io.micronaut.http.client" level="TRACE"/>
客戶端特定調(diào)試/跟蹤
要啟用特定于客戶端的日志記錄,您可以為所有 HTTP 客戶端配置默認(rèn)記錄器。您還可以使用特定于客戶端的配置為不同的客戶端配置不同的記錄器。例如,在您的配置文件(例如 application.yml)中:
Properties |
Yaml |
Toml |
Groovy |
Hocon |
JSON |
micronaut.http.client.logger-name=mylogger
micronaut.http.services.otherClient.logger-name=other.client
|
micronaut:
http:
client:
logger-name: mylogger
services:
otherClient:
logger-name: other.client
|
[micronaut]
[micronaut.http]
[micronaut.http.client]
logger-name="mylogger"
[micronaut.http.services]
[micronaut.http.services.otherClient]
logger-name="other.client"
|
micronaut {
http {
client {
loggerName = "mylogger"
}
services {
otherClient {
loggerName = "other.client"
}
}
}
}
|
{
micronaut {
http {
client {
logger-name = "mylogger"
}
services {
otherClient {
logger-name = "other.client"
}
}
}
}
}
|
{
"micronaut": {
"http": {
"client": {
"logger-name": "mylogger"
},
"services": {
"otherClient": {
"logger-name": "other.client"
}
}
}
}
}
|
然后在 logback.xml 中啟用日志記錄:
logback.xml
<logger name="mylogger" level="DEBUG"/>
<logger name="other.client" level="TRACE"/>
自定義 HTTP 請(qǐng)求
前面的示例演示了使用 HttpRequest 接口的靜態(tài)方法來(lái)構(gòu)造 MutableHttpRequest 實(shí)例。顧名思義,MutableHttpRequest 可以改變,包括添加標(biāo)頭、自定義請(qǐng)求正文等的能力。例如:
傳遞一個(gè) HttpRequest 來(lái)檢索
Java |
Groovy |
Kotlin |
Flux<String> response = Flux.from(client.retrieve(
GET("/hello/John")
.header("X-My-Header", "SomeValue")
));
|
Flux<String> response = Flux.from(client.retrieve(
GET("/hello/John")
.header("X-My-Header", "SomeValue")
))
|
val response = client.retrieve(
GET<Any>("/hello/John")
.header("X-My-Header", "SomeValue")
)
|
上面的示例在發(fā)送之前向響應(yīng)添加一個(gè)標(biāo)頭(X-My-Header)。 MutableHttpRequest 接口有更多方便的方法,可以很容易地以常用的方式修改請(qǐng)求。
讀取 JSON 響應(yīng)
微服務(wù)通常使用 JSON 等消息編碼格式。 Micronaut 的 HTTP 客戶端利用 Jackson 進(jìn)行 JSON 解析,因此 Jackson 可以解碼的任何類型都可以作為第二個(gè)參數(shù)傳遞給檢索。
例如,考慮以下返回 JSON 響應(yīng)的 @Controller 方法:
從控制器返回 JSON
Java |
Groovy |
Kotlin |
@Get("/greet/{name}")
Message greet(String name) {
return new Message("Hello " + name);
}
|
@Get("/greet/{name}")
Message greet(String name) {
new Message("Hello $name")
}
|
@Get("/greet/{name}")
internal fun greet(name: String): Message {
return Message("Hello $name")
}
|
上面的方法返回一個(gè) Message 類型的 POJO,如下所示:
Message POJO
Java |
Groovy |
Kotlin |
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
public class Message {
private final String text;
@JsonCreator
public Message(@JsonProperty("text") String text) {
this.text = text;
}
public String getText() {
return text;
}
}
|
import com.fasterxml.jackson.annotation.JsonCreator
import com.fasterxml.jackson.annotation.JsonProperty
class Message {
final String text
@JsonCreator
Message(@JsonProperty("text") String text) {
this.text = text
}
}
|
import com.fasterxml.jackson.annotation.JsonCreator
import com.fasterxml.jackson.annotation.JsonProperty
class Message @JsonCreator
constructor(@param:JsonProperty("text") val text: String)
|
Jackson注解用于映射構(gòu)造函數(shù)
在客戶端,您可以調(diào)用此端點(diǎn)并使用 retrieve 方法將 JSON 解碼為映射,如下所示:
將響應(yīng)主體解碼為 Map
Java |
Groovy |
Kotlin |
response = Flux.from(client.retrieve(
GET("/greet/John"),
Argument.of(Map.class, String.class, String.class) // (1)
));
|
response = Flux.from(client.retrieve(
GET("/greet/John"),
Argument.of(Map, String, String) // (1)
))
|
var response: Flux<Map<*, *>> = Flux.from(client.retrieve(
GET<Any>("/greet/John"), Map::class.java
))
|
Argument.of 方法返回一個(gè) Map,其中鍵和值類型為 String
雖然檢索 JSON 作為映射可能是可取的,但通常您希望將對(duì)象解碼為 POJO。為此,請(qǐng)改為傳遞類型:
將響應(yīng)主體解碼為 POJO
Java |
Groovy |
Kotlin |
Flux<Message> response = Flux.from(client.retrieve(
GET("/greet/John"), Message.class
));
assertEquals("Hello John", response.blockFirst().getText());
|
when:
Flux<Message> response = Flux.from(client.retrieve(
GET("/greet/John"), Message
))
then:
"Hello John" == response.blockFirst().getText()
|
val response = Flux.from(client.retrieve(
GET<Any>("/greet/John"), Message::class.java
))
response.blockFirst().text shouldBe "Hello John"
|
請(qǐng)注意如何在客戶端和服務(wù)器上使用相同的 Java 類型。這意味著您通常會(huì)定義一個(gè)公共 API 項(xiàng)目,在該項(xiàng)目中定義用于定義 API 的接口和類型。
解碼其他內(nèi)容類型
如果您與之通信的服務(wù)器使用非 JSON 的自定義內(nèi)容類型,默認(rèn)情況下 Micronaut 的 HTTP 客戶端將不知道如何解碼這種類型。
要解決此問(wèn)題,請(qǐng)將 MediaTypeCodec 注冊(cè)為一個(gè) bean,它會(huì)被自動(dòng)拾取并用于解碼(或編碼)消息。
接收完整的 HTTP 響應(yīng)
有時(shí)僅接收響應(yīng)主體是不夠的,您還需要響應(yīng)中的其他信息,例如標(biāo)頭、cookie 等。在這種情況下,不要使用 retrieve 方法,而是使用 exchange 方法:
接收完整的 HTTP 響應(yīng)
Java |
Groovy |
Kotlin |
Flux<HttpResponse<Message>> call = Flux.from(client.exchange(
GET("/greet/John"), Message.class // (1)
));
HttpResponse<Message> response = call.blockFirst();
Optional<Message> message = response.getBody(Message.class); // (2)
// check the status
assertEquals(HttpStatus.OK, response.getStatus()); // (3)
// check the body
assertTrue(message.isPresent());
assertEquals("Hello John", message.get().getText());
|
when:
Flux<HttpResponse<Message>> call = Flux.from(client.exchange(
GET("/greet/John"), Message // (1)
))
HttpResponse<Message> response = call.blockFirst();
Optional<Message> message = response.getBody(Message) // (2)
// check the status
then:
HttpStatus.OK == response.getStatus() // (3)
// check the body
message.isPresent()
"Hello John" == message.get().getText()
|
val call = client.exchange(
GET<Any>("/greet/John"), Message::class.java // (1)
)
val response = Flux.from(call).blockFirst()
val message = response.getBody(Message::class.java) // (2)
// check the status
response.status shouldBe HttpStatus.OK // (3)
// check the body
message.isPresent shouldBe true
message.get().text shouldBe "Hello John"
|
交換方法接收 HttpResponse
使用響應(yīng)的 getBody(..) 方法檢索正文
可以檢查響應(yīng)的其他方面,例如 HttpStatus
上面的示例接收完整的 HttpResponse,您可以從中獲取標(biāo)頭和其他有用信息。
發(fā)布請(qǐng)求正文
到目前為止,所有示例都使用了相同的 HTTP 方法,即 GET。 HttpRequest 接口具有適用于所有不同 HTTP 方法的工廠方法。下表總結(jié)了它們:
還存在一個(gè)創(chuàng)建方法來(lái)構(gòu)造任何 HttpMethod 類型的請(qǐng)求。由于 POST、PUT 和 PATCH 方法需要主體,因此需要第二個(gè)參數(shù),即主體對(duì)象。
以下示例演示了如何發(fā)送簡(jiǎn)單的 String 正文:
發(fā)送字符串正文
Java |
Groovy |
Kotlin |
Flux<HttpResponse<String>> call = Flux.from(client.exchange(
POST("/hello", "Hello John") // (1)
.contentType(MediaType.TEXT_PLAIN_TYPE)
.accept(MediaType.TEXT_PLAIN_TYPE), // (2)
String.class // (3)
));
|
Flux<HttpResponse<String>> call = Flux.from(client.exchange(
POST("/hello", "Hello John") // (1)
.contentType(MediaType.TEXT_PLAIN_TYPE)
.accept(MediaType.TEXT_PLAIN_TYPE), // (2)
String // (3)
))
|
val call = client.exchange(
POST("/hello", "Hello John") // (1)
.contentType(MediaType.TEXT_PLAIN_TYPE)
.accept(MediaType.TEXT_PLAIN_TYPE), String::class.java // (3)
)
|
使用POST方法;第一個(gè)參數(shù)是 URI,第二個(gè)是主體
內(nèi)容類型和接受類型設(shè)置為text/plain(默認(rèn)為application/json)
預(yù)期的響應(yīng)類型是 String
發(fā)送 JSON
前面的示例發(fā)送純文本。要發(fā)送 JSON,將要編碼的對(duì)象傳遞給 JSON(無(wú)論是 Map 還是 POJO),只要 Jackson 能夠?qū)ζ溥M(jìn)行編碼。
例如,您可以從上一節(jié)創(chuàng)建一個(gè) Message 并將其傳遞給 POST 方法:
Sending a JSON body
Java |
Groovy |
Kotlin |
Flux<HttpResponse<Message>> call = Flux.from(client.exchange(
POST("/greet", new Message("Hello John")), // (1)
Message.class // (2)
));
|
Flux<HttpResponse<Message>> call = Flux.from(client.exchange(
POST("/greet", new Message("Hello John")), // (1)
Message // (2)
))
|
val call = client.exchange(
POST("/greet", Message("Hello John")), Message::class.java // (2)
)
|
創(chuàng)建 Message 實(shí)例并將其傳遞給 POST 方法
同一個(gè)類解碼響應(yīng)
在上面的示例中,以下 JSON 作為請(qǐng)求的主體發(fā)送:
Resulting JSON
{"text":"Hello John"}
可以使用 Jackson Annotations 自定義 JSON。
使用 URI 模板
如果在 URI 中包含對(duì)象的某些屬性,則可以使用 URI 模板。
例如,假設(shè)您有一個(gè)帶有 title 屬性的 Book 類。您可以在 URI 模板中包含標(biāo)題,然后從 Book 的實(shí)例中填充它。例如:
Sending a JSON body with a URI template
Java |
Groovy |
Kotlin |
Flux<HttpResponse<Book>> call = Flux.from(client.exchange(
POST("/amazon/book/{title}", new Book("The Stand")),
Book.class
));
|
Flux<HttpResponse<Book>> call = client.exchange(
POST("/amazon/book/{title}", new Book("The Stand")),
Book
);
|
val call = client.exchange(
POST("/amazon/book/{title}", Book("The Stand")),
Book::class.java
)
|
在上述情況下,title 屬性包含在 URI 中。
發(fā)送表單數(shù)據(jù)
您還可以將 POJO 或地圖編碼為表單數(shù)據(jù)而不是 JSON。只需在發(fā)布請(qǐng)求中將內(nèi)容類型設(shè)置為 application/x-www-form-urlencoded:
Sending a Form Data
Java |
Groovy |
Kotlin |
Flux<HttpResponse<Book>> call = Flux.from(client.exchange(
POST("/amazon/book/{title}", new Book("The Stand"))
.contentType(MediaType.APPLICATION_FORM_URLENCODED),
Book.class
));
|
Flux<HttpResponse<Book>> call = client.exchange(
POST("/amazon/book/{title}", new Book("The Stand"))
.contentType(MediaType.APPLICATION_FORM_URLENCODED),
Book
)
|
val call = client.exchange(
POST("/amazon/book/{title}", Book("The Stand"))
.contentType(MediaType.APPLICATION_FORM_URLENCODED),
Book::class.java
)
|
請(qǐng)注意,Jackson 也可以綁定表單數(shù)據(jù),因此要自定義綁定過(guò)程,請(qǐng)使用 Jackson 注釋。
多部分客戶端上傳
Micronaut HTTP 客戶端支持多部分請(qǐng)求。要構(gòu)建多部分請(qǐng)求,請(qǐng)將內(nèi)容類型設(shè)置為 multipart/form-data 并將正文設(shè)置為 MultipartBody 的實(shí)例。
例如:
Creating the body
Java |
Groovy |
Kotlin |
import io.micronaut.http.client.multipart.MultipartBody;
String toWrite = "test file";
File file = File.createTempFile("data", ".txt");
FileWriter writer = new FileWriter(file);
writer.write(toWrite);
writer.close();
MultipartBody requestBody = MultipartBody.builder() // (1)
.addPart( // (2)
"data",
file.getName(),
MediaType.TEXT_PLAIN_TYPE,
file
).build(); // (3)
|
import io.micronaut.http.multipart.CompletedFileUpload
import io.micronaut.http.multipart.StreamingFileUpload
import io.micronaut.http.client.multipart.MultipartBody
import org.reactivestreams.Publisher
File file = new File(uploadDir, "data.txt")
file.text = "test file"
file.createNewFile()
MultipartBody requestBody = MultipartBody.builder() // (1)
.addPart( // (2)
"data",
file.name,
MediaType.TEXT_PLAIN_TYPE,
file
).build() // (3)
|
import io.micronaut.http.client.multipart.MultipartBody
val toWrite = "test file"
val file = File.createTempFile("data", ".txt")
val writer = FileWriter(file)
writer.write(toWrite)
writer.close()
val requestBody = MultipartBody.builder() // (1)
.addPart( // (2)
"data",
file.name,
MediaType.TEXT_PLAIN_TYPE,
file
).build() // (3)
|
創(chuàng)建一個(gè) MultipartBody 構(gòu)建器,用于向主體添加部件。
將一個(gè)部分添加到正文中,在本例中是一個(gè)文件。此方法在 MultipartBody.Builder 中有不同的變體。
build 方法將構(gòu)建器中的所有部件組裝成一個(gè) MultipartBody。至少需要一個(gè)部分。
創(chuàng)建請(qǐng)求
Java |
Groovy |
Kotlin |
HttpRequest.POST("/multipart/upload", requestBody) // (1)
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE) // (2)
|
HttpRequest.POST("/multipart/upload", requestBody) // (1)
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE) // (2)
|
HttpRequest.POST("/multipart/upload", requestBody) // (1)
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE) // (2)
|
具有不同類型數(shù)據(jù)的多部分請(qǐng)求正文。
將請(qǐng)求的內(nèi)容類型標(biāo)頭設(shè)置為 multipart/form-data。
通過(guò) HTTP 流式傳輸 JSON
Micronaut 的 HTTP 客戶端支持通過(guò) ReactorStreamingHttpClient 接口通過(guò) HTTP 流式傳輸數(shù)據(jù),該接口包括特定于流式傳輸?shù)姆椒?,包括?/p>
表 1. HTTP 流媒體方法
方法 |
描述 |
dataStream(HttpRequest<I> request)
|
將數(shù)據(jù)流作為 ByteBuffer 的 Flux 返回
|
exchangeStream(HttpRequest<I> request)
|
返回包裝 ByteBuffer 的 Flux 的 HttpResponse
|
jsonStream(HttpRequest<I> request)
|
返回一個(gè)非阻塞的 JSON 對(duì)象流
|
要使用 JSON 流,請(qǐng)?jiān)诜?wù)器上聲明一個(gè)控制器方法,該方法返回 JSON 對(duì)象的 application/x-json-stream。例如:
Streaming JSON on the Server
Java |
Groovy |
Kotlin |
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
@Get(value = "/headlines", processes = MediaType.APPLICATION_JSON_STREAM) // (1)
Publisher<Headline> streamHeadlines() {
return Mono.fromCallable(() -> { // (2)
Headline headline = new Headline();
headline.setText("Latest Headline at " + ZonedDateTime.now());
return headline;
}).repeat(100) // (3)
.delayElements(Duration.of(1, ChronoUnit.SECONDS)); // (4)
}
|
import io.micronaut.http.MediaType
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.time.Duration
import java.time.ZonedDateTime
import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeUnit
@Get(value = "/headlines", processes = MediaType.APPLICATION_JSON_STREAM) // (1)
Flux<Headline> streamHeadlines() {
Mono.fromCallable({ // (2)
new Headline(text: "Latest Headline at ${ZonedDateTime.now()}")
}).repeat(100) // (3)
.delayElements(Duration.of(1, ChronoUnit.SECONDS)) // (4)
}
|
import io.micronaut.http.MediaType.APPLICATION_JSON_STREAM
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.time.Duration
import java.time.ZonedDateTime
import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeUnit.SECONDS
@Get(value = "/headlines", processes = [APPLICATION_JSON_STREAM]) // (1)
internal fun streamHeadlines(): Flux<Headline> {
return Mono.fromCallable { // (2)
val headline = Headline()
headline.text = "Latest Headline at ${ZonedDateTime.now()}"
headline
}.repeat(100) // (3)
.delayElements(Duration.of(1, ChronoUnit.SECONDS)) // (4)
}
|
streamHeadlines 方法產(chǎn)生 application/x-json-stream
Flux 是從 Callable 函數(shù)創(chuàng)建的(注意函數(shù)內(nèi)不會(huì)發(fā)生阻塞,所以這沒(méi)問(wèn)題,否則你應(yīng)該訂閱 I/O 線程池)。
Flux 重復(fù) 100 次
Flux 發(fā)射物品,每個(gè)物品之間有 1 秒的延遲
服務(wù)器不必用 Micronaut 編寫(xiě),任何支持 JSON 流的服務(wù)器都可以。
然后在客戶端上,使用 jsonStream 訂閱流,每次服務(wù)器發(fā)出 JSON 對(duì)象時(shí),客戶端都會(huì)解碼并使用它:
在客戶端流式傳輸 JSON
Java |
Groovy |
Kotlin |
Flux<Headline> headlineStream = Flux.from(client.jsonStream(
GET("/streaming/headlines"), Headline.class)); // (1)
CompletableFuture<Headline> future = new CompletableFuture<>(); // (2)
headlineStream.subscribe(new Subscriber<Headline>() {
@Override
public void onSubscribe(Subscription s) {
s.request(1); // (3)
}
@Override
public void onNext(Headline headline) {
System.out.println("Received Headline = " + headline.getText());
future.complete(headline); // (4)
}
@Override
public void onError(Throwable t) {
future.completeExceptionally(t); // (5)
}
@Override
public void onComplete() {
// no-op // (6)
}
});
|
Flux<Headline> headlineStream = Flux.from(client.jsonStream(
GET("/streaming/headlines"), Headline)) // (1)
CompletableFuture<Headline> future = new CompletableFuture<>() // (2)
headlineStream.subscribe(new Subscriber<Headline>() {
@Override
void onSubscribe(Subscription s) {
s.request(1) // (3)
}
@Override
void onNext(Headline headline) {
println "Received Headline = $headline.text"
future.complete(headline) // (4)
}
@Override
void onError(Throwable t) {
future.completeExceptionally(t) // (5)
}
@Override
void onComplete() {
// no-op // (6)
}
})
|
val headlineStream = client.jsonStream(
GET<Any>("/streaming/headlines"), Headline::class.java) // (1)
val future = CompletableFuture<Headline>() // (2)
headlineStream.subscribe(object : Subscriber<Headline> {
override fun onSubscribe(s: Subscription) {
s.request(1) // (3)
}
override fun onNext(headline: Headline) {
println("Received Headline = ${headline.text!!}")
future.complete(headline) // (4)
}
override fun onError(t: Throwable) {
future.completeExceptionally(t) // (5)
}
override fun onComplete() {
// no-op // (6)
}
})
|
jsonStream 方法返回一個(gè) Flux
CompletableFuture 用于接收值,但是您對(duì)每個(gè)發(fā)出的項(xiàng)目執(zhí)行的操作是特定于應(yīng)用程序的
訂閱請(qǐng)求單個(gè)項(xiàng)目。您可以使用訂閱來(lái)調(diào)節(jié)背壓和需求。
onNext 方法在一個(gè)項(xiàng)目被發(fā)出時(shí)被調(diào)用
發(fā)生錯(cuò)誤時(shí)調(diào)用 onError 方法
onComplete 方法在所有 Headline 實(shí)例發(fā)出后被調(diào)用
請(qǐng)注意,上例中的服務(wù)器和客戶端都不執(zhí)行任何阻塞 I/O。
配置 HTTP 客戶端
所有客戶端的全局配置
默認(rèn)的 HTTP 客戶端配置是一個(gè)名為 DefaultHttpClientConfiguration 的配置屬性,它允許為所有 HTTP 客戶端配置默認(rèn)行為。例如,在您的配置文件中(例如 application.yml):
更改默認(rèn) HTTP 客戶端配置
Properties |
Yaml |
Toml |
Groovy |
Hocon |
JSON |
micronaut.http.client.read-timeout=5s
|
micronaut:
http:
client:
read-timeout: 5s
|
[micronaut]
[micronaut.http]
[micronaut.http.client]
read-timeout="5s"
|
micronaut {
http {
client {
readTimeout = "5s"
}
}
}
|
{
micronaut {
http {
client {
read-timeout = "5s"
}
}
}
}
|
{
"micronaut": {
"http": {
"client": {
"read-timeout": "5s"
}
}
}
}
|
上面的示例設(shè)置了 HttpClientConfiguration 類的 readTimeout 屬性。
客戶端特定配置
要為每個(gè)客戶端單獨(dú)配置,有幾個(gè)選項(xiàng)。您可以在配置文件(例如 application.yml)中手動(dòng)配置服務(wù)發(fā)現(xiàn)并應(yīng)用每個(gè)客戶端配置:
手動(dòng)配置 HTTP 服務(wù)
Properties |
Yaml |
Toml |
Groovy |
Hocon |
JSON |
micronaut.http.services.foo.urls[0]=http://foo1
micronaut.http.services.foo.urls[1]=http://foo2
micronaut.http.services.foo.read-timeout=5s
|
micronaut:
http:
services:
foo:
urls:
- http://foo1
- http://foo2
read-timeout: 5s
|
[micronaut]
[micronaut.http]
[micronaut.http.services]
[micronaut.http.services.foo]
urls=[
"http://foo1",
"http://foo2"
]
read-timeout="5s"
|
micronaut {
http {
services {
foo {
urls = ["http://foo1", "http://foo2"]
readTimeout = "5s"
}
}
}
}
|
{
micronaut {
http {
services {
foo {
urls = ["http://foo1", "http://foo2"]
read-timeout = "5s"
}
}
}
}
}
|
{
"micronaut": {
"http": {
"services": {
"foo": {
"urls": ["http://foo1", "http://foo2"],
"read-timeout": "5s"
}
}
}
}
}
|
警告:此客戶端配置可以與 @Client 注釋結(jié)合使用,通過(guò)直接注入 HttpClient 或在客戶端界面上使用。在任何情況下,注釋上的所有其他屬性都將被忽略,除了服務(wù) id。
然后,注入指定的客戶端配置:
注入 HTTP 客戶端
@Client("foo") @Inject ReactorHttpClient httpClient;
您還可以定義一個(gè)從 HttpClientConfiguration 擴(kuò)展的 bean,并確保 javax.inject.Named 注釋適當(dāng)?shù)孛?
定義 HTTP 客戶端配置 bean
@Named("twitter")
@Singleton
class TwitterHttpClientConfiguration extends HttpClientConfiguration {
public TwitterHttpClientConfiguration(ApplicationConfiguration configuration) {
super(configuration);
}
}
如果您使用服務(wù)發(fā)現(xiàn)使用 @Client 注入名為 twitter 的服務(wù),則將選擇此配置:
注入 HTTP 客戶端
@Client("twitter") @Inject ReactorHttpClient httpClient;
或者,如果您不使用服務(wù)發(fā)現(xiàn),則可以使用 @Client 的配置成員來(lái)引用特定類型:
注入 HTTP 客戶端
@Client(value = "https://api.twitter.com/1.1",
configuration = TwitterHttpClientConfiguration.class)
@Inject
ReactorHttpClient httpClient;
使用 HTTP 客戶端連接池
處理大量請(qǐng)求的客戶端將受益于啟用 HTTP 客戶端連接池。以下配置為 foo 客戶端啟用池化:
手動(dòng)配置 HTTP 服務(wù)
Properties |
Yaml |
Toml |
Groovy |
Hocon |
JSON |
micronaut.http.services.foo.urls[0]=http://foo1
micronaut.http.services.foo.urls[1]=http://foo2
micronaut.http.services.foo.pool.enabled=true
micronaut.http.services.foo.pool.max-connections=50
|
micronaut:
http:
services:
foo:
urls:
- http://foo1
- http://foo2
pool:
enabled: true
max-connections: 50
|
[micronaut]
[micronaut.http]
[micronaut.http.services]
[micronaut.http.services.foo]
urls=[
"http://foo1",
"http://foo2"
]
[micronaut.http.services.foo.pool]
enabled=true
max-connections=50
|
micronaut {
http {
services {
foo {
urls = ["http://foo1", "http://foo2"]
pool {
enabled = true
maxConnections = 50
}
}
}
}
}
|
{
micronaut {
http {
services {
foo {
urls = ["http://foo1", "http://foo2"]
pool {
enabled = true
max-connections = 50
}
}
}
}
}
}
|
{
"micronaut": {
"http": {
"services": {
"foo": {
"urls": ["http://foo1", "http://foo2"],
"pool": {
"enabled": true,
"max-connections": 50
}
}
}
}
}
}
|
有關(guān)可用池配置選項(xiàng)的詳細(xì)信息,請(qǐng)參閱 ConnectionPoolConfiguration 的 API。
配置事件循環(huán)組
默認(rèn)情況下,Micronaut 為工作線程和所有 HTTP 客戶端線程共享一個(gè)通用的 Netty EventLoopGroup。
這個(gè) EventLoopGroup 可以通過(guò) micronaut.netty.event-loops.default 屬性進(jìn)行配置:
配置默認(rèn)事件循環(huán)
Properties |
Yaml |
Toml |
Groovy |
Hocon |
JSON |
micronaut.netty.event-loops.default.num-threads=10
micronaut.netty.event-loops.default.prefer-native-transport=true
|
micronaut:
netty:
event-loops:
default:
num-threads: 10
prefer-native-transport: true
|
[micronaut]
[micronaut.netty]
[micronaut.netty.event-loops]
[micronaut.netty.event-loops.default]
num-threads=10
prefer-native-transport=true
|
micronaut {
netty {
eventLoops {
'default' {
numThreads = 10
preferNativeTransport = true
}
}
}
}
|
{
micronaut {
netty {
event-loops {
default {
num-threads = 10
prefer-native-transport = true
}
}
}
}
}
|
{
"micronaut": {
"netty": {
"event-loops": {
"default": {
"num-threads": 10,
"prefer-native-transport": true
}
}
}
}
}
|
您還可以使用 micronaut.netty.event-loops 設(shè)置來(lái)配置一個(gè)或多個(gè)額外的事件循環(huán)。下表總結(jié)了屬性:
表 1. DefaultEventLoopGroupConfiguration 的配置屬性
屬性 |
類型 |
描述 |
micronaut.netty.event-loops.*.num-threads
|
int
|
|
micronaut.netty.event-loops.*.io-ratio
|
java.lang.Integer
|
|
micronaut.netty.event-loops.*.prefer-native-transport
|
boolean
|
|
micronaut.netty.event-loops.*.executor
|
java.lang.String
|
|
micronaut.netty.event-loops.*.shutdown-quiet-period
|
java.time.Duration
|
|
micronaut.netty.event-loops.*.shutdown-timeout
|
java.time.Duration
|
|
例如,如果您與 HTTP 客戶端的交互涉及 CPU 密集型工作,則可能值得為一個(gè)或所有客戶端配置一個(gè)單獨(dú)的 EventLoopGroup。
以下示例配置了一個(gè)名為“other”的附加事件循環(huán)組,其中包含 10 個(gè)線程:
配置額外的事件循環(huán)
Properties |
Yaml |
Toml |
Groovy |
Hocon |
JSON |
micronaut.netty.event-loops.other.num-threads=10
micronaut.netty.event-loops.other.prefer-native-transport=true
|
micronaut:
netty:
event-loops:
other:
num-threads: 10
prefer-native-transport: true
|
[micronaut]
[micronaut.netty]
[micronaut.netty.event-loops]
[micronaut.netty.event-loops.other]
num-threads=10
prefer-native-transport=true
|
micronaut {
netty {
eventLoops {
other {
numThreads = 10
preferNativeTransport = true
}
}
}
}
|
{
micronaut {
netty {
event-loops {
other {
num-threads = 10
prefer-native-transport = true
}
}
}
}
}
|
{
"micronaut": {
"netty": {
"event-loops": {
"other": {
"num-threads": 10,
"prefer-native-transport": true
}
}
}
}
}
|
配置附加事件循環(huán)后,您可以更改 HTTP 客戶端配置以使用它:
改變客戶端使用的事件循環(huán)組
Properties |
Yaml |
Toml |
Groovy |
Hocon |
JSON |
micronaut.http.client.event-loop-group=other
|
micronaut:
http:
client:
event-loop-group: other
|
[micronaut]
[micronaut.http]
[micronaut.http.client]
event-loop-group="other"
|
micronaut {
http {
client {
eventLoopGroup = "other"
}
}
}
|
{
micronaut {
http {
client {
event-loop-group = "other"
}
}
}
}
|
{
"micronaut": {
"http": {
"client": {
"event-loop-group": "other"
}
}
}
}
|
錯(cuò)誤響應(yīng)
如果返回代碼為 400 或更高的 HTTP 響應(yīng),則會(huì)創(chuàng)建 HttpClientResponseException。異常包含原始響應(yīng)。如何拋出異常取決于方法的返回類型。
對(duì)于阻塞客戶端,拋出異常并應(yīng)由調(diào)用者捕獲和處理。對(duì)于反應(yīng)式客戶端,異常作為錯(cuò)誤通過(guò)發(fā)布者傳遞。
綁定錯(cuò)誤
如果請(qǐng)求成功,您通常希望使用端點(diǎn)并綁定到 POJO,如果發(fā)生錯(cuò)誤則綁定到不同的 POJO。以下示例顯示如何調(diào)用具有成功和錯(cuò)誤類型的交換。
Java |
Groovy |
Kotlin |
@Controller("/books")
public class BooksController {
@Get("/{isbn}")
public HttpResponse find(String isbn) {
if (isbn.equals("1680502395")) {
Map<String, Object> m = new HashMap<>();
m.put("status", 401);
m.put("error", "Unauthorized");
m.put("message", "No message available");
m.put("path", "/books/" + isbn);
return HttpResponse.status(HttpStatus.UNAUTHORIZED).body(m);
}
return HttpResponse.ok(new Book("1491950358", "Building Microservices"));
}
}
|
@Controller("/books")
class BooksController {
@Get("/{isbn}")
HttpResponse find(String isbn) {
if (isbn == "1680502395") {
Map<String, Object> m = [
status : 401,
error : "Unauthorized",
message: "No message available",
path : "/books/" + isbn]
return HttpResponse.status(HttpStatus.UNAUTHORIZED).body(m)
}
return HttpResponse.ok(new Book("1491950358", "Building Microservices"))
}
}
|
@Controller("/books")
class BooksController {
@Get("/{isbn}")
fun find(isbn: String): HttpResponse<*> {
if (isbn == "1680502395") {
val m = mapOf(
"status" to 401,
"error" to "Unauthorized",
"message" to "No message available",
"path" to "/books/$isbn"
)
return HttpResponse.status<Any>(HttpStatus.UNAUTHORIZED).body(m)
}
return HttpResponse.ok(Book("1491950358", "Building Microservices"))
}
}
|
Java |
Groovy |
Kotlin |
@Test
public void afterAnHttpClientExceptionTheResponseBodyCanBeBoundToAPOJO() {
try {
client.toBlocking().exchange(HttpRequest.GET("/books/1680502395"),
Argument.of(Book.class), // (1)
Argument.of(CustomError.class)); // (2)
} catch (HttpClientResponseException e) {
assertEquals(HttpStatus.UNAUTHORIZED, e.getResponse().getStatus());
Optional<CustomError> jsonError = e.getResponse().getBody(CustomError.class);
assertTrue(jsonError.isPresent());
assertEquals(401, jsonError.get().status);
assertEquals("Unauthorized", jsonError.get().error);
assertEquals("No message available", jsonError.get().message);
assertEquals("/books/1680502395", jsonError.get().path);
}
}
|
def "after an HttpClientException the response body can be bound to a POJO"() {
when:
client.toBlocking().exchange(HttpRequest.GET("/books/1680502395"),
Argument.of(Book), // (1)
Argument.of(CustomError)) // (2)
then:
def e = thrown(HttpClientResponseException)
e.response.status == HttpStatus.UNAUTHORIZED
when:
Optional<CustomError> jsonError = e.response.getBody(CustomError)
then:
jsonError.isPresent()
jsonError.get().status == 401
jsonError.get().error == 'Unauthorized'
jsonError.get().message == 'No message available'
jsonError.get().path == '/books/1680502395'
}
|
"after an httpclient exception the response body can be bound to a POJO" {
try {
client.toBlocking().exchange(HttpRequest.GET<Any>("/books/1680502395"),
Argument.of(Book::class.java), // (1)
Argument.of(CustomError::class.java)) // (2)
} catch (e: HttpClientResponseException) {
e.response.status shouldBe HttpStatus.UNAUTHORIZED
}
}
|
- 成功類型
- 錯(cuò)誤類型
更多建議: