6.25 服务器发送事件
Micronaut 的 HTTP 服务器支持使用 Event API 发送服务器发送事件(SSE)。
为了从服务器上发送事件,返回一个响应式流 Publisher,它发送的是 Event 类型的对象。
Publisher 本身可以从后台任务、通过事件系统等发布事件。
举个例子,想象一下一个新闻头条的事件流;你可以定义一个数据类,如下:
Headline
- Java
- Groovy
- Kotlin
public class Headline {
private String title;
private String description;
public Headline() {}
public Headline(String title, String description) {
this.title = title;
this.description = description;
}
public String getTitle() {
return title;
}
public String getDescription() {
return description;
}
public void setTitle(String title) {
this.title = title;
}
public void setDescription(String description) {
this.description = description;
}
}
class Headline {
String title
String description
Headline() {}
Headline(String title, String description) {
this.title = title;
this.description = description;
}
}
class Headline {
var title: String? = null
var description: String? = null
constructor()
constructor(title: String, description: String) {
this.title = title
this.description = description
}
}
要发送新闻标题事件,编写一个控制器,使用你喜欢的任何响应式库返回一个 Event 实例的 Publisher。下面的例子通过 generate
方法使用 Project Reactor 的 Flux:
从控制器发布服务器发送事件
- Java
- Groovy
- Kotlin
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.sse.Event;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
@Controller("/headlines")
public class HeadlineController {
@ExecuteOn(TaskExecutors.IO)
@Get(produces = MediaType.TEXT_EVENT_STREAM)
public Publisher<Event<Headline>> index() { // (1)
String[] versions = {"1.0", "2.0"}; // (2)
return Flux.generate(() -> 0, (i, emitter) -> { // (3)
if (i < versions.length) {
emitter.next( // (4)
Event.of(new Headline("Micronaut " + versions[i] + " Released", "Come and get it"))
);
} else {
emitter.complete(); // (5)
}
return ++i;
});
}
}
import io.micronaut.http.MediaType
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.http.sse.Event
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
@Controller("/headlines")
class HeadlineController {
@ExecuteOn(TaskExecutors.IO)
@Get(produces = MediaType.TEXT_EVENT_STREAM)
Publisher<Event<Headline>> index() { // (1)
String[] versions = ["1.0", "2.0"] // (2)
Flux.generate(() -> 0, (i, emitter) -> {
if (i < versions.length) {
emitter.next( // (4)
Event.of(new Headline("Micronaut ${versions[i]} Released", "Come and get it"))
)
} else {
emitter.complete() // (5)
}
return i + 1
})
}
}
import io.micronaut.http.MediaType
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.http.sse.Event
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import reactor.core.publisher.SynchronousSink
import java.util.concurrent.Callable
import java.util.function.BiFunction
@Controller("/headlines")
class HeadlineController {
@ExecuteOn(TaskExecutors.IO)
@Get(produces = [MediaType.TEXT_EVENT_STREAM])
fun index(): Publisher<Event<Headline>> { // (1)
val versions = arrayOf("1.0", "2.0") // (2)
return Flux.generate(
{ 0 },
BiFunction { i: Int, emitter: SynchronousSink<Event<Headline>> -> // (3)
if (i < versions.size) {
emitter.next( // (4)
Event.of(
Headline(
"Micronaut " + versions[i] + " Released", "Come and get it"
)
)
)
} else {
emitter.complete() // (5)
}
return@BiFunction i + 1
})
}
}
- 控制器方法返回一个 Event 的 Publisher
- 每个版本的 Micronaut 都会发出一个 jeadline
- Flux 类型的
generate
方法生成一个 Publisher。generate
方法接受一个初始值和一个接受该值和一个 Emitter 的lambda。请注意,这个例子是在与控制器动作相同的线程上执行的,但你可以使用subscribeOn
或映射一个现有的 “热” Flux。 - Emitter 接口的
onNext
方法发射的是 Event 类型的对象。Event.of(ET) 工厂方法构建了事件。 - Emitter 接口的
onComplete
方法指示何时完成发送服务器发送的事件。
提示
你通常想在一个单独的执行器上安排 SSE 事件流。前面的例子使用 @ExecuteOn 在 I/O 执行器上执行该流。
上面的例子发回了 text/event-stream
类型的响应,对于每一个发出的事件,之前的 Headline
类型将被转换为 JSON,导致响应,例如:
服务器发送事件响应输出
data: {"title":"Micronaut 1.0 Released","description":"Come and get it"}
data: {"title":"Micronaut 2.0 Released","description":"Come and get it"}
你可以使用 Event 接口的方法来定制发回的服务器发送事件数据,包括关联事件 id、评论、重试超时等。