Exploring a Distributed WebSocket Gateway

Introduction

As an instant messaging system, LineChat naturally relies on WebSocket to ensure real-time message delivery. For an architecture with a single backend instance, development is fairly straightforward, but that would ultimately be just a toy project with little challenge. To serve a large number of concurrent users, we must design a distributed architecture that supports horizontal scaling, capable of handling user connections, message delivery, and automatic failover. This post covers my exploration, thoughts, and implementation of such an architecture.

New Problems

If the backend is deployed as a single instance, when the user numbers surge, the simplest approach is vertical scaling, for example, upgrading from a VM with 16 cores and 32 GB of RAM to one with 512 cores and 2 TB of RAM. Leaving aside hardware costs and whether such a product exists, the biggest bottleneck at that point is the software itself. Unless the architecture is deliberately designed from the start with HPC-like considerations (i.e., accounting for low-level details and optimizations), typical architectures will inevitably suffer from significant resource contention and fail to fully utilize the hardware. And perhaps most critically, what happens when a failure occurs? Do we simply expect the millions of users connected to that VM to wait and reconnect later? But we cannot guarantee that a replacement service can be brought online within an extremely short time frame.

Therefore, such a communication system must be deployed across multiple instances. This introduces the following challenges:

  • User connection: Which instance should a user connect to?
  • Message routing: How does one instance know which other instance is responsible for a given user? How are messages passed between instances?
  • High availability: Any component in a distributed system can go offline. How should we gracefully handle these scenarios?
  • Elastic scaling: How do we adapt to fluctuations in online user count and message traffic?

User Connection

For mapping users to a specific instance, a simple solution is to use a reverse proxy like Nginx. In this setup, a client's WebSocket upgrade request is forwarded directly to the corresponding backend instance, and the two eventually establish a connection through the proxy.

This approach has two problems:

First, Single point of failure. If the Nginx instance fails, all users must reconnect.

Second, Resource consumption. For HTTP requests, Nginx reuses TCP connections via a connection pool, keeping the number of established TCP connections relatively low compared to the number of requests handled. However, WebSocket connections are long-lived. To proxy a single WebSocket connection, Nginx must maintain two TCP connections. The ever-increasing number of file descriptors and memory buffers will eventually lead to "too many open files" errors or OOM, imposing a hard limit on the number of users a single Nginx instance can support. It might handle a million, but what about a hundred million?

Thus, clients should establish a direct connection with a backend instance. My approach is as follows: the client first calls an HTTP endpoint (GET /api/connection-url) to retrieve the address of the backend instance responsible for its WebSocket connection, and then initiates the WebSocket handshake using that address.

For production environments, we may place an Nginx instance in front of each backend instance for security hardening and protection.

The implementation of this HTTP endpoint is discussed in the next section.

Message Routing

This section addresses two issues: (1) the mapping between users and instances as well as how to store it, and (2) how to pass messages between instances.

Reflecting on the previous section, if we ignore the single-point bottleneck introduced by using a single Nginx reverse proxy, what load‑balancing strategy should we adopt? Round‑robin, least connections, or consistent hashing? The answer depends on how we design the user-to-instance mapping.

One approach is to store in Redis the information about which instance is responsible for each user. Assuming an instance ID takes 4 bytes, a user ID 8 bytes, and a user device ID 8 bytes, storing data for 100 million users would consume $10^{8}\times(4\text{B} + 8\text{B} + 8\text{B}) = 2\text{GB}$ of memory, which is perfectly acceptable. If the system already tracks user online presence, it likely uses Redis for that purpose as well. In this case, we are simply adding 4 bytes for the instance ID to each user's online device object.

From a business perspective, using this Redis-based mapping means Nginx's load‑balancing strategy can be arbitrary, since any instance can query Redis to find the mapping for any user.

How do we handle the second issue? Communication in distributed systems is typically based on either RPC or an event bus (the latter often implemented using a message queue). For transmitting large volumes of message data where both reliability and real-time performance are critical, an asynchronous event bus is the clear choice. We can select a suitable message queue such as RabbitMQ or Kafka. Note that Redis Pub/Sub is not a good fit here because it is essentially a synchronous communication mechanism and messages can be lost.

