diff --git a/events/events.go b/events/events.go index 1113cad3ab..4f910ab454 100644 --- a/events/events.go +++ b/events/events.go @@ -41,8 +41,8 @@ var _ Channel[int] = (*simpleChannel[int])(nil) // // At the moment this will always return a new simpleChannel, however that may change in // the future as this feature gets fleshed out. -func New[T any](subscriberBufferSize int, eventBufferSize int) Channel[T] { - return NewSimpleChannel[T](subscriberBufferSize, eventBufferSize) +func New[T any](commandBufferSize int, eventBufferSize int) Channel[T] { + return NewSimpleChannel[T](commandBufferSize, eventBufferSize) } // Events hold the supported event types diff --git a/events/simple.go b/events/simple.go index 59777a857f..bf247a7a16 100644 --- a/events/simple.go +++ b/events/simple.go @@ -11,26 +11,39 @@ package events type simpleChannel[T any] struct { - subscribers []chan T - subscriptionChannel chan chan T - unsubscribeChannel chan chan T - eventChannel chan T - eventBufferSize int - closeChannel chan struct{} - isClosed bool + subscribers []chan T + // commandChannel manages all commands sent to this simpleChannel. + // + // It is important that all stuff gets sent through this single channel to ensure + // that the order of operations is preserved. + // + // WARNING: This does mean that non-event commands can block the database if the buffer + // size is breached (e.g. if many subscribe commands occupy the buffer). + commandChannel chan any + eventBufferSize int + hasClosedChan chan struct{} + isClosed bool } -// NewSimpleChannel creates a new simpleChannel with the given subscriberBufferSize and +type subscribeCommand[T any] Subscription[T] + +type unsubscribeCommand[T any] Subscription[T] + +type publishCommand[T any] struct { + item T +} + +type closeCommand struct{} + +// NewSimpleChannel creates a new simpleChannel with the given commandBufferSize and // eventBufferSize. // // Should the buffers be filled subsequent calls to functions on this object may start to block. -func NewSimpleChannel[T any](subscriberBufferSize int, eventBufferSize int) Channel[T] { +func NewSimpleChannel[T any](commandBufferSize int, eventBufferSize int) Channel[T] { c := simpleChannel[T]{ - subscriptionChannel: make(chan chan T, subscriberBufferSize), - unsubscribeChannel: make(chan chan T, subscriberBufferSize), - eventChannel: make(chan T, eventBufferSize), - eventBufferSize: eventBufferSize, - closeChannel: make(chan struct{}), + commandChannel: make(chan any, commandBufferSize), + hasClosedChan: make(chan struct{}), + eventBufferSize: eventBufferSize, } go c.handleChannel() @@ -46,7 +59,7 @@ func (c *simpleChannel[T]) Subscribe() (Subscription[T], error) { // It is important to set this buffer size too, else we may end up blocked in the handleChannel func ch := make(chan T, c.eventBufferSize) - c.subscriptionChannel <- ch + c.commandChannel <- subscribeCommand[T](ch) return ch, nil } @@ -54,14 +67,14 @@ func (c *simpleChannel[T]) Unsubscribe(ch Subscription[T]) { if c.isClosed { return } - c.unsubscribeChannel <- ch + c.commandChannel <- unsubscribeCommand[T](ch) } func (c *simpleChannel[T]) Publish(item T) { if c.isClosed { return } - c.eventChannel <- item + c.commandChannel <- publishCommand[T]{item} } func (c *simpleChannel[T]) Close() { @@ -69,27 +82,31 @@ func (c *simpleChannel[T]) Close() { return } c.isClosed = true - c.closeChannel <- struct{}{} + c.commandChannel <- closeCommand{} + + // Wait for the close command to be handled, in order, before returning + <-c.hasClosedChan } func (c *simpleChannel[T]) handleChannel() { - for { - select { - case <-c.closeChannel: - close(c.closeChannel) + for cmd := range c.commandChannel { + switch command := cmd.(type) { + case closeCommand: for _, subscriber := range c.subscribers { close(subscriber) } - close(c.subscriptionChannel) - close(c.unsubscribeChannel) - close(c.eventChannel) + close(c.commandChannel) + close(c.hasClosedChan) return - case ch := <-c.unsubscribeChannel: + case subscribeCommand[T]: + c.subscribers = append(c.subscribers, command) + + case unsubscribeCommand[T]: var isFound bool var index int for i, subscriber := range c.subscribers { - if ch == subscriber { + if command == subscriber { index = i isFound = true break @@ -103,14 +120,11 @@ func (c *simpleChannel[T]) handleChannel() { c.subscribers[index] = c.subscribers[len(c.subscribers)-1] c.subscribers = c.subscribers[:len(c.subscribers)-1] - close(ch) - - case newSubscriber := <-c.subscriptionChannel: - c.subscribers = append(c.subscribers, newSubscriber) + close(command) - case item := <-c.eventChannel: + case publishCommand[T]: for _, subscriber := range c.subscribers { - subscriber <- item + subscriber <- command.item } } }