事件监听与处理
Edit pageLast modified: 20 March 2025事件调度器 EventDispatcher
所有的事件处理器都需要注册到 EventDispatcher
中, 当 EventDispatcher
收到消息推送时, 它便会将这个事件交给其内部的处理器依次进行处理。
你可以在 Application
中获取到 EventDispatcher
:
val app = ...
val eventDispatcher = app.eventDispatcher // 获取事件调度器
var app = ...;
var eventDispatcher = app.getEventDispatcher(); // 获取事件调度器
注册事件处理器
想要处理事件, 你便需要向 EventDispatcher
注册事件处理器 EventListener
。
tip
有关事件处理器的具体描述会在后续小节介绍。
val eventDispatcher = ...
dispatcher.register { // this: EventListenerContext
// 处理事件...
EventResult.empty() // 返回一个处理结果
}
Kotlin 中提供了一些扩展函数, 可用来更便捷地注册一个事件处理器。
处理所有类型的事件:
val eventDispatcher = ...
dispatcher.process { // this: EventListenerContext
// 处理事件, 但是不需要返回值
}
处理指定类型的事件:
dispatcher.listen<Event> { // this: EventListenerContext
// 处理事件
EventResult.empty()
}
dispatcher.process<Event> { // this: EventListenerContext
// 处理事件, 但是不需要返回值
}
Kotlin 中也提供了直接针对 Application
的便利扩展 listener
:
val app = ...
app.listeners { // this: EventListenerRegistrar
listen<Event> { ... }
process { ... }
process<Event> { ... }
}
处理所有类型的事件:
var eventDispatcher = ...;
dispatcher.register(EventListeners.async(context -> {
// 处理事件...
return CompletableFuture.completedFuture(EventResult.empty()); // 返回处理结果
}));
处理特定类型的事件:
var eventDispatcher = ...;
dispatcher.register(EventListeners.async(Event.class, (context, event) -> {
// 处理事件...
return CompletableFuture.completedFuture(EventResult.empty()); // 返回处理结果
}));
事件处理器注册回执
当通过 register
注册一个事件处理器后, 会返回一个 EventListenerRegistrationHandle
。 可以将它称之为注册的回执, 或者句柄, 总之它的作用是通过调用 dispose
来“取消”这次注册的事件处理器。 取消注册后, 对应的事件处理器将不再有效。
val handle = register(...)
handle.dispose()
var handle = register(...);
handle.dispose(); // 取消注册
注册事件处理器的额外属性
注册事件处理器时, 可以提供一些额外的属性与配置信息, 例如优先级等。
val dispatcher = ...
dispatcher.register({ // this: EventListenerRegistrationProperties
// 配置一些信息
// 优先级
priority = -1 // 通常默认值为 0, 越小越优先
// 添加一个要注册的事件处理器的专属拦截器
addInterceptor {
// 拦截...
invoke() // 执行拦截链的后续并返回真实结果
}
}) {
// 处理事件, 返回处理结果...
...
}
那些扩展函数也同样支持:
dispatcher.listen<Event>(propertiesConsumer = {
// 配置一些信息...
}) {
// 处理事件, 返回处理结果...
...
}
dispatcher.process<Event>(propertiesConsumer = {
// 配置一些信息...
}) {
// 处理事件...
...
}
dispatcher.process(propertiesConsumer = {
// 配置一些信息...
}) {
// 处理事件...
...
}
var dispatcher = ...;
dispatcher.register(
(properties) -> {
// 配置一些信息...
// 优先级
properties.setPriority(-1);
// 添加为当前注册的事件处理器的专属拦截器
properties.addInterceptor(EventInterceptors.async(context -> {
// 拦截...
return context.invoke();
}));
},
// 事件处理器...
EventListeners.async(context -> {
// 处理事件...
return CompletableFuture.completedFuture(EventResult.empty()); // 返回处理结果
}));
事件处理器 EventListener
事件处理器 EventListener
是一个函数接口类型, 它定义了处理事件的方式:接收一个 EventListenerContext
, 然后经过处理, 返回 EventResult
。
tip
在 Java 中, 你可以通过 `EventListeners` 的静态工厂API来通过 Java 友好的 lambda 方式构建阻塞或异步风格的实例。
var eventListener = EventListeners.async(context -> { // 处理事件... return CompletableFuture.completedFuture(EventResult.empty()); // 返回处理结果 });
var eventListener = EventListeners.async(Event.class, (context, event) -> { // 处理事件... return CompletableFuture.completedFuture(EventResult.empty()); // 返回处理结果 });
事件处理结果 EventResult
EventResult
是 EventListener
中处理完事件后的返回值类型。
它主要提供了如下信息:
大多数情况下你不需要自己实现 EventResult
。核心库中提供了 StandardEventResult
定义了部分常见、或具有特殊含义的类型, 并且在 EventResult
中提供静态API来直接获取它们。
Invalid
代表无效的结果。通常用于那些在类型匹配(比如 dispatcher.listen<Event> {...}
)等地方使用, 当某些有效性判断不匹配时返回, 代表本次处理无效, 没有进入真正的逻辑。
可以通过 EventResult.invalid()
获取。
Error
代表出现错误的结果。当出现了异常(例如由 EventDispatcher
捕获到的事件处理器中抛出的异常)时便会将异常包装到 Error
中, 然后返回给事件推送者。
tip
有关事件推送的内容会在下文介绍。
虽然也可以通过 EventResult.error(...)
人为构建, 但是通常交给调度器捕获即可, 人为构建的场景并不多。
Empty
代表没有结果的结果。与 Invalid
有些类似, 但是含义不同。 Empty
代表事件处理的逻辑正常执行了, 只是没什么可返回的内容。 还有一点不同是, Empty
可以指定 isTruncated
, 而 Invalid
不行。
可以通过 EventResult.empty(...)
或在 EventResult.of(...)
的 content 参数为 null
时获取。
Simple
除去上面这些类型以外, 处理逻辑正常执行, 且有需要返回的 content
时的类型。 可以通过 EventResult.of(...)
的 content 参数不为 null
时获取。
Simple
也具有特殊效果, 它实现了 CollectableReactivelyResult
。
CollectableReactivelyResult
一个特殊的标记类型, 如果返回值结果是 CollectableReactivelyResult
, 则事件调度器应当对 content
进行一次类型判断, 当返回值类型是符合条件的 异步/响应式结果, 则对其进行一次挂起收集。
例如返回的 content
为 Mono
类型(JVM) 或 Deferred
类型(Kotlin协程库自带), 那么就会使用 await
挂起并等待一次它们的结果。 这个过程不会阻塞线程, 也符合事件处理器处理事件时的依次处理的形式, 其主要服务JVM平台下的Java用户, 以允许使用更加丰富的响应式库来实现异步的事件处理。
java.util.concurrent.CompletionStage
(java.util.concurrent.CompletableFuture
) (JVM)kotlinx.coroutines.Deferred
(不支持kotlinx.coroutines.Job
)kotlinx.coroutines.flow.Flow
(会被收集为List
)kotlin.js.Promise
(JS)org.reactivestreams.Publisher
(JVM) (不是reactor.core.publisher.Mono
或reactor.core.publisher.Flux
时: 会被收集为List
)reactor.core.publisher.Flux
(JVM) (会被收集为List
)reactor.core.publisher.Mono
(JVM)io.reactivex.CompletableSource
(JVM) (会挂起, 但是结果始终为null
)io.reactivex.SingleSource
(JVM)io.reactivex.MaybeSource
(JVM)io.reactivex.ObservableSource
(JVM) (会被收集为List
)io.reactivex.Flowable
(JVM) (会被收集为List
)io.reactivex.rxjava3.core.CompletableSource
(JVM)io.reactivex.rxjava3.core.SingleSource
(JVM)io.reactivex.rxjava3.core.MaybeSource
(JVM)io.reactivex.rxjava3.core.ObservableSource
(JVM) (会被收集为List
)io.reactivex.rxjava3.core.Flowable
(JVM) (会被收集为List
)
warning
运行时库需要注意的是, 除了 Kotlin 自带的那些类型, 其他大部分响应式结果都需要额外的运行时库。 参考: kotlinx-coroutines-reactive。
事件处理器上下文 EventListenerContext
每一个事件处理器在处理事件时, 都会被提供一个 EventListenerContext
。
事件上下文 EventContext
一个在事件处理流程中流转的上下文。 用于承载本次事件处理前后的诸项信息。
note
EventContext
实现了CoroutineScope
, 因此它可以作为一个协程作用域使用。它的协程上下文通常与EventDispatcher
有关, 但是不含有Job
。
注解 API
simbot4 定义了一套注解API ,用来支持使用注解快速实现事件处理逻辑。 这套注解API主要由 Spring Boot starter
进行实现。
class MyListeners {
@Listener // 注解API,标记一个函数为事件处理函数
suspend fun handle(event: Event) {
// ...
}
}
public class MyListeners {
@Listener // 注解API,标记一个函数为事件处理函数
public void handle(Event event) {
// ...
}
}
它被命名为 quantcat(量子猫)
, 你可以前往文章 量子猫 来了解更多。
事件推送
想要使事件处理器工作, 那么就要推送事件。可以通过 EventDispatcher.push
推送一个事件, 并得到一个处理结果流 Flow<EventResult>
。
note
事件推送的任务通常是你安装的插件或者注册的 Bot 来完成的。如果只是普通的 Bot 应用开发者, 则大部分情况下不需要自行推送事件。
val resultFlow = dispatcher.push(event)
// 事件推送的结果流是冷流,
// 你必须收集这个流, 事件才会真正开始被处理
resultFlow.collect()
在 Java 中直接操作 Flow
可能会有一些困难, 你可以使用 EventProcessors
中提供的各种静态API来进行事件推送。
// 推送事件、并在异步中收集结果。
// 其中第三个参数 `application` 是用于执行异步任务的 `CoroutineScope` 类型,
// 你可以使用 `Application` (继承了 `CoroutineScope`),
// 或者使用 Kotlin 的 `GlobalScope` (有关它的注意事项参见其文档注释)
var asyncListFuture = EventProcessors.pushAndCollectToAsync(dispatcher, event, application, new ArrayList<>());
// 如果希望收集为 List, 也有简写形式
var otherAsyncListFuture = EventProcessors.pushAndCollectToListAsync(dispatcher, event, application);
// 也可以使用 Java 中的 Collector, 就像在 Stream 中使用它一样
var asyncCollectListFuture = EventProcessors.pushAndCollectToAsync(dispatcher, event, application, Collectors.toList());
// 借助 Collector, 也可以实现一些复杂的结果收集, 比如:
// -> 只过滤出类型为 StandardEventResult.Simple 的结果
var asyncCollectSimpleListFuture = EventProcessors.pushAndCollectToAsync(
dispatcher, event, application,
Collectors.filtering(
result -> result instanceof StandardEventResult.Simple,
Collectors.toList()
));
// -> 根据是否为 StandardEventResult.Error 为依据分组, 并计算每组的结果数量
var asyncTypeCountingFuture = EventProcessors.pushAndCollectToAsync(
dispatcher, event, application,
Collectors.partitioningBy(
result -> result instanceof StandardEventResult.Error,
Collectors.counting()
));
你也可以将结果转化为响应式的 reactor.core.publisher.Flux
。
var resultFlux = EventProcessors.pushAndAsFlux(dispatcher, event);
resultFlux.subscribe(result -> {
// 事件推送的结果流是冷流, 因此转化后的 `Flux` 也是冷的,
// 你必须消费其中的元素才能使得事件真正的被处理。
});
warning
使用
pushAndAsFlux
需要项目运行时环境中存在 kotlinx-coroutines-reactor。
如上面的示例中所说, 通过 push
得到的 Flow
结果是一个冷流, 因此你必须收集、消费其中的元素, 它才会使得事件被处理, 并且这也受到你对这个流的中间操作。
举个例子:
val flow = dispatcher.push(event)
flow.take(2) // 取前两个元素
.collect {
// 消费结果
}
var flux = EventProcessors.pushAndAsFlux(dispatcher, event);
flux.take(2) // 取前两个结果
.subscribe(result -> {
// 消费结果
});
在示例中, 我们在获取到结果流时只取了前两个结果。此时, 在消费结果时真正处理了事件的事件处理器也只有前两个, 因为我们得到的流是冷流, 每一次结果都是一次“实时”的事件处理。
你可以依据这些类型的特性来自由决定结果流的处理形式, 例如在收集前指定调度器, 这时则可能在收集前便已经有事件处理器开始处理事件了。