With that, both message routing issues are solved. The flow for sending and receiving a message is as follows:

  • User U1 (client instance C1) connects to instance B1. B1 updates Redis to indicate that U1 is assigned to B1.
  • User U2 (client instance C2) connects to instance B2. B2 updates Redis to indicate that U2 is assigned to B2.
  • U1 sends a message to U2: C1 sends the message via WebSocket to B1.
  • B1 queries Redis, discovers that U2 is handled by B2, and forwards the message to B2's message queue.
  • B2 receives the message and forwards it to C2 over the WebSocket connection.
  • U2 sees the new message.

High Availability

The inherent difficulty of distributed systems lies in the fact that any component can fail at any time, rendering parts of the system unavailable. Therefore, our architecture must automatically handle such failures. While the solution described in the previous section works under normal conditions, it becomes difficult to ensure correctness when things go wrong.

Consider Redis first. Even ignoring sharded clusters, suppose a single Redis instance suffices for all data. To achieve data redundancy and availability, we typically employ master‑slave replication with Sentinel for failover. However, Redis replication is asynchronous. During the interval between a master failure and the completion of a failover, data loss is highly probable.

Now consider a backend instance going offline. Continuing the previous communication example, suppose instance B2 goes down, causing C2 to disconnect. U1 then sends another message to U2. B1 queries Redis, sees that U2 is assigned to B2, and forwards the message to B2's queue. Meanwhile, C2 reconnects to a new instance B3, but B3 has not yet had a chance to update Redis with the new mapping for U2. As a result, U2 will never receive the message.

Of course, there are several ways to address this. One is to treat the message as an offline message and trigger an offline message synchronization mechanism. Another is to implement the GET /api/connection-url endpoint using consistent hashing based on the user ID, and then use Kubernetes StatefulSet. This ensures that a given user always connects to the same address, and that the instance bound to that address always consumes from the same queue.

Note that with the second approach, we can eliminate the need to store user-to-instance mappings, because the mapping can be computed locally, regardless of whether the user is online. By removing the Redis dependency, we sidestep anomalies caused by Redis failures. Moreover, Kubernetes ensures that our service is highly available.

Elastic Scaling

The previous section introduced Kubernetes to guarantee availability and business correctness. However, this is not a cloud‑native architecture because it does not support elastic scaling to handle variable traffic. The core issue is that each instance is tied to a specific queue. When message volume is low, instances sit idle, wasting compute resources. When volume spikes, instances become overwhelmed, leading to message backlogs and degraded real‑time performance. The former wastes money while the latter hurts user experience, both of which are significant problems.

How do we resolve message backlogs? By having more instances consume from the same queue. But currently a client connect to only one instance, how can we ensure that no messages are lost? This essentially introduces yet another layer of load balancing on the consumer side of the message queue. To avoid this complex problem, we might just require a client to connect to all instances consuming that queue. However, this would inevitably force a redesign of the user-to-instance mapping.

Suppose Kubernetes HPA detects a backlog in a queue and spins up a new instance to consume it. This new instance would then need to update the user-to-instance mappings for all relevant users and establish connections with them. This approach might handle a sudden traffic surge, but what if we add dozens of instances and the backlog persists? Clients already maintain multiple WebSocket connections, and there is a practical limit to how many they can sustain. This implies that the underlying array for the user‑ID‑based hash mapping, i.e., replicas in the StatefulSet, must be large enough so that each queue handles only a small fraction of total traffic. But when traffic is low, we would still need to keep replicas instances running. The problem of unnecessary cost still exists.

We appear to be at an impasse. However, reflecting on the discussion, we can at least draw the following conclusions:

  • Message backlog is difficult to handle; it is best if each queue handles only a small portion of the overall traffic.
  • The user-to-instance mapping is an unavoidable topic. As system requirements grow, the mapping scheme will only become more complex.

This indicates that our initial design was flawed. The User‑Instance‑Queue architecture is the root of the problem. We mistakenly placed the instance at the center, using it as an intermediary layer between users and queues. This mistake stems partly from a misunderstanding of Kubernetes—believing that merely adopting Kubernetes equates to a cloud‑native architecture and all problems will be solved.

