Simple Robot v4.7.0 Help

实现事件推送

当你有了事件,那么就将其发射。

事件处理器

在simbot标准API中定义了一个用于处理事件的事件处理器 EventProcessor, 它的作用是由 Application 提供给外部(例如组件),使其拥有推送事件的能力。

我们首先来简单看一下 EventProcessor 的简化版定义:

public interface EventProcessor { /** * 处理事件,得到结果流 */ public fun push(event: Event): Flow<EventResult> }

可以看到, EventProcessor 的主要作用就是:通过 push 推送一个事件 Event, 然后得到所有事件监听函数(EventListener )的处理结果流 Flow<EventResult>

获取并保存

在介绍用法之前,先讲一下如何获取 EventProcessor

Plugin / BotManager 内获取

EventProcessor 在构建阶段中,从开始构建 Plugin 这个阶段开始可以被获取到。 其中, BotManager 的构建就属于这个阶段。

当你的 PluginBotManager 实现需要使用到事件推送的能力时, 你可以在注册它们的时候从 PluginConfigureContext 获取到 EventDispatcher

以一个 BotManager 为例:

class FooBotManager( val eventProcessor: EventProcessor, val config: FooBotManagerConfiguration ) : BotManager { // ... 其他实现细节省略 ... /** 伴生对象实现工厂 */ companion object Factory : BotManagerFactory<FooBotManager, FooBotManagerConfiguration> { override val key: BotManagerFactory.Key = object : BotManagerFactory.Key {} override fun create(context: PluginConfigureContext, configurer: ConfigurerFunction<FooBotManagerConfiguration>): FooBotManager { val config = FooBotManagerConfiguration().invokeBy(configurer) // 得到 EventDispatcher, 接下来就可以保留它,然后到需要的时候使用它了。 val eventDispatcher = context.eventDispatcher // ... return FooBotManager(eventDispatcher, config) } } }

外部获取

Application 被构建完成后,你可以在直接在 Application 中获取到 EventDispatcher

// 配置、构建、启动 Application val app = launchSimpleApplication { ... } // 得到 EventDispatcher val eventDispatcher = app.eventDispatcher // 使用它。 eventDispatcher.push(...)

使用

事件推送

一个最简单的使用例子:

// 推送事件,并收集结果。 eventProcessor.push(event).collect()

在 Java 中,我们为 EventProcessor 提供了一些异步友好的兼容API。

首先, push 函数本身并非挂起函数,因此你可以直接调用它并得到 Flow 的结果。

但是Java中想要直接处理 Flow 就有些困难了。 举个例子,你可以借助 kotlinx-coroutines-reactorFlow 转为 Flux

var flow = eventProcessor.push(event); var flux = FluxKt.flux( EmptyCoroutineContext.INSTANCE, flow ); // 异步地收集结果并忽略结果 flux.subscribe();

或者直接丢给一个作用域 CoroutineScope 在异步中执行:

eventProcessor.pushAndLaunch( GlobalScope.INSTANCE, event );

除了上面直接借助 EventProcessor 自身的API以外, 我们还使用 EventProcessors 提供若干静态API来辅助使用。

// 推送事件并转化为 Flux // 需要确保项目环境中存在 [[[kotlinx-coroutines-reactor|https://github.com/Kotlin/kotlinx.coroutines/tree/master/reactive]]] 的相关依赖 EventProcessors.pushAndAsFlux( eventProcessor, event ).subscribe(); // 推送事件并将结果异步地收集为 List var resultList = EventProcessors.pushAndCollectToListAsync( eventProcessor, event, GlobalScope.INSTANCE );

调度器

当执行 EventProcessor.push 时,其内部执行事件处理逻辑时的调度器是在构建 Application 的时候被决定的

val app = launchApplication(...) { // 配置 eventDispatcher eventDispatcher { coroutineContext = Dispatchers.IO } // 其他配置... }

因此,通过 push 得到的结果无法影响上游的调度器。例如,以上述代码为基准,再追加以下代码:

app.eventDispatcher // 推送事件 .push(event) // 指定一个调度器 .flowOn(MyCustomDispatcher) // 收集结果 .collect()

那么,用于处理事件的监听函数会在 Dispatcher.IO 中被异步调度, 而在最后的 collect 时会被 MyCustomDispatcher 调度。

冷流与同步

由上可知,虽然默认的事件调度器的实现是冷流 ,即你不去收集则不会真正地执行事件处理, 但是因为可以配置调度器,因此它并非是一条一条地同步处理的。

考虑如下这组代码:

val app = launchApplication(...) { eventDispatcher { // 默认配置,即无自定义调度器 } } app.eventDispatcher .push(event) .take(3) .collect()
val app = launchApplication(...) { eventDispatcher { coroutineContext = Dispatchers.IO } } app.eventDispatcher .push(event) .take(3) .collect()

假设其中一共注册了5个事件的监听函数,每个函数都会输出一段控制台文本,那么二者的输出则有可能是:

第1个处理器:处理事件 第2个处理器:处理事件 第3个处理器:处理事件
第1个处理器:处理事件 第2个处理器:处理事件 第3个处理器:处理事件 第4个处理器:处理事件 第5个处理器:处理事件

因此,虽然 push 后使用了 take(3) 限制了条数, 但是它无法保证能够控制真实的事件处理次数。

结果筛选

作为一个插件,通常情况下我们需要对返回的 EventResult 结果流做一些处理。 一个最常见的场景:我们要将返回了错误的结果以日志的形式输出。

EventResult 包括了所有执行了的事件监听函数的执行结果,包括错误的结果。 在标准API中,我们约定如果类型为 StandardEventResult.Error 则将其视为一个执行失败的结果 (内部的异常处理也会将出现的错误包装为此),因此你可以在出现此类型时做一些处理:

eventProcessor .push(event) // 如果出现异常结果,处理它 .onEach { result -> if (result is StandardEventResult.Error) { // 比如:输出日志 logger.error( "Result is error: {}", result.content.message, result.content, // Throwable ) } } .collect()

我们提供了一些简化操作的API。例如,你可以使用 onEachError { ... } 来改写上面的代码:

eventProcessor .push(event) // 如果出现异常结果,处理它 .onEachError { result -> // 比如:输出日志 logger.error( "Result is error: {}", result.content.message, result.content, // Throwable ) } .collect()
Last modified: 15 November 2024