Real Time Queue Service - tech post
Author: Selina Zheng
Motivations and objectives
After a frustrating, failed attempt to get tickets to Tomorrowland, I was inspired to build a real-time ticketing and queuing service that informs users of their current position in the queue. It allows users to join the queue and subscribe to real-time updates of their position, and allows workers or operators to fetch the next user from the queue while notifying the users still in the queue of their position changes.
The service utilises fs2 and cats-effect, and implements a streaming endpoint for users to receive updates and a simple GET endpoint for workers. The bare-minimum UI is built with NextJS.
Accompanying codebase
The code discussed in this post can be found in the GitHub repository.
Product features
- Users enqueue
- Workers dequeue
- Users receive real-time position updates
Architecture
The service stores the current state of the queue in memory:
- Queue of UserPosition: each user's userSessionId and their assignedPosition
- Ref of AssignedPosition: a counter for the last assigned position
- Signal of LatestServedPosition: tracks the assigned position of the last served user
In-memory state is managed by QueueService:
trait QueueService[F[_]] {
def addUser(userSessionId: UserSessionId): F[UserPosition]
def nextUser: F[Option[UserPosition]]
def subscribeToUpdates(assignedPosition: Int): Stream[F, Int]
}
class QueueServiceInMemoryImpl[F[_] : Monad](userQueue: Queue[F, UserPosition],
assignedPositionCounter: Ref[F, Int],
latestServicedPositionSignal: SignallingRef[F, Int])
extends QueueService[F]
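The three pieces of state can be allocated together when the service starts. A minimal sketch, assuming cats-effect's std Queue and fs2's SignallingRef; the constructor name makeState and the simplified types are illustrative, not the repo's exact code:

```scala
import cats.effect.{Concurrent, Ref}
import cats.effect.std.Queue
import cats.syntax.all._
import fs2.concurrent.SignallingRef

// Simplified types mirroring the post's definitions
final case class UserSessionId(value: String)
final case class UserPosition(userSessionId: UserSessionId, assignedPosition: Int)

// Sketch: allocate the queue, the position counter and the
// latest-served-position signal in one effect, all starting from 0
def makeState[F[_]: Concurrent]
    : F[(Queue[F, UserPosition], Ref[F, Int], SignallingRef[F, Int])] =
  for {
    userQueue <- Queue.unbounded[F, UserPosition]
    counter   <- Ref.of[F, Int](0)
    signal    <- SignallingRef[F, Int](0)
  } yield (userQueue, counter, signal)
```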
Users enqueue and receive a stream of position updates via UserService. When a user calls the /add-user-and-subscribe endpoint, the service increments Ref[AssignedPosition] to obtain the next assignedPosition, enqueues the user with their userSessionId and assignedPosition to Queue[UserPosition], and subscribes to Signal[LatestServedPosition]. The user's effective position is their assignedPosition minus the latest position being served. The stream ends when the user is served, i.e. when their effective position reaches 0.
trait UserService[F[_]] {
def addUserAndSubscribe(userSessionId: UserSessionId): Stream[F, PositionUpdate]
}
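A sketch of how addUserAndSubscribe might compose the QueueService: enqueue first, then map each latest-served update to the user's effective position. The subtraction follows the description above; the implementation details and simplified types are assumptions, not the repo's exact code:

```scala
import fs2.Stream

// Simplified types mirroring the post's definitions
final case class UserSessionId(value: String)
final case class UserPosition(userSessionId: UserSessionId, assignedPosition: Int)
final case class PositionUpdate(effectivePosition: Int)

trait QueueService[F[_]] {
  def addUser(userSessionId: UserSessionId): F[UserPosition]
  def subscribeToUpdates(assignedPosition: Int): Stream[F, Int]
}

trait UserService[F[_]] {
  def addUserAndSubscribe(userSessionId: UserSessionId): Stream[F, PositionUpdate]
}

// Sketch: enqueue the user, then translate each latest-served update
// into an effective position (assignedPosition - latestServed)
class UserServiceImpl[F[_]](queueService: QueueService[F]) extends UserService[F] {
  override def addUserAndSubscribe(userSessionId: UserSessionId): Stream[F, PositionUpdate] =
    Stream.eval(queueService.addUser(userSessionId)).flatMap { userPosition =>
      queueService
        .subscribeToUpdates(userPosition.assignedPosition)
        .map(latestServed => PositionUpdate(userPosition.assignedPosition - latestServed))
    }
}
```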
The entry point for workers to dequeue and fetch the next user is WorkerService. When the queue operator calls the /get-next-user endpoint, the service dequeues from Queue[UserPosition] and, unless the queue is empty, updates Signal[LatestServedPosition] to the assignedPosition of the dequeued user and returns the user info to the operator. The Signal propagates the change to the users still in the queue, who then receive updates via their streams.
trait WorkerService[F[_]] {
def getNextUser: F[Option[UserPosition]]
}
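A sketch of the dequeue path: try to take the next user and, if one exists, publish their assignedPosition through the signal so subscribers see the change. The standalone function shape and simplified types are illustrative; the repo's implementation may differ:

```scala
import cats.effect.IO
import cats.effect.std.Queue
import cats.syntax.all._
import fs2.concurrent.SignallingRef

// Simplified types mirroring the post's definitions
final case class UserSessionId(value: String)
final case class UserPosition(userSessionId: UserSessionId, assignedPosition: Int)

// Sketch: dequeue the next user (if any) and propagate their
// assignedPosition through the latest-served-position signal
def getNextUser(
    userQueue: Queue[IO, UserPosition],
    latestServedPositionSignal: SignallingRef[IO, Int]
): IO[Option[UserPosition]] =
  userQueue.tryTake.flatMap {
    case Some(userPosition) =>
      latestServedPositionSignal
        .set(userPosition.assignedPosition)
        .as(Some(userPosition))
    case None => IO.pure(None)
  }
```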
Implementation
fs2 SignallingRef - last update wins
SignallingRef, which extends Signal from the fs2 concurrent library, holds a single value that can be updated and read in the effect F. Changes are propagated via the discrete method, which returns a stream of updates to the value. This is efficient for updating user positions because of its last-update-wins policy: when updates arrive very quickly, users directly receive the latest one. Instead of each user subscribing to a Topic publishing every update to their own effective position, a single Signal propagates the latest update to all users at once.
The Signal emits a stream of position updates to the user until the latest served position reaches their assigned position, that is, until the user is served:
override def subscribeToUpdates(assignedPosition: Int): Stream[F, Int] =
latestServicedPositionSignal.discrete.takeThrough(_ < assignedPosition)
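takeThrough keeps emitting while the predicate holds and also emits the first element that fails it, so the user still receives the final update in which they are served. A pure-stream illustration with made-up values:

```scala
import fs2.Stream

// takeThrough emits while the predicate holds, plus the first
// element that fails it, then ends the stream
val assignedPosition = 3
val updates: List[Int] =
  Stream(0, 1, 2, 3, 4, 5).takeThrough(_ < assignedPosition).toList
// updates == List(0, 1, 2, 3): once position 3 is served, the stream ends
```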
Cats effect Ref - thread-safe concurrency
Ref from the cats-effect library is the go-to for a thread-safe, concurrent mutable reference. However, it does NOT synchronise separate reads and writes. Imagine two users enqueueing concurrently. With the commented-out code, where the get and update methods are called separately, one user may read the Ref of assignedPositions via get while the other user has already taken the same assignedPosition and is only now incrementing the Ref by 1 via update. These two users would end up with the same assignedPosition.
To avoid this race condition, we use the updateAndGet method, an atomic operation that reads, writes and returns the result in a single thread-safe step:
override def addUser(userSessionId: UserSessionId): F[UserPosition] =
for {
//assignedPosition <- assignedPositionCounter.get
assignedPosition <- assignedPositionCounter.updateAndGet(_ + 1)
userPosition = UserPosition(userSessionId, assignedPosition)
_ <- userQueue.offer(userPosition)
//_ <- assignedPositionCounter.update(_ + 1)
} yield userPosition
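The atomicity can be checked by racing many enqueues against the same counter: every updateAndGet call should return a distinct position. A small hedged demo, not from the repo:

```scala
import cats.effect.{IO, Ref}
import cats.effect.unsafe.implicits.global
import cats.syntax.all._

// Race 100 concurrent increments against one Ref: because
// updateAndGet is atomic, every caller gets a unique position
val positions: List[Int] = (for {
  counter   <- Ref.of[IO, Int](0)
  positions <- (1 to 100).toList.parTraverse(_ => counter.updateAndGet(_ + 1))
} yield positions).unsafeRunSync()

assert(positions.distinct.size == 100) // no duplicate assignedPositions
```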
gRPC - server streaming endpoint
A server-streaming RPC fits the user endpoint: the server returns a stream of messages in response to a single client request. The user sends their sessionId once from the browser and receives a stream of positionUpdates from the server.
Service definition for the user streaming endpoint:
service RealTimeQueueUserService {
rpc AddUserAndSubscribe (Request) returns (stream PositionUpdate) {}
}
Tapir - websocket
However, for simpler integration with a NextJS frontend app, a websocket endpoint is a better fit: it provides real-time communication and is easier to set up with JavaScript. The Tapir library is used to describe and handle the websocket endpoint. The user sends their userSessionId as a query parameter and does not emit further messages to the server. The server emits a stream of Option[PositionUpdate] to the client and closes the frame by sending a None message, signalling the end of the stream.
User websocket endpoint description:
val userServiceEndpoint: Endpoint[Unit,
String,
(StatusCode, ErrorResponse),
Pipe[F, Option[String], Option[PositionUpdate]],
Fs2Streams[F] & capabilities.WebSockets] =
endpoint.get
.in("add-user-and-subscribe")
.in(query[String]("userSessionId"))
.out(
webSocketBody[Option[String], CodecFormat.TextPlain, Option[PositionUpdate], CodecFormat.Json](Fs2Streams[F])
.decodeCloseResponses(true)
)
.errorOut(statusCode and jsonBody[ErrorResponse])
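The server logic behind this endpoint can be expressed as a Pipe that ignores inbound frames and emits the update stream wrapped in Some, followed by a single None telling the client to close. A sketch under those assumptions; the function name userPipe and the simplified PositionUpdate are illustrative:

```scala
import fs2.{Pipe, Stream}

final case class PositionUpdate(position: Int) // simplified for the sketch

// Sketch: ignore what the client sends; emit each update as Some(_)
// and finish with a None so the client knows to close the socket
def userPipe[F[_]](
    updates: Stream[F, PositionUpdate]
): Pipe[F, Option[String], Option[PositionUpdate]] =
  _ => updates.map(Option(_)) ++ Stream.emit(Option.empty[PositionUpdate])
```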
Persistence, recovery & availability
Given the scope of this project, the existing design of storing the queue state in memory in a single instance prioritises performance and accuracy. In a production environment for a music festival ticket sale, however, persistence, recovery and availability would need to be addressed. If the server crashes, the queue is lost and cannot be recovered on restart. There is also no backup service to serve client requests, creating a single point of failure. Yet with multiple instances for availability and higher traffic, ordering within a single queue cannot be maintained.
| Requirement | Goal | Possible solutions |
|---|---|---|
| Persistence & recovery | When the service restarts, it can reconstruct the previous state of the queue. | Persist the state externally, using a relational database like PostgreSQL or a distributed KV store like Redis, or recover in an event-sourced fashion from a message broker like Kafka. Since this service prioritises speed and simple ops for a lightweight queue, Kafka, or a Redis list with AOF persistence enabled, might be the simplest way to rebuild internal state on restart. |
| Availability | Multiple instances all maintain the same worldview of a shared queue state at all times. All instances can read and write the state without creating race conditions. | Kafka pub/sub can let individual service instances each arrive at a shared state, because each consumer group receives the same messages in the same order. However, both the user and worker operations need to read the state in order to write. Coordinating consuming and publishing as a single operation in Kafka is complex, may create race conditions, and would likely require a mechanism to ensure there is only a single writer to any queue at any time; otherwise multiple users can end up with the same assigned position, or the same user can be served twice. A Redis list executes all commands on a single thread, maintaining atomicity, and discards popped elements, keeping the queue lightweight, but it does not automatically propagate updates made by one instance to the other instances. |
The exact design depends on the particular business requirements of the production environment and the trade-offs it permits. For example, accidentally overselling or underselling tickets to a music festival may not be the end of the world.