集合代理
集合代理的角色是促成 Garnet 中集合上的阻塞命令。
阻塞命令是一种命令,如果集合中的项可用则立即返回,或者阻塞客户端直到项可用(参见:BLPOP、BLMOVE 等)。
当有活跃的客户端正在等待集合项时,代理会在一个专用任务上运行其主循环。
逻辑流程
传入的阻塞命令:
- 客户端发送一个阻塞命令,命令处理程序反过来调用
CollectionItemBroker.GetCollectionItemAsync
- 如果代理的主循环没有运行,它将开始运行并等待其事件队列(
brokerEventsQueue
)中的下一个事件。 - 创建一个新的
CollectionItemObserver
对象,并将一个类型为NewObserverEvent
的事件推送到事件队列中。 - 命令处理程序等待以下两种情况之一
- 来自主循环的信号量信号,通知已找到项。
- 客户端指定的超时已达到。
- 如果代理的主循环没有运行,它将开始运行并等待其事件队列(
传入的“释放”命令:
- 客户端将一个项插入到集合中,命令处理程序反过来调用
CollectionItemBroker.HandleCollectionUpdate
\- 如果集合没有被任何客户端观察,则无需执行任何操作。
- 否则,将一个类型为
CollectionUpdatedEvent
的新事件推送到事件队列中。
主代理循环:
- 主循环(
CollectionItemBroker.Start
)持续监听AsyncQueue
以获取新的传入事件,并为每个新事件同步调用HandleBrokerEvent
。\- 对于类型为
NewObserverEvent
的事件,将调用InitializeObserver
。InitializeObserver
接收一个键数组,并按照客户端指定的顺序检查集合值以查找可用项。如果找到项,则更新观察者,这将设置结果并发出信号量以释放等待的线程。如果没有找到项,则将观察者添加到每个键的观察者队列中。 - 对于类型为
CollectionUpdatedEvent
的事件,将调用TryAssignItemFromKey
。此方法获取键的观察者队列,并尝试将存储在键处的集合中的下一个可用项分配给下一个观察者。如果确实找到了可用项,则更新观察者,这将设置结果并发出信号量以释放等待的线程。
- 对于类型为
已处置的会话:
- 如果一个具有活动观察者的
RespServerSession
被处置,其Dispose
方法将调用CollectionItemBroker.HandleSessionDisposed
,后者又将更新观察者并发出信号量等待器以停止。一旦观察者的状态更改为SessionDisposed
,主循环将不再为其分配任何项。