Micronaut 響應(yīng)式 HTTP 請(qǐng)求處理

2023-03-06 14:37 更新

如前所述,Micronaut 建立在 Netty 之上,Netty 是圍繞事件循環(huán)模型和非阻塞 I/O 設(shè)計(jì)的。 Micronaut 在與請(qǐng)求線程(事件循環(huán)線程)相同的線程中執(zhí)行在 @Controller bean 中定義的代碼。

這使得在執(zhí)行任何阻塞 I/O 操作(例如與 Hibernate/JPA 或 JDBC 的交互)時(shí)將這些任務(wù)卸載到不阻塞事件循環(huán)的單獨(dú)線程池變得至關(guān)重要。

例如,以下配置將 I/O 線程池配置為具有 75 個(gè)線程的固定線程池(類似于 Tomcat 等傳統(tǒng)阻塞服務(wù)器在每個(gè)請(qǐng)求線程模型中使用的線程池):

配置IO線程池

 Properties Yaml  Toml  Groovy  Hocon  JSON 
micronaut.executors.io.type=fixed
micronaut.executors.io.nThreads=75
micronaut:
  executors:
    io:
      type: fixed
      nThreads: 75
[micronaut]
  [micronaut.executors]
    [micronaut.executors.io]
      type="fixed"
      nThreads=75
micronaut {
  executors {
    io {
      type = "fixed"
      nThreads = 75
    }
  }
}
{
  micronaut {
    executors {
      io {
        type = "fixed"
        nThreads = 75
      }
    }
  }
}
{
  "micronaut": {
    "executors": {
      "io": {
        "type": "fixed",
        "nThreads": 75
      }
    }
  }
}

要在 @Controller bean 中使用此線程池,您有多種選擇。最簡(jiǎn)單的是使用@ExecuteOn 注釋,它可以在類型或方法級(jí)別聲明,以指示在哪個(gè)配置的線程池上運(yùn)行控制器的方法:

使用@ExecuteOn

 Java Groovy  Kotlin 
import io.micronaut.docs.http.server.reactive.PersonService;
import io.micronaut.docs.ioc.beans.Person;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;

@Controller("/executeOn/people")
public class PersonController {

    private final PersonService personService;

    PersonController(PersonService personService) {
        this.personService = personService;
    }

    @Get("/{name}")
    @ExecuteOn(TaskExecutors.IO) // (1)
    Person byName(String name) {
        return personService.findByName(name);
    }
}
import io.micronaut.docs.http.server.reactive.PersonService
import io.micronaut.docs.ioc.beans.Person
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn

@Controller("/executeOn/people")
class PersonController {

    private final PersonService personService

    PersonController(PersonService personService) {
        this.personService = personService
    }

    @Get("/{name}")
    @ExecuteOn(TaskExecutors.IO) // (1)
    Person byName(String name) {
        personService.findByName(name)
    }
}
import io.micronaut.docs.http.server.reactive.PersonService
import io.micronaut.docs.ioc.beans.Person
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn

@Controller("/executeOn/people")
class PersonController (private val personService: PersonService) {

    @Get("/{name}")
    @ExecuteOn(TaskExecutors.IO) // (1)
    fun byName(name: String): Person {
        return personService.findByName(name)
    }
}
  1. @ExecuteOn 注解用于在I/O線程池上執(zhí)行操作

@ExecuteOn 注解的值可以是在 micronaut.executors 下定義的任何命名執(zhí)行器。

一般來說,對(duì)于數(shù)據(jù)庫操作,您需要配置一個(gè)與數(shù)據(jù)庫連接池中指定的最大連接數(shù)相匹配的線程池。

@ExecuteOn 注釋的替代方法是使用您選擇的響應(yīng)式庫提供的工具。諸如 Project Reactor 或 RxJava 之類的響應(yīng)式實(shí)現(xiàn)具有一個(gè) subscribeOn 方法,該方法可讓您更改執(zhí)行用戶代碼的線程。例如:

響應(yīng)式 subscribeOn 示例

 Java Groovy  Kotlin 
import io.micronaut.docs.ioc.beans.Person;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.scheduling.TaskExecutors;
import jakarta.inject.Named;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import io.micronaut.core.async.annotation.SingleResult;
import java.util.concurrent.ExecutorService;

@Controller("/subscribeOn/people")
public class PersonController {

    private final Scheduler scheduler;
    private final PersonService personService;

    PersonController(
            @Named(TaskExecutors.IO) ExecutorService executorService, // (1)
            PersonService personService) {
        this.scheduler = Schedulers.fromExecutorService(executorService);
        this.personService = personService;
    }

    @Get("/{name}")
    @SingleResult
    Publisher<Person> byName(String name) {
        return Mono
                .fromCallable(() -> personService.findByName(name)) // (2)
                .subscribeOn(scheduler); // (3)
    }
}
import io.micronaut.docs.ioc.beans.Person
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.scheduling.TaskExecutors
import jakarta.inject.Named
import reactor.core.publisher.Mono
import reactor.core.scheduler.Scheduler
import reactor.core.scheduler.Schedulers
import java.util.concurrent.ExecutorService