The correct design is User‑Queue‑Instance, where the queue occupies the central role. We should have a sufficient number of queues and map user IDs to specific queues using a hash function, ensuring that each queue receives a relatively small share of the traffic. Depending on the load, we can have one instance consume multiple queues, or dedicate a single instance to a single queue, achieving true elastic scaling. By abstracting at the queue layer, horizontal scaling of backend instances becomes feasible. Once an instance binds to some queue, it inherently knows which users it is responsible for and can communicate with them.

My Implementation

The User‑Queue‑Instance architecture described in previous section is exactly what LineChat adopts. The only minor difference is that LineChat uses User‑Partition‑Instance, but partitions and queues have a one‑to‑one correspondence, so the distinction is negligible.

LineChat's backend is built with Spring Boot, and it uses Kafka as the message queue (Note that Kafka's topic partitions are a separate concept from the partitions in LineChat). The user-to-partition mapping is computed locally using hash(uid) % partitionCount, eliminating the need for external storage. The final challenge is the partition-to-instance mapping.

We could, of course, fall back to the initial plan and store partition‑to‑instance mappings in Redis. However, as discussed earlier, Redis does not offer strong consistency guarantees. When strong consistency is required, consensus algorithms come to mind. Additionally, the number of partitions is far smaller than the number of users. These two factors make using a distributed coordination service a viable alternative to Redis. I initially considered etcd, but its Java SDK has not yet reached a stable release. As a fallback, I opted for ZooKeeper, which is widely used in the Java backend ecosystem.

More importantly, from a business correctness standpoint, a distributed coordination service is a must because we need to guarantee that at most one instance is bound to a partition at any given time.

The overall workflow of the system is as follows:

  • A backend instance starts up and watches a specific path in ZooKeeper. Using the locking mechanism, it attempts to acquire a partition.
  • Upon successfully acquiring a partition, it writes its instance information to the corresponding lock file, then joins the corresponding Kafka consumer group and begins consuming from the corresponding topic.
  • For a GET /api/connection-url request, the backend first computes which partition the user belongs to, then queries ZooKeeper for the contents of the lock file associated with that partition to obtain the address of the responsible instance, and returns it in the response.
  • For a WebSocket connection request, the instance verifies whether it is responsible for the partition the user belongs to. If not, it rejects the connection.
  • For a send‑message request, the backend computes the recipient's partition and publishes the message to the corresponding Kafka topic.
  • When a new message arrives on the subscribed Kafka topic, the instance forwards it to the intended recipient via WebSocket.

A few notes:

  • Typically, a single instance will acquire multiple partitions. The exact number of partitions per instance can be controlled via the control plane (e.g., a configuration center), enabling better adaptation to traffic spikes and realizing elastic scaling.
  • To maintain a one‑to‑one correspondence between LineChat partitions and queues, for a partition numbered k, the instance joins a Kafka consumer group named consumer-group-k and subscribes to a topic named topic-k, which contains exactly one partition. By using a Kafka consumer group and a single‑partition topic, we guarantee that only one instance consumes the queue.
  • To reduce the load on ZooKeeper, the backend leverages the client‑side caching feature provided by the Curator framework. While this introduces consistency problems, additional validation logic within the business code can avoid any adverse effects.

Let us now examine the availability and automatic failover behavior of this system.

  • ZooKeeper and Kafka are themselves designed for high availability, requiring minimal concern from our side.
  • If a backend instance loses its connection to ZooKeeper, since the lock file is an ephemeral node, it is automatically removed. The backend, upon detecting the disconnection event, exits its Kafka consumer group and stops consuming messages from the associated topic.
  • If a backend instance crashes outright, the resources held in both services are automatically released, preventing deadlocks.

Because the system can now tolerate extended downtime of individual components, it naturally handles the short downtime that occur during elastic scaling. Specifically, when message traffic surges and we need to scale out horizontally, existing instances release some partitions while new instances acquire them. This may cause a small number of users to briefly disconnect, but they will quickly reconnect and resume normal operation. The scale‑in process follows a similar pattern and is not detailed further here.

Thus, LineChat successfully addresses the key challenges of a distributed WebSocket architecture: user connection, message routing, high availability, and elastic scaling.

References