Background Tasks in Go
When you design a software system or REST API, there are times when you need to process large amounts of data or perform time-consuming computations that cannot be completed within a reasonable amount of time. You cannot expect your users to make an HTTP call that takes several seconds or minutes to respond while the backend server works through a time-consuming task. Often the load balancer or the HTTP server enforces a request timeout, after which the connection is terminated and the client receives a gateway timeout error. This is a bad user experience that can be avoided by using background tasks.
In this design pattern, the HTTP request initiated by the client queues a task for processing in the backend server, and the server responds with a unique identifier. The client can then use this identifier to check the status of the task periodically. When the server responds with a terminal state, either success or failure, the client can stop polling for the status. In this post, we will look at how to implement a background task manager in an HTTP server.
Background Task Manager
First, let’s look at the requirements for our background task manager. The task manager has limited resources, so it accepts only a certain number of tasks; beyond that limit it stops queueing new tasks and returns an error. In such cases, it is up to the client to retry after a while. Now, let’s see what the worker type and worker interface look like:
type worker struct {
	workchan    chan workType
	workerCount int
	buffer      int
	wg          *sync.WaitGroup
	cancelFunc  context.CancelFunc
}

type WorkerIface interface {
	Start(pctx context.Context)
	Stop()
	QueueTask(task string, workDuration time.Duration) error
}

func New(workerCount, buffer int) WorkerIface {
	w := worker{
		workchan:    make(chan workType, buffer),
		workerCount: workerCount,
		buffer:      buffer,
		wg:          new(sync.WaitGroup),
	}
	return &w
}
The worker interface has three methods:
- Start starts the background workers.
- Stop cancels the work that the workers are doing and stops them gracefully.
- QueueTask queues a new task to be worked on by the background workers.
In order to stop the active tasks of the workers, we will use context.CancelFunc. We will use the workchan channel to queue tasks. Free workers will pick up any task that is in this channel.
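Several goroutines ranging over the same channel is what spreads the queued tasks across the workers: each value sent on a channel is received by exactly one receiver. The sketch below shows this in isolation; fanOut is a hypothetical helper, not part of the workers package.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut starts workerCount goroutines that all receive from the same
// buffered channel and counts how often each task is processed.
func fanOut(tasks []string, workerCount int) map[string]int {
	queue := make(chan string, len(tasks))
	for _, t := range tasks {
		queue <- t
	}
	// Closing the channel lets each worker's range loop end once the
	// buffered tasks have been drained.
	close(queue)

	var (
		mu   sync.Mutex
		seen = make(map[string]int) // task -> times processed
		wg   sync.WaitGroup
	)
	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for task := range queue {
				mu.Lock()
				seen[task]++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return seen
}

func main() {
	seen := fanOut([]string{"a", "b", "c", "d", "e"}, 3)
	fmt.Println(len(seen), seen["a"], seen["e"]) // 5 1 1
}
```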
Let’s look at the Start and Stop methods of the WorkerIface. The Start method initialises a cancellable context, which helps cancel any tasks that the workers are actively working on. Start also spawns w.workerCount worker goroutines. These goroutines perform the work that is queued in the w.workchan channel. The Stop method closes w.workchan, cancels the context so that in-flight work is interrupted and remaining queued tasks are skipped, and then waits on the WaitGroup until every worker goroutine has returned.
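func (w *worker) Start(pctx context.Context) {
	ctx, cancelFunc := context.WithCancel(pctx)
	w.cancelFunc = cancelFunc

	// spawn the worker goroutines; each one marks itself done on exit.
	for i := 0; i < w.workerCount; i++ {
		w.wg.Add(1)
		go w.spawnWorkers(ctx)
	}
}

func (w *worker) Stop() {
	log.Info("stop workers")
	// close the queue. Sending to a closed channel panics, so Stop must only
	// be called once nothing else will call QueueTask.
	close(w.workchan)
	// cancel the context to interrupt in-flight work.
	w.cancelFunc()
	// wait for all worker goroutines to exit.
	w.wg.Wait()
	log.Info("all workers exited!")
}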
The w.spawnWorkers method loops over the w.workchan channel to pick up the tasks that are queued. Before starting each task, it checks the context to see whether the cancel function has been called, which indicates that the workers need to stop. The w.spawnWorkers method calls the w.doWork method, which contains the logic for the work the client expects. You can modify this method according to the kind of work you want to do. You can also add code here to update a central data store, recording that the work has finished along with its status. In this example, we use the sleepContext function to simulate a unit of work.
func (w *worker) spawnWorkers(ctx context.Context) {
	defer w.wg.Done()

	// loop over workchan to pick up queued tasks.
	for work := range w.workchan {
		select {
		case <-ctx.Done():
			return
		default:
			w.doWork(ctx, work.TaskID, work.WorkDuration)
		}
	}
}

func (w *worker) doWork(ctx context.Context, task string, workDuration time.Duration) {
	log.WithField("task", task).Info("do some work now...")
	sleepContext(ctx, workDuration)
	// update task management data store indicating that the work is complete!
	log.WithField("task", task).Info("work completed!")
}

// sleepContext blocks until the duration elapses or ctx is cancelled.
func sleepContext(ctx context.Context, sleep time.Duration) {
	select {
	case <-ctx.Done():
	case <-time.After(sleep):
	}
}
The QueueTask method of the WorkerIface interface queues the task only if the w.workchan channel has room. If the channel is full, QueueTask returns ErrWorkerBusy instead of blocking the caller.
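func (w *worker) QueueTask(task string, workDuration time.Duration) error {
	// try to queue the task in workchan without blocking. Checking len()
	// before sending is racy under concurrent requests, whereas a select
	// with a default case fails atomically when the buffer is full.
	select {
	case w.workchan <- workType{TaskID: task, WorkDuration: workDuration}:
		return nil
	default:
		return ErrWorkerBusy
	}
}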
type workType struct {
	TaskID       string
	WorkDuration time.Duration
}

var (
	ErrWorkerBusy = errors.New("workers are busy, try again later")
)
With this, our basic background task manager is ready to be used in the HTTP server.
HTTP Server
In a simple HTTP server, we will use the background task manager that we implemented in the previous section to queue tasks and execute some work. We will need to initialise the worker interface and then start the workers. We will also implement graceful shutdown of HTTP server and the background workers.
package main

import (
	"context"
	"encoding/json"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/abvarun226/background-workers/workers"
	"github.com/apex/log"
	"github.com/gorilla/mux"
)

func main() {
	ctx := context.Background()
	graceperiod := 5 * time.Second
	workerCount := 10
	buffer := 100
	httpAddr := ":8000"

	log.Info("starting workers")
	w := workers.New(workerCount, buffer)
	w.Start(ctx)

	h := handler{worker: w}
	router := mux.NewRouter()
	router.HandleFunc("/queue-task", h.queueTask).Methods("POST")

	srv := &http.Server{
		Addr:    httpAddr,
		Handler: router,
	}

	// os.Interrupt is SIGINT on Unix-like systems.
	done := make(chan os.Signal, 1)
	signal.Notify(done, os.Interrupt, syscall.SIGTERM)

	log.Info("starting http server")
	go func() {
		if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			log.WithError(err).Fatalf("listen failed")
		}
	}()

	<-done
	log.Info("shutting down http server")

	ctxTimeout, cancel := context.WithTimeout(ctx, graceperiod)
	defer cancel()

	// stop accepting requests before stopping the workers, so that no
	// handler sends to the worker channel after Stop has closed it.
	if err := srv.Shutdown(ctxTimeout); err != nil {
		// Fatal would exit without stopping the workers, so only log here.
		log.WithError(err).Error("http server shutdown failed")
	}
	log.Info("http server stopped")

	w.Stop()
}
The queueTask HTTP handler receives the request and queues the task in the background task manager.
type handler struct {
	worker workers.WorkerIface
}

func (h *handler) queueTask(w http.ResponseWriter, r *http.Request) {
	var input queueTaskInput
	if err := json.NewDecoder(r.Body).Decode(&input); err != nil {
		log.WithError(err).Info("failed to read POST body")
		renderResponse(w, http.StatusBadRequest, `{"error": "failed to read POST body"}`)
		return
	}
	defer r.Body.Close()

	// parse the work duration from the request body.
	workDuration, errParse := time.ParseDuration(input.WorkDuration)
	if errParse != nil {
		log.WithError(errParse).Info("failed to parse work duration in request")
		renderResponse(w, http.StatusBadRequest, `{"error": "failed to parse work duration in request"}`)
		return
	}

	// queue the task in background task manager.
	if err := h.worker.QueueTask(input.TaskID, workDuration); err != nil {
		log.WithError(err).Info("failed to queue task")
		if err == workers.ErrWorkerBusy {
			w.Header().Set("Retry-After", "60")
			renderResponse(w, http.StatusServiceUnavailable, `{"error": "workers are busy, try again later"}`)
			return
		}
		renderResponse(w, http.StatusInternalServerError, `{"error": "failed to queue task"}`)
		return
	}

	renderResponse(w, http.StatusAccepted, `{"status": "task queued successfully"}`)
}

func renderResponse(w http.ResponseWriter, status int, message string) {
	w.Header().Set("content-type", "application/json")
	w.WriteHeader(status)
	w.Write([]byte(message))
}

type queueTaskInput struct {
	TaskID       string `json:"task_id"`
	WorkDuration string `json:"work_duration"`
}
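This handler trusts the task_id sent by the client, whereas the pattern described at the start of the post has the server respond with a unique identifier. A minimal sketch of server-side ID generation is below; newTaskID is a hypothetical helper, and the response shape with a task_id field is an assumption, not what the repository returns.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// newTaskID returns a random 128-bit identifier encoded as 32 hex
// characters, using the operating system's secure random source.
func newTaskID() (string, error) {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		return "", err
	}
	return hex.EncodeToString(b), nil
}

func main() {
	id, err := newTaskID()
	if err != nil {
		panic(err)
	}
	fmt.Println(len(id)) // 32

	// A response body the handler could send back to the client.
	body, _ := json.Marshal(map[string]string{
		"status":  "task queued successfully",
		"task_id": id,
	})
	fmt.Println(string(body))
}
```

The client would then poll the task's status using the returned task_id instead of an identifier it chose itself.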
Building and running the server, and then queuing a 60-second task with the HTTPie http command from another terminal, produces output like this:
➜ go build .
➜ ./background-workers
2021/12/03 02:01:11 info starting workers
2021/12/03 02:01:11 info starting http server
2021/12/03 02:01:20 info do some work now... task=hello world
2021/12/03 02:02:20 info work completed! task=hello world
➜ http POST "http://localhost:8000/queue-task" task_id="hello world" work_duration="60s"
HTTP/1.1 202 Accepted
Content-Length: 38
Content-Type: application/json
Date: Thu, 02 Dec 2021 20:31:20 GMT
{
"status": "task queued successfully"
}
The full source code is available in the GitHub repository: https://github.com/abvarun226/blog-source-code/tree/master/background-tasks-in-go
I hope this was helpful. If you find any errors or issues in this post, please let me know in the comments section below.
If you would like to be notified about more articles like this, please subscribe to my Substack.