@Controller("/subscribeOn/people")
class PersonController {

    private final Scheduler scheduler
    private final PersonService personService

    PersonController(
            @Named(TaskExecutors.IO) ExecutorService executorService, // (1)
            PersonService personService) {
        this.scheduler = Schedulers.fromExecutorService(executorService)
        this.personService = personService
    }

    @Get("/{name}")
    Mono<Person> byName(String name) {
        return Mono
                .fromCallable({ -> personService.findByName(name) }) // (2)
                .subscribeOn(scheduler) // (3)
    }
}
import io.micronaut.docs.ioc.beans.Person
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.scheduling.TaskExecutors
import java.util.concurrent.ExecutorService
import jakarta.inject.Named
import reactor.core.publisher.Mono
import reactor.core.scheduler.Scheduler
import reactor.core.scheduler.Schedulers


@Controller("/subscribeOn/people")
class PersonController internal constructor(
    @Named(TaskExecutors.IO) executorService: ExecutorService, // (1)
    private val personService: PersonService) {

    private val scheduler: Scheduler = Schedulers.fromExecutorService(executorService)

    @Get("/{name}")
    fun byName(name: String): Mono<Person> {
        return Mono
            .fromCallable { personService.findByName(name) } // (2)
            .subscribeOn(scheduler) // (3)
    }
}
  1. 配置的 I/O 執(zhí)行器服務(wù)被注入

  2. Mono::fromCallable 方法包裝阻塞操作

  3. Project Reactor 的 subscribeOn 方法調(diào)度 I/O 線程池上的操作

使用@Body 注解

要解析請(qǐng)求正文,您首先要使用 Body 注解向 Micronaut 指示哪個(gè)參數(shù)接收數(shù)據(jù)。

以下示例實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的回顯服務(wù)器,它回顯請(qǐng)求中發(fā)送的正文:

使用@Body 注解

 Java Groovy  Kotlin 
import io.micronaut.http.HttpResponse;
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Post;
import javax.validation.constraints.Size;

@Controller("/receive")
public class MessageController {

@Post(value = "/echo", consumes = MediaType.TEXT_PLAIN) // (1)
String echo(@Size(max = 1024) @Body String text) { // (2)
    return text; // (3)
}

}
import io.micronaut.http.HttpResponse
import io.micronaut.http.MediaType
import io.micronaut.http.annotation.Body
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Post
import javax.validation.constraints.Size

@Controller("/receive")
class MessageController {

@Post(value = "/echo", consumes = MediaType.TEXT_PLAIN) // (1)
String echo(@Size(max = 1024) @Body String text) { // (2)
    text // (3)
}

}
import io.micronaut.http.HttpResponse
import io.micronaut.http.MediaType
import io.micronaut.http.annotation.Body
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Post
import javax.validation.constraints.Size

@Controller("/receive")
open class MessageController {

@Post(value = "/echo", consumes = [MediaType.TEXT_PLAIN]) // (1)
open fun echo(@Size(max = 1024) @Body text: String): String { // (2)
    return text // (3)
}

}
  1. Post 注釋與 text/plain 的 MediaType 一起使用(默認(rèn)為 application/json)。

  2. Body 注釋與 javax.validation.constraints.Size 一起使用,它將主體的大小限制為最多 1KB。此約束不限制服務(wù)器讀取/緩沖的數(shù)據(jù)量。

  3. 正文作為方法的結(jié)果返回

請(qǐng)注意,讀取請(qǐng)求正文是以非阻塞方式完成的,因?yàn)檎?qǐng)求內(nèi)容在數(shù)據(jù)可用時(shí)讀取并累積到傳遞給方法的字符串中。

配置文件(例如 application.yml)中的 micronaut.server.maxRequestSize 設(shè)置限制了服務(wù)器讀取/緩沖的數(shù)據(jù)大?。J(rèn)最大請(qǐng)求大小為 10MB)。 @Size 不能替代此設(shè)置。

無論限制如何,對(duì)于大量數(shù)據(jù),將數(shù)據(jù)累積到內(nèi)存中的 String 可能會(huì)導(dǎo)致服務(wù)器內(nèi)存緊張。更好的方法是在您的項(xiàng)目中包含一個(gè) Reactive 庫(例如 Reactor、RxJava 或 Akka),它支持 Reactive 流實(shí)現(xiàn)并流式傳輸它變得可用的數(shù)據(jù):

使用 Reactive Streams 讀取請(qǐng)求體

 Java Groovy  Kotlin 
import io.micronaut.http.HttpResponse;
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Post;
import javax.validation.constraints.Size;

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import io.micronaut.core.async.annotation.SingleResult;

@Controller("/receive")
public class MessageController {

@Post(value = "/echo-publisher", consumes = MediaType.TEXT_PLAIN) // (1)
@SingleResult
Publisher<HttpResponse<String>> echoFlow(@Body Publisher<String> text) { //(2)
    return Flux.from(text)
            .collect(StringBuffer::new, StringBuffer::append) // (3)
            .map(buffer -> HttpResponse.ok(buffer.toString()));
}

}
import io.micronaut.http.HttpResponse
import io.micronaut.http.MediaType
import io.micronaut.http.annotation.Body
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Post
import javax.validation.constraints.Size

import org.reactivestreams.Publisher
import io.micronaut.core.async.annotation.SingleResult
import reactor.core.publisher.Flux

@Controller("/receive")
class MessageController {

@Post(value = "/echo-publisher", consumes = MediaType.TEXT_PLAIN) // (1)
@SingleResult
Publisher<HttpResponse<String>> echoFlow(@Body Publisher<String> text) { // (2)
    return Flux.from(text)
            .collect({ x -> new StringBuffer() }, { StringBuffer sb, String s -> sb.append(s) }) // (3)
            .map({ buffer -> HttpResponse.ok(buffer.toString()) });
}

}
import io.micronaut.http.HttpResponse
import io.micronaut.http.MediaType
import io.micronaut.http.annotation.Body
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Post
import javax.validation.constraints.Size

import org.reactivestreams.Publisher
import io.micronaut.core.async.annotation.SingleResult
import reactor.core.publisher.Flux

@Controller("/receive")
open class MessageController {

@Post(value = "/echo-publisher", consumes = [MediaType.TEXT_PLAIN]) // (1)
@SingleResult
open fun echoFlow(@Body text: Publisher<String>): Publisher<HttpResponse<String>> { //(2)
    return Flux.from(text)
        .collect({ StringBuffer() }, { obj, str -> obj.append(str) }) // (3)
        .map { buffer -> HttpResponse.ok(buffer.toString()) }
}

}
  1. 在這種情況下,該方法被更改為接收和返回 Publisher 類型。

  2. 此示例使用 Project Reactor 并返回單個(gè)項(xiàng)目。因此,響應(yīng)類型也使用 SingleResult 進(jìn)行注釋。 Micronaut 只有在操作完成后才會(huì)發(fā)出響應(yīng)而不會(huì)阻塞。

  3. collect 方法用于在這個(gè)模擬示例中累積數(shù)據(jù),但它可以將數(shù)據(jù)逐塊寫入日志服務(wù)、數(shù)據(jù)庫等

不需要轉(zhuǎn)換的類型的主體參數(shù)會(huì)導(dǎo)致 Micronaut 跳過請(qǐng)求的解碼!

Reactive Responses

上一節(jié)介紹了使用 Project Reactor 和 Micronaut 進(jìn)行響應(yīng)式編程的概念。

Micronaut 支持返回常見的反應(yīng)類型,例如 Mono(或來自 RxJava 的 Single Maybe Observable 類型)、來自任何控制器方法的 Publisher 或 CompletableFuture 的實(shí)例。

要使用 Project Reactor 的 Flux 或 Mono,您需要將 Micronaut Reactor 依賴項(xiàng)添加到您的項(xiàng)目以包含必要的轉(zhuǎn)換器。

要使用 RxJava 的 Flowable、Single 或 Maybe,您需要將 Micronaut RxJava 依賴項(xiàng)添加到您的項(xiàng)目以包含必要的轉(zhuǎn)換器。

使用 Body 注釋指定為請(qǐng)求主體的參數(shù)也可以是反應(yīng)類型或 CompletableFuture。

返回響應(yīng)式類型時(shí),Micronaut 在與請(qǐng)求相同的線程(Netty 事件循環(huán)線程)上訂閱返回的響應(yīng)式類型。因此,如果您執(zhí)行任何阻塞操作,請(qǐng)務(wù)必將這些操作卸載到適當(dāng)配置的線程池中,例如使用 Project Reactor 或 RxJava subscribeOn(..) 工具或@ExecuteOn。

總而言之,下表說明了一些常見的響應(yīng)類型及其處理:

表 1. Micronaut 響應(yīng)類型
類型 描述 示例簽名

Publisher

任何實(shí)現(xiàn) Publisher 接口的類型

Publisher<String> hello()

CompletableFuture

Java CompletableFuture 實(shí)例

CompletableFuture<String> hello()

HttpResponse

一個(gè) HttpResponse 和可選的響應(yīng)主體

HttpResponse<Publisher<String>> hello()

CharSequence

CharSequence 的任何實(shí)現(xiàn)

String hello()

T

任何簡(jiǎn)單的 POJO 類型

Book show()

當(dāng)返回 Reactive 類型時(shí),它的類型會(huì)影響返回的響應(yīng)。例如,當(dāng)返回一個(gè) Flux 時(shí),Micronaut 無法知道響應(yīng)的大小,因此使用 Chunked 的 Transfer-Encoding 類型。雖然對(duì)于發(fā)出單個(gè)結(jié)果的類型(例如 Mono),會(huì)填充 Content-Length 標(biāo)頭。


以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)