Load Balancer Internals Pt 2 - Handling HTTP Requests
When the load balancer receives a HTTP request, the following events occur:
It applies a load balancing algorithm, like round-robin, to choose a server to which the request must be forwarded.
Once a server has been chosen, the load balancer must send the request to that server through a TCP connection.
In this part of the Load Balancer Internals series, we will focus on step 2. This step is extremely crucial, as it determines how quickly a request can be serviced, as well as how efficiently connections to a server are managed. To reiterate, the problem at hand is:
"Once the destination server for the request has been chosen, How does the load balancer map the request to a server connection?"
These are some of the approaches I considered:
Create a New Connection for each Incoming Request
This is the most naïve approach to the problem. The load balancer would perform the following steps after it chooses a destination server:
Create a new connection to the server.
Send the request on this connection.
Receive a response from the server, then close the connection.
The advantages of using this approach would be:
- Since each request is allocated its own connection, we don’t have to worry about multiple requests contending for the same connection. This makes the approach extremely easy to implement, as we don’t have to use any sort of synchronization.
The disadvantages of using this approach would be:
- Setting up and tearing down a new TCP connection for every request is not efficient.
But what if we could reuse old connections?
Connection Pooling
This approach involves maintaining a pool of persistent connections for each server. The pool represents a set of connections, any of which can be used to forward the request. If we use this method, the load balancer would take these steps after picking a destination server:
Pick a connection from the pool associated with the server.
Use the connection to send the request across this connection.
Receive a response from the server.
Put the connection back into the pool.
A few things we need to consider when using this approach:
A connection cannot be used to send two requests simultaneously:
Since we are adhering to the HTTP 1.1 protocol, a connection cannot be used to send a second request unless the server has sent back a response to the first one.
This has the following implications:
In a multi-threaded environment where multiple threads (each handling a request) are looking to pick a connection out of the pool, only one thread must be successful in acquiring it.
We must implement synchronization between these threads, so that one connection can be acquired by only one thread at a time. Once the thread acquires the connection, it would then send the request across this connection, get the response back from the server, and then place the connection back into the pool, at which point the connection can be picked up by another thread.
We must be able to retrieve a connection from the pool as quickly as possible.
If a connection in the pool is never picked up, then this connection must be terminated.
If the pool remains empty for too long, new connections must be added to the pool.
The total number of connections in the pool must remain within a user specified limit.
How do we design a mechanism to achieve all this?
Behold, the Worker Pool pattern.
Worker Pool Pattern
Theory:
Before we dive into the application of a worker pool for our case, let us understand what it is with a real world example.
Let us assume there is an office with one manager and multiple workers. The office operates as follows:
Whenever the manager realizes that a task must be done, he delegates it to one of the workers.
A worker can only do one task at a time.
If the manager cannot find a free worker within a fixed period, he must call another worker to the office.
If a worker hasn’t been delegated a task within a fixed period, he can leave the office.
The manager’s goal is efficiency—assigning tasks quickly while keeping the workforce minimal.
There are 2 ways for the manager to determine which worker is free:
First Approach : Iterate through each worker sequentially, and check whether they are idle or busy.
Second Approach : Have workers report back to the manager after they’re done with their previous task, so they can be assigned another one.
The second approach is better for a multitude of reasons, and is the essence of the worker pool pattern.
Now let’s see how this analogy applies to our HTTP request handling.
Application:
Let’s draw a parallel to the office example, to see how it fits our use case.
Task : A HTTP request, is analogous to a task.
Worker : A worker in the office is analogous to a thread that manages a server connection.
Worker completing a task : This equates to a thread forwarding the HTTP request to the server and receiving a response.
We now know what a worker is in our use case, but what about the manager? and how does the worker “report back” to the manager when it has completed its task?
Manager :
In our case, the Go scheduler acts as the manager. It delegates tasks (HTTP requests) to workers (threads) using channels.
Each thread will listen to a job channel dedicated to the server it maintains a connection with, waiting for the Go scheduler to assign it a task.
Hence the worker is “free”, when its listening to the channel.
If the Go scheduler assigns a request to the thread, it starts executing its task, and no longer listens to the channel.
Once the task is complete, the worker goes back to listening to the channel.
Implementation
Lets look at the Go implementation of a “task” and a “Worker”:
(please keep in mind that I’ve used the term “Job” in my code, instead of “Task”. they are the same thing.)
type Job struct {
ResponseWriter http.ResponseWriter /* Used by the worker to send the server’s response
back to the client. */
Request *http.Request // The request to be forwarded to the server.
Done chan struct{} // This channel is used to signal job completion.
}
type HTTPWorker struct {
Addr string // destination address of the server connection
WorkerId int // worker id to uniquely identify the worker in the pool
Timeout int /* if a worker isnt delegated a task for "timeout" seconds,
it exits. */
JobChannel <-chan Job // the channel on which the worker listens to be delegated a task.
WorkerCount *int // Total number of workers in the pool.
WorkerCountMutex *sync.Mutex // mutex to access the WorkerCount variable.
MinWorkerCount int /* used to make sure that the worker only exits
if WorkerCount >= MinWorkerCount */
HTTPClient http.Client /* client that sends request across the server connection.
I have used a HTTP client, which does its own connection
pooling, but I've limited the size of the HTTPClient's pool to 1.*/
logger *slog.Logger // logger for logging events.
}
Now that we know what a worker and task look like in the context of our load balancer, We can look at how a “worker completes a task”.
func (hw *HTTPWorker) ProcessHTTPRequest() {
for {
select {
// 1. Receive a Job on the job channel.
case job := <-hw.JobChannel:
var(
err error
requestCopy *http.Request
response *http.Response
responseBody []byte
)
hw.logger.Info("worker received task", "server_id", hw.ServerId, "worker_id", hw.WorkerId)
// 2. Make a copy of the request.
if requestCopy, err = util.CopyRequest(job.Request, hw.Addr); err != nil {
hw.logger.Error(err.Error(), "server_id", hw.ServerId, "worker_id", hw.WorkerId)
util.WriteJSON(job.ResponseWriter, 500, map[string]string{"error": "internal server error"})
continue
}
// 3. Send the request copy to the server.
if response, err = hw.HTTPClient.Do(requestCopy); err != nil {
hw.logger.Error(err.Error(), "server_id", hw.ServerId, "worker_id", hw.WorkerId)
util.WriteJSON(job.ResponseWriter, 500, map[string]string{"error": "internal server error"})
if err = response.Body.Close(); err != nil {
hw.logger.Error(err.Error(), "server_id", hw.ServerId, "worker_id", hw.WorkerId)
}
continue
} else if response == nil {
hw.logger.Error("received nil response", "server_id", hw.ServerId, "worker_id", hw.WorkerId)
util.WriteJSON(job.ResponseWriter, 500, map[string]string{"error": "internal server error"})
continue
}
// 4. Read response body.
if responseBody, err = io.ReadAll(response.Body); err != nil {
hw.logger.Error(err.Error(), "server_id", hw.ServerId, "worker_id", hw.WorkerId)
util.WriteJSON(job.ResponseWriter, 500, map[string]string{"error": "internal server error"})
if err = response.Body.Close(); err != nil {
hw.logger.Error(err.Error(), "server_id", hw.ServerId, "worker_id", hw.WorkerId)
}
continue
}
if err = response.Body.Close(); err != nil {
hw.logger.Error(err.Error(), "server_id", hw.ServerId, "worker_id", hw.WorkerId)
}
// 5. forward response to client.
util.WriteResponse(job.ResponseWriter, response.StatusCode, responseBody)
// 6. Mark the job as complete, by sending a signal on the Done channel.
close(job.Done)
}
}
}
Now we address the idea of a worker “leaving the office” if he isn’t delegated a task after “Timeout” seconds. This is handled in the same function, but as a different case in the select statement.
func (hw *HTTPWorker) ProcessHTTPRequest() {
for {
select {
case job := <-hw.JobChannel:
/*..........*/
// 1. receive a signal on this channel if the worker thread has been idle for "Timeout" seconds.
case <-time.After(time.Duration(hw.Timeout) * time.Second):
hw.WorkerCountMutex.Lock()
// 2. Check the global WorkerCount.
if *hw.WorkerCount >= hw.MinWorkerCount {
// 3. decrease WorkerCount.
*hw.WorkerCount--
// 4. Log the event.
hw.logger.Info(
fmt.Sprintf(" Worker has been idle for %d seconds, exiting.....", hw.Timeout),
"server_id", hw.ServerId,
"worker_id", hw.WorkerId,
)
// 5. Release connection resources.
hw.HTTPClient.CloseIdleConnections()
}
hw.WorkerCountMutex.Unlock()
}
}
}
Lets address how new workers are called to the office if the manager cannot find a free worker to delegate the task to. We introduce the HTTPHandler’s ServeHTTP function. We saw the function of the HTTPHandler in the previous part. Its ServeHTTP function is called every time a HTTP request is sent to the load balancer.
func (handler *HTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// apply load balancing algorithm to choose a server
httpServer := handler.ApplyLoadBalancingAlgorithm()
// get the job channel dedicated for that server
serverJobChannel := httpServer.JobChannel
/*
done channel is used to wait for worker to:
1) send request to server,
2) write response to ResponseWriter.
ServeHTTP function needs to wait, as ResponseWriter can only be accessed
by Worker go routine without giving superflous error while ServeHTTP func is still in execution.
*/
doneChannel := make(chan struct{})
/*
the following for loop is used to spawn more workers after a timeout.
If the maximum number of workers are already spawned, then retriesLeft deducted.
if number of retries reaches 0, then request is not serviced.
*/
maxRetries := 5.0
retriesLeft := 5.0
for {
select {
/*
1. this case statement tries to push a job to the channel.
the job channel acts like a buffered queue, which means it can only hold
a certain fixed number of unassigned jobs.
*/
case serverJobChannel <- server.Job{ResponseWriter: w, Request: r, Done: doneChannel}:
/*
if the job is pushed to the channel successfully, we wait to receive confirmation
from the worker about completion of the job by waiting to receive a signal on the
done channel.
*/
<-doneChannel
return
/*
2. time.After is a channel that sends a signal after a specified duration,
analogous to a countdown timer that rings after some time.
if the previous case statement gets executed, then this countdown timer
gets reset because of the for loop.
if the countdown timer goes off, then it means that the current request could not be
pushed into the job channel because it is full,
indicating that the number of workers are currently inadequate.
so the body of this case statement attempts to spawn more workers, analogous to a
manager calling more workers to the office.
*/
case <-time.After(time.Duration(100*math.Pow(2, maxRetries-retriesLeft)) * time.Millisecond):
httpServer.Logger.Info("trying to spawn worker go-routines", "http_server_id", httpServer.ServerId)
httpServer.WorkerCountMutex.Lock()
/*
if the number of existing worker threads is already at its limit, then
no more workers can be spawned.
the number of retries are decremented, and we try again.
if the number of retries = 0, then we return a response to the client indicating
that the load is too much.
*/
if *httpServer.WorkerCount == httpServer.MaxWorkerCount {
retriesLeft--
httpServer.Logger.Warn("maximum worker count reached", "http_server_id", httpServer.ServerId)
if retriesLeft == 0 {
util.WriteJSON(w, 429, map[string]string{"error": "Too Many Requests."})
httpServer.WorkerCountMutex.Unlock()
return
}
httpServer.WorkerCountMutex.Unlock()
} else {
/*
if the number of workers is lower than the max limit,
we spawn a new worker.
*/
*httpServer.WorkerCount++
workerId := *httpServer.WorkerCount
httpServer.WorkerCountMutex.Unlock()
newWorker := httpServer.SpawnHTTPWorker(workerId, httpServer.MinWorkerCount, httpServer.WorkerTimeout, httpServer.Logger, httpServer.WorkerCount, httpServer.WorkerCountMutex)
go newWorker.ProcessHTTPRequest()
}
}
}
}