如前所述,Micronaut 建立在 Netty 之上,Netty 是圍繞事件循環(huán)模型和非阻塞 I/O 設(shè)計(jì)的。 Micronaut 在與請(qǐng)求線程(事件循環(huán)線程)相同的線程中執(zhí)行在 @Controller bean 中定義的代碼。
這使得在執(zhí)行任何阻塞 I/O 操作(例如與 Hibernate/JPA 或 JDBC 的交互)時(shí)將這些任務(wù)卸載到不阻塞事件循環(huán)的單獨(dú)線程池變得至關(guān)重要。
例如,以下配置將 I/O 線程池配置為具有 75 個(gè)線程的固定線程池(類似于 Tomcat 等傳統(tǒng)阻塞服務(wù)器在每個(gè)請(qǐng)求線程模型中使用的線程池):
配置IO線程池
Properties |
Yaml |
Toml |
Groovy |
Hocon |
JSON |
micronaut.executors.io.type=fixed
micronaut.executors.io.nThreads=75
|
micronaut:
executors:
io:
type: fixed
nThreads: 75
|
[micronaut]
[micronaut.executors]
[micronaut.executors.io]
type="fixed"
nThreads=75
|
micronaut {
executors {
io {
type = "fixed"
nThreads = 75
}
}
}
|
{
micronaut {
executors {
io {
type = "fixed"
nThreads = 75
}
}
}
}
|
{
"micronaut": {
"executors": {
"io": {
"type": "fixed",
"nThreads": 75
}
}
}
}
|
要在 @Controller bean 中使用此線程池,您有多種選擇。最簡(jiǎn)單的是使用@ExecuteOn 注釋,它可以在類型或方法級(jí)別聲明,以指示在哪個(gè)配置的線程池上運(yùn)行控制器的方法:
使用@ExecuteOn
Java |
Groovy |
Kotlin |
import io.micronaut.docs.http.server.reactive.PersonService;
import io.micronaut.docs.ioc.beans.Person;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;
@Controller("/executeOn/people")
public class PersonController {
private final PersonService personService;
PersonController(PersonService personService) {
this.personService = personService;
}
@Get("/{name}")
@ExecuteOn(TaskExecutors.IO) // (1)
Person byName(String name) {
return personService.findByName(name);
}
}
|
import io.micronaut.docs.http.server.reactive.PersonService
import io.micronaut.docs.ioc.beans.Person
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn
@Controller("/executeOn/people")
class PersonController {
private final PersonService personService
PersonController(PersonService personService) {
this.personService = personService
}
@Get("/{name}")
@ExecuteOn(TaskExecutors.IO) // (1)
Person byName(String name) {
personService.findByName(name)
}
}
|
import io.micronaut.docs.http.server.reactive.PersonService
import io.micronaut.docs.ioc.beans.Person
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn
@Controller("/executeOn/people")
class PersonController (private val personService: PersonService) {
@Get("/{name}")
@ExecuteOn(TaskExecutors.IO) // (1)
fun byName(name: String): Person {
return personService.findByName(name)
}
}
|
@ExecuteOn 注解用于在I/O線程池上執(zhí)行操作
@ExecuteOn 注解的值可以是在 micronaut.executors 下定義的任何命名執(zhí)行器。
一般來說,對(duì)于數(shù)據(jù)庫操作,您需要配置一個(gè)與數(shù)據(jù)庫連接池中指定的最大連接數(shù)相匹配的線程池。
@ExecuteOn 注釋的替代方法是使用您選擇的響應(yīng)式庫提供的工具。諸如 Project Reactor 或 RxJava 之類的響應(yīng)式實(shí)現(xiàn)具有一個(gè) subscribeOn 方法,該方法可讓您更改執(zhí)行用戶代碼的線程。例如:
響應(yīng)式 subscribeOn 示例
Java |
Groovy |
Kotlin |
import io.micronaut.docs.ioc.beans.Person;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.scheduling.TaskExecutors;
import jakarta.inject.Named;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import io.micronaut.core.async.annotation.SingleResult;
import java.util.concurrent.ExecutorService;
@Controller("/subscribeOn/people")
public class PersonController {
private final Scheduler scheduler;
private final PersonService personService;
PersonController(
@Named(TaskExecutors.IO) ExecutorService executorService, // (1)
PersonService personService) {
this.scheduler = Schedulers.fromExecutorService(executorService);
this.personService = personService;
}
@Get("/{name}")
@SingleResult
Publisher<Person> byName(String name) {
return Mono
.fromCallable(() -> personService.findByName(name)) // (2)
.subscribeOn(scheduler); // (3)
}
}
|
import io.micronaut.docs.ioc.beans.Person
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.scheduling.TaskExecutors
import jakarta.inject.Named
import reactor.core.publisher.Mono
import reactor.core.scheduler.Scheduler
import reactor.core.scheduler.Schedulers
import java.util.concurrent.ExecutorService
@Controller("/subscribeOn/people")
class PersonController {
private final Scheduler scheduler
private final PersonService personService
PersonController(
@Named(TaskExecutors.IO) ExecutorService executorService, // (1)
PersonService personService) {
this.scheduler = Schedulers.fromExecutorService(executorService)
this.personService = personService
}
@Get("/{name}")
Mono<Person> byName(String name) {
return Mono
.fromCallable({ -> personService.findByName(name) }) // (2)
.subscribeOn(scheduler) // (3)
}
}
|
import io.micronaut.docs.ioc.beans.Person
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.scheduling.TaskExecutors
import java.util.concurrent.ExecutorService
import jakarta.inject.Named
import reactor.core.publisher.Mono
import reactor.core.scheduler.Scheduler
import reactor.core.scheduler.Schedulers
@Controller("/subscribeOn/people")
class PersonController internal constructor(
@Named(TaskExecutors.IO) executorService: ExecutorService, // (1)
private val personService: PersonService) {
private val scheduler: Scheduler = Schedulers.fromExecutorService(executorService)
@Get("/{name}")
fun byName(name: String): Mono<Person> {
return Mono
.fromCallable { personService.findByName(name) } // (2)
.subscribeOn(scheduler) // (3)
}
}
|
配置的 I/O 執(zhí)行器服務(wù)被注入
Mono::fromCallable 方法包裝阻塞操作
Project Reactor 的 subscribeOn 方法調(diào)度 I/O 線程池上的操作
使用@Body 注解
要解析請(qǐng)求正文,您首先要使用 Body 注解向 Micronaut 指示哪個(gè)參數(shù)接收數(shù)據(jù)。
以下示例實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的回顯服務(wù)器,它回顯請(qǐng)求中發(fā)送的正文:
使用@Body 注解
Java |
Groovy |
Kotlin |
import io.micronaut.http.HttpResponse;
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Post;
import javax.validation.constraints.Size;
@Controller("/receive")
public class MessageController {
@Post(value = "/echo", consumes = MediaType.TEXT_PLAIN) // (1)
String echo(@Size(max = 1024) @Body String text) { // (2)
return text; // (3)
}
}
|
import io.micronaut.http.HttpResponse
import io.micronaut.http.MediaType
import io.micronaut.http.annotation.Body
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Post
import javax.validation.constraints.Size
@Controller("/receive")
class MessageController {
@Post(value = "/echo", consumes = MediaType.TEXT_PLAIN) // (1)
String echo(@Size(max = 1024) @Body String text) { // (2)
text // (3)
}
}
|
import io.micronaut.http.HttpResponse
import io.micronaut.http.MediaType
import io.micronaut.http.annotation.Body
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Post
import javax.validation.constraints.Size
@Controller("/receive")
open class MessageController {
@Post(value = "/echo", consumes = [MediaType.TEXT_PLAIN]) // (1)
open fun echo(@Size(max = 1024) @Body text: String): String { // (2)
return text // (3)
}
}
|
Post 注釋與 text/plain 的 MediaType 一起使用(默認(rèn)為 application/json)。
Body 注釋與 javax.validation.constraints.Size 一起使用,它將主體的大小限制為最多 1KB。此約束不限制服務(wù)器讀取/緩沖的數(shù)據(jù)量。
正文作為方法的結(jié)果返回
請(qǐng)注意,讀取請(qǐng)求正文是以非阻塞方式完成的,因?yàn)檎?qǐng)求內(nèi)容在數(shù)據(jù)可用時(shí)讀取并累積到傳遞給方法的字符串中。
配置文件(例如 application.yml)中的 micronaut.server.maxRequestSize 設(shè)置限制了服務(wù)器讀取/緩沖的數(shù)據(jù)大?。J(rèn)最大請(qǐng)求大小為 10MB)。 @Size 不能替代此設(shè)置。
無論限制如何,對(duì)于大量數(shù)據(jù),將數(shù)據(jù)累積到內(nèi)存中的 String 可能會(huì)導(dǎo)致服務(wù)器內(nèi)存緊張。更好的方法是在您的項(xiàng)目中包含一個(gè) Reactive 庫(例如 Reactor、RxJava 或 Akka),它支持 Reactive 流實(shí)現(xiàn)并流式傳輸它變得可用的數(shù)據(jù):
使用 Reactive Streams 讀取請(qǐng)求體
Java |
Groovy |
Kotlin |
import io.micronaut.http.HttpResponse;
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Post;
import javax.validation.constraints.Size;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import io.micronaut.core.async.annotation.SingleResult;
@Controller("/receive")
public class MessageController {
@Post(value = "/echo-publisher", consumes = MediaType.TEXT_PLAIN) // (1)
@SingleResult
Publisher<HttpResponse<String>> echoFlow(@Body Publisher<String> text) { //(2)
return Flux.from(text)
.collect(StringBuffer::new, StringBuffer::append) // (3)
.map(buffer -> HttpResponse.ok(buffer.toString()));
}
}
|
import io.micronaut.http.HttpResponse
import io.micronaut.http.MediaType
import io.micronaut.http.annotation.Body
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Post
import javax.validation.constraints.Size
import org.reactivestreams.Publisher
import io.micronaut.core.async.annotation.SingleResult
import reactor.core.publisher.Flux
@Controller("/receive")
class MessageController {
@Post(value = "/echo-publisher", consumes = MediaType.TEXT_PLAIN) // (1)
@SingleResult
Publisher<HttpResponse<String>> echoFlow(@Body Publisher<String> text) { // (2)
return Flux.from(text)
.collect({ x -> new StringBuffer() }, { StringBuffer sb, String s -> sb.append(s) }) // (3)
.map({ buffer -> HttpResponse.ok(buffer.toString()) });
}
}
|
import io.micronaut.http.HttpResponse
import io.micronaut.http.MediaType
import io.micronaut.http.annotation.Body
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Post
import javax.validation.constraints.Size
import org.reactivestreams.Publisher
import io.micronaut.core.async.annotation.SingleResult
import reactor.core.publisher.Flux
@Controller("/receive")
open class MessageController {
@Post(value = "/echo-publisher", consumes = [MediaType.TEXT_PLAIN]) // (1)
@SingleResult
open fun echoFlow(@Body text: Publisher<String>): Publisher<HttpResponse<String>> { //(2)
return Flux.from(text)
.collect({ StringBuffer() }, { obj, str -> obj.append(str) }) // (3)
.map { buffer -> HttpResponse.ok(buffer.toString()) }
}
}
|
在這種情況下,該方法被更改為接收和返回 Publisher 類型。
此示例使用 Project Reactor 并返回單個(gè)項(xiàng)目。因此,響應(yīng)類型也使用 SingleResult 進(jìn)行注釋。 Micronaut 只有在操作完成后才會(huì)發(fā)出響應(yīng)而不會(huì)阻塞。
collect 方法用于在這個(gè)模擬示例中累積數(shù)據(jù),但它可以將數(shù)據(jù)逐塊寫入日志服務(wù)、數(shù)據(jù)庫等
不需要轉(zhuǎn)換的類型的主體參數(shù)會(huì)導(dǎo)致 Micronaut 跳過請(qǐng)求的解碼!
Reactive Responses
上一節(jié)介紹了使用 Project Reactor 和 Micronaut 進(jìn)行響應(yīng)式編程的概念。
Micronaut 支持返回常見的反應(yīng)類型,例如 Mono(或來自 RxJava 的 Single Maybe Observable 類型)、來自任何控制器方法的 Publisher 或 CompletableFuture 的實(shí)例。
要使用 Project Reactor 的 Flux 或 Mono,您需要將 Micronaut Reactor 依賴項(xiàng)添加到您的項(xiàng)目以包含必要的轉(zhuǎn)換器。
要使用 RxJava 的 Flowable、Single 或 Maybe,您需要將 Micronaut RxJava 依賴項(xiàng)添加到您的項(xiàng)目以包含必要的轉(zhuǎn)換器。
使用 Body 注釋指定為請(qǐng)求主體的參數(shù)也可以是反應(yīng)類型或 CompletableFuture。
返回響應(yīng)式類型時(shí),Micronaut 在與請(qǐng)求相同的線程(Netty 事件循環(huán)線程)上訂閱返回的響應(yīng)式類型。因此,如果您執(zhí)行任何阻塞操作,請(qǐng)務(wù)必將這些操作卸載到適當(dāng)配置的線程池中,例如使用 Project Reactor 或 RxJava subscribeOn(..) 工具或@ExecuteOn。
總而言之,下表說明了一些常見的響應(yīng)類型及其處理:
表 1. Micronaut 響應(yīng)類型
類型 |
描述 |
示例簽名 |
Publisher
|
任何實(shí)現(xiàn) Publisher 接口的類型
|
Publisher<String> hello()
|
CompletableFuture
|
Java CompletableFuture 實(shí)例
|
CompletableFuture<String> hello()
|
HttpResponse
|
一個(gè) HttpResponse 和可選的響應(yīng)主體
|
HttpResponse<Publisher<String>> hello()
|
CharSequence
|
CharSequence 的任何實(shí)現(xiàn)
|
String hello()
|
T
|
任何簡(jiǎn)單的 POJO 類型
|
Book show()
|
當(dāng)返回 Reactive 類型時(shí),它的類型會(huì)影響返回的響應(yīng)。例如,當(dāng)返回一個(gè) Flux 時(shí),Micronaut 無法知道響應(yīng)的大小,因此使用 Chunked 的 Transfer-Encoding 類型。雖然對(duì)于發(fā)出單個(gè)結(jié)果的類型(例如 Mono),會(huì)填充 Content-Length 標(biāo)頭。
更多建議: