6.13 响应式 HTTP 请求处理
如前所述,Micronaut 是在 Netty 上构建的,Netty 是围绕事件循环模型和非阻塞 I/O 设计的。Micronaut 在与请求线程(事件循环线程)相同的线程中执行 @Controller bean 中定义的代码。
这使得如果你执行任何阻塞 I/O 操作(例如与 Hibernate/JPA 或 JDBC 的交互),将这些任务卸载到一个不阻塞事件循环的单独线程池中变得至关重要。
例如,以下配置将 I/O 线程池配置为具有 75 个线程的固定线程池(类似于 Tomcat 等传统阻塞服务器在每个请求线程模型中使用的线程):
配置 IO 线程池
micronaut:
executors:
io:
type: fixed
nThreads: 75
要在 @Controller bean 中使用这个线程池,你有许多选项。最简单的是使用 @ExecuteOn 注解,它可以在类型或方法级别声明,以指示在哪个配置的线程池上运行控制器的方法:
使用 @ExecuteOn
- Java
- Groovy
- Kotlin
import io.micronaut.docs.http.server.reactive.PersonService;
import io.micronaut.docs.ioc.beans.Person;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;
@Controller("/executeOn/people")
public class PersonController {
private final PersonService personService;
PersonController(PersonService personService) {
this.personService = personService;
}
@Get("/{name}")
@ExecuteOn(TaskExecutors.IO) // (1)
Person byName(String name) {
return personService.findByName(name);
}
}
import io.micronaut.docs.http.server.reactive.PersonService
import io.micronaut.docs.ioc.beans.Person
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn
@Controller("/executeOn/people")
class PersonController {
private final PersonService personService
PersonController(PersonService personService) {
this.personService = personService
}
@Get("/{name}")
@ExecuteOn(TaskExecutors.IO) // (1)
Person byName(String name) {
personService.findByName(name)
}
}
import io.micronaut.docs.http.server.reactive.PersonService
import io.micronaut.docs.ioc.beans.Person
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn
@Controller("/executeOn/people")
class PersonController (private val personService: PersonService) {
@Get("/{name}")
@ExecuteOn(TaskExecutors.IO) // (1)
fun byName(name: String): Person {
return personService.findByName(name)
}
}
- @ExecuteOn 注解用于在 I/O 线程池上执行操作
@ExecuteOn 注解的值可以是在 micronat.executors
下定义的任何命名的执行器。
一般来说,对于数据库操作,你需要配置一个与数据库连接池中指定的最大连接数相匹配的线程池。
@ExecuteOn 注解的另一种选择是使用你选择的响应库提供的工具。Project Reactor 或 RxJava 等响应式实现具有 subscribeOn
方法,该方法允许你更改用哪个线程执行用户代码。例如:
响应式 subscribeOn 示例
- Java
- Groovy
- Kotlin
import io.micronaut.docs.ioc.beans.Person;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.scheduling.TaskExecutors;
import jakarta.inject.Named;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import io.micronaut.core.async.annotation.SingleResult;
import java.util.concurrent.ExecutorService;
@Controller("/subscribeOn/people")
public class PersonController {
private final Scheduler scheduler;
private final PersonService personService;
PersonController(
@Named(TaskExecutors.IO) ExecutorService executorService, // (1)
PersonService personService) {
this.scheduler = Schedulers.fromExecutorService(executorService);
this.personService = personService;
}
@Get("/{name}")
@SingleResult
Publisher<Person> byName(String name) {
return Mono
.fromCallable(() -> personService.findByName(name)) // (2)
.subscribeOn(scheduler); // (3)
}
}
import io.micronaut.docs.ioc.beans.Person
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.scheduling.TaskExecutors
import jakarta.inject.Named
import reactor.core.publisher.Mono
import reactor.core.scheduler.Scheduler
import reactor.core.scheduler.Schedulers
import java.util.concurrent.ExecutorService
@Controller("/subscribeOn/people")
class PersonController {
private final Scheduler scheduler
private final PersonService personService
PersonController(
@Named(TaskExecutors.IO) ExecutorService executorService, // (1)
PersonService personService) {
this.scheduler = Schedulers.fromExecutorService(executorService)
this.personService = personService
}
@Get("/{name}")
Mono<Person> byName(String name) {
return Mono
.fromCallable({ -> personService.findByName(name) }) // (2)
.subscribeOn(scheduler) // (3)
}
}
import io.micronaut.docs.ioc.beans.Person
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.scheduling.TaskExecutors
import java.util.concurrent.ExecutorService
import jakarta.inject.Named
import reactor.core.publisher.Mono
import reactor.core.scheduler.Scheduler
import reactor.core.scheduler.Schedulers
@Controller("/subscribeOn/people")
class PersonController internal constructor(
@Named(TaskExecutors.IO) executorService: ExecutorService, // (1)
private val personService: PersonService) {
private val scheduler: Scheduler = Schedulers.fromExecutorService(executorService)
@Get("/{name}")
fun byName(name: String): Mono<Person> {
return Mono
.fromCallable { personService.findByName(name) } // (2)
.subscribeOn(scheduler) // (3)
}
}
- 已注入配置的 I/O 执行器服务
Mono::fromCallable
方法包装阻塞操作- Project Reactor
subscribeOn
方法调度 I/O 线程池上的操作
6.13.1 使用 @Body 注解
要解析请求体,首先向 Micronaut 指示哪个参数接收带有 Body 注解的数据。
以下示例实现了一个简单的回显服务器,该服务器回显请求中发送的正文:
使用 @Body 注解
- Java
- Groovy
- Kotlin
import io.micronaut.http.HttpResponse;
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Post;
import javax.validation.constraints.Size;
@Controller("/receive")
public class MessageController {
@Post(value = "/echo", consumes = MediaType.TEXT_PLAIN) // (1)
String echo(@Size(max = 1024) @Body String text) { // (2)
return text; // (3)
}
}
import io.micronaut.http.HttpResponse
import io.micronaut.http.MediaType
import io.micronaut.http.annotation.Body
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Post
import javax.validation.constraints.Size
@Controller("/receive")
class MessageController {
@Post(value = "/echo", consumes = MediaType.TEXT_PLAIN) // (1)
String echo(@Size(max = 1024) @Body String text) { // (2)
text // (3)
}
}
import io.micronaut.http.HttpResponse
import io.micronaut.http.MediaType
import io.micronaut.http.annotation.Body
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Post
import javax.validation.constraints.Size
@Controller("/receive")
open class MessageController {
@Post(value = "/echo", consumes = [MediaType.TEXT_PLAIN]) // (1)
open fun echo(@Size(max = 1024) @Body text: String): String { // (2)
return text // (3)
}
}
- Post 注解与
text/plain
的 MediaType 一起使用(默认为application/json
)。 - Body 注解与
javax.validation.constraints.Size
一起使用,后者将请求体的大小限制为最多 1KB。此约束并不限制服务器读取/缓冲的数据量。 - 请求体作为方法的结果返回
请注意,读取请求体是以非阻塞的方式完成的,因为请求内容是在数据变得可用时读取的,并累积到传递给方法的 String 中。
application.yml
中的 micronaut.server.maxRequestSize
设置限制了服务器读取/缓冲的数据大小(默认最大请求大小为 10MB)@Size
不能替代此设置。
无论限制如何,对于大量数据,将数据累积到内存中的字符串中可能会导致服务器内存紧张。更好的方法是在你的项目中包括一个 Reactive 库(如Reactor
、RxJava
或 Akka
),该库支持响应流的实现并流式传输它可用的数据:
使用响应流读取请求体
- Java
- Groovy
- Kotlin
import io.micronaut.http.HttpResponse;
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Post;
import javax.validation.constraints.Size;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import io.micronaut.core.async.annotation.SingleResult;
@Controller("/receive")
public class MessageController {
@Post(value = "/echo-publisher", consumes = MediaType.TEXT_PLAIN) // (1)
@SingleResult
Publisher<HttpResponse<String>> echoFlow(@Body Publisher<String> text) { //(2)
return Flux.from(text)
.collect(StringBuffer::new, StringBuffer::append) // (3)
.map(buffer -> HttpResponse.ok(buffer.toString()));
}
}
import io.micronaut.http.HttpResponse
import io.micronaut.http.MediaType
import io.micronaut.http.annotation.Body
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Post
import javax.validation.constraints.Size
import org.reactivestreams.Publisher
import io.micronaut.core.async.annotation.SingleResult
import reactor.core.publisher.Flux
@Controller("/receive")
class MessageController {
@Post(value = "/echo-publisher", consumes = MediaType.TEXT_PLAIN) // (1)
@SingleResult
Publisher<HttpResponse<String>> echoFlow(@Body Publisher<String> text) { // (2)
return Flux.from(text)
.collect({ x -> new StringBuffer() }, { StringBuffer sb, String s -> sb.append(s) }) // (3)
.map({ buffer -> HttpResponse.ok(buffer.toString()) });
}
}
import io.micronaut.http.HttpResponse
import io.micronaut.http.MediaType
import io.micronaut.http.annotation.Body
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Post
import javax.validation.constraints.Size
import org.reactivestreams.Publisher
import io.micronaut.core.async.annotation.SingleResult
import reactor.core.publisher.Flux
@Controller("/receive")
open class MessageController {
@Post(value = "/echo-publisher", consumes = [MediaType.TEXT_PLAIN]) // (1)
@SingleResult
open fun echoFlow(@Body text: Publisher<String>): Publisher<HttpResponse<String>> { //(2)
return Flux.from(text)
.collect({ StringBuffer() }, { obj, str -> obj.append(str) }) // (3)
.map { buffer -> HttpResponse.ok(buffer.toString()) }
}
}
- 在这种情况下,方法被更改为接收并返回发布服务器类型。
- 此示例使用 Project Reactor 并返回单个项。因此,响应类型也使用 SingleResult 进行注解。Micronaut 仅在操作完成后才发出响应,而不会阻塞。
- 在这个模拟示例中,
collect
方法用于累积数据,但例如,它可以逐块将数据写入日志服务、数据库等
不需要转换的类型的 Body 参数会导致 Micronaut 跳过对请求的解码!
6.13.2 响应式(Reactive)响应
上一节介绍了使用 Project Reactor 和 Micronaut 进行响应式编程的概念。
Micronaut 支持从任何控制器方法返回常见的响应类型,如 Mono(或 RxJava 中的 Single Maybe Observable 类型)、Publisher 或 CompletableFuture 的实例。
要使用 Project Reactor 的 Flux
或 Mono
,你需要在项目中添加 Micronaut Reactor 依赖项,以包括必要的转换器。
要使用 RxJava 的 Flowable
、Single
或 Maybe
,你需要将 Micronaut RxJava 依赖项添加到你的项目中,以包括必要的转换器。
使用 Body 注解指定为请求主体的参数也可以是响应类型或 CompletableFuture。
当返回反应类型时,Micronaut 在与请求相同的线程(Netty Event Loop 线程)上订阅返回的反应类型。因此,重要的是,如果你执行任何阻塞操作,请将这些操作卸载到适当配置的线程池中,例如使用 Project Reactor 或 RxJava subscribeOn(..)
工具或 @ExecuteOn。
::note 提示 有关 Micronaut 设置的线程池以及如何配置线程池的信息,参阅配置线程池一节。 :::
总之,下表说明了一些常见的响应类型及其处理:
表 1. Micronaut 响应类型
类型 | 描述 | 示例签名 |
---|---|---|
Publisher | 实现 Publisher 接口的任意类型 | Publisher<String> hello() |
CompletableFuture | 一个 Java CompletableFuture 实体 | CompletableFuture<String> hello() |
HttpResponse | 一个 HttpResponse 及可选的响应体 | HttpResponse<Publisher<String>> hello() |
CharSequence | 任意 CharSequence 的实现 | String hello() |
T | 任意简单的 POJO 类型 | Book show() |