Background Tasks in Go


When you design a software system or REST API, there are times when you need to process large amounts of data or run time-consuming computations that cannot finish within a reasonable time. You cannot expect your users to make an HTTP call that takes several seconds or minutes to respond while the backend server works through the task. Often the load balancer or the HTTP server enforces a request timeout, after which the connection is terminated and the client receives a gateway timeout error (HTTP 504). This is a bad user experience, and it can be avoided by using background tasks.

In this design pattern, the client's HTTP request queues a task for processing on the backend server, and the server immediately responds with a unique identifier for that task. The client then uses this identifier to check the status of the task periodically, and stops polling once the server reports a terminal state, either success or failure. In this post, we will look at how to implement a background task manager in an HTTP server.

Background Task Manager

First, let’s look at the requirements for our background task manager. The task manager has limited resources, so it accepts only a fixed number of queued tasks; beyond that limit it stops queueing new tasks and returns an error, and it is up to the client to retry after a while. Here is what the worker type and the worker interface look like:

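package workers

import (
	"context"
	"errors"
	"sync"
	"time"

	"github.com/apex/log"
)

// worker is the background task manager: a fixed pool of goroutines that
// pick up tasks from a buffered channel.
type worker struct {
	workchan    chan workType
	workerCount int
	buffer      int
	wg          *sync.WaitGroup
	cancelFunc  context.CancelFunc
}

// WorkerIface is the interface that the HTTP server uses to manage tasks.
type WorkerIface interface {
	Start(pctx context.Context)
	Stop()
	QueueTask(task string, workDuration time.Duration) error
}

// New returns a task manager with workerCount workers and a queue that
// holds at most buffer pending tasks.
func New(workerCount, buffer int) WorkerIface {
	w := worker{
		workchan:    make(chan workType, buffer),
		workerCount: workerCount,
		buffer:      buffer,
		wg:          new(sync.WaitGroup),
	}

	return &w
}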

The worker interface has three methods:

  • Start starts the background worker goroutines.
  • Stop cancels the tasks in progress, stops the workers and waits for all of them to exit; tasks still waiting in the queue are not completed.
  • QueueTask queues a new task to be picked up by the background workers.

To stop the tasks the workers are actively running, we use a context.CancelFunc. Tasks are queued in the buffered workchan channel, and any free worker picks up the next task from it.

Let’s look at the Start and Stop methods of WorkerIface. Start creates a cancellable context, which lets Stop cancel any tasks that the workers are actively working on. Start also spawns w.workerCount worker goroutines, which perform the work queued in the w.workchan channel. Stop closes w.workchan so that idle workers stop waiting for new tasks, cancels the context, and then waits on the WaitGroup until every worker goroutine has returned.

func (w *worker) Start(pctx context.Context) {
	ctx, cancelFunc := context.WithCancel(pctx)
	w.cancelFunc = cancelFunc

	for i := 0; i < w.workerCount; i++ {
		w.wg.Add(1)
		go w.spawnWorkers(ctx)
	}
}

func (w *worker) Stop() {
	log.Info("stop workers")
	close(w.workchan)
	w.cancelFunc()
	w.wg.Wait()
	log.Info("all workers exited!")
}

The w.spawnWorkers method loops over the w.workchan channel to pick up queued tasks. Before starting each task, it checks the context to see whether the cancel function has been called, which indicates that the workers need to stop. It then calls w.doWork, which contains the logic for the work the client expects. You can modify this method according to the kind of work you want to do, and add code here to update a central data store with the task's final status. In this example, the sleepContext function simulates a unit of work: it waits for the requested duration, or returns early if the context is cancelled.

func (w *worker) spawnWorkers(ctx context.Context) {
	defer w.wg.Done()

	// loop over workchan to pick up queued tasks.
	for work := range w.workchan {
		select {
		case <-ctx.Done():
			return
		default:
			w.doWork(ctx, work.TaskID, work.WorkDuration)
		}
	}
}

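func (w *worker) doWork(ctx context.Context, task string, workDuration time.Duration) {
	log.WithField("task", task).Info("do some work now...")
	if err := sleepContext(ctx, workDuration); err != nil {
		// the context was cancelled (e.g. Stop was called) before the work
		// finished, so don't report the task as completed.
		log.WithField("task", task).WithError(err).Info("work cancelled")
		return
	}
	// update task management data store indicating that the work is complete!
	log.WithField("task", task).Info("work completed!")
}

// sleepContext simulates a unit of work: it waits for the given duration and
// returns nil, or returns ctx.Err() early if the context is cancelled first.
func sleepContext(ctx context.Context, sleep time.Duration) error {
	timer := time.NewTimer(sleep)
	defer timer.Stop() // release the timer if we return early
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-timer.C:
		return nil
	}
}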

The QueueTask method from the WorkerIface interface must not block the HTTP handler that calls it. If the w.workchan channel is full, QueueTask returns ErrWorkerBusy; otherwise it queues the task in the channel.

func (w *worker) QueueTask(task string, workDuration time.Duration) error {
	// A select with a default case makes the send non-blocking. Checking
	// len(w.workchan) first and then sending would be racy: another handler
	// could fill the channel in between, and this send would then block.
	select {
	case w.workchan <- workType{TaskID: task, WorkDuration: workDuration}:
		return nil
	default:
		return ErrWorkerBusy
	}
}

type workType struct {
	TaskID       string
	WorkDuration time.Duration
}

var (
	ErrWorkerBusy = errors.New("workers are busy, try again later")
)
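To see why the capacity check and the send have to happen as one step, the sketch below floods a buffered channel from many goroutines using Go's non-blocking send idiom (a select with a default case). It is a standalone illustration, not the post's code; tryQueue, flood and errBusy are hypothetical names. With no consumer draining the channel, exactly as many sends succeed as the buffer holds, and every other caller gets an immediate error instead of blocking.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// errBusy plays the role of ErrWorkerBusy in this sketch.
var errBusy = errors.New("queue is full")

// tryQueue either enqueues the task immediately or reports that the queue
// is full; it never blocks.
func tryQueue(queue chan<- string, task string) error {
	select {
	case queue <- task:
		return nil
	default:
		return errBusy
	}
}

// flood has n goroutines call tryQueue concurrently on a queue with the
// given capacity (and no consumers), and counts accepted and rejected tasks.
func flood(capacity, n int) (accepted, rejected int64) {
	queue := make(chan string, capacity)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if err := tryQueue(queue, fmt.Sprintf("task-%d", i)); err != nil {
				atomic.AddInt64(&rejected, 1)
			} else {
				atomic.AddInt64(&accepted, 1)
			}
		}(i)
	}
	wg.Wait()
	return accepted, rejected
}

func main() {
	accepted, rejected := flood(10, 100)
	fmt.Println(accepted, rejected) // prints 10 90
}
```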

With this, our basic background task manager is ready to be used in the HTTP server.



HTTP Server

Next, we use the background task manager from the previous section in a simple HTTP server that queues tasks and executes the work. We need to initialise the worker interface and start the workers, and we will also implement graceful shutdown of both the HTTP server and the background workers.

package main

import (
	"context"
	"encoding/json"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/abvarun226/background-workers/workers"
	"github.com/apex/log"
	"github.com/gorilla/mux"
)

func main() {
	ctx := context.Background()
	graceperiod := 5 * time.Second
	workerCount := 10
	buffer := 100
	httpAddr := ":8000"

	log.Info("starting workers")
	w := workers.New(workerCount, buffer)
	w.Start(ctx)

	h := handler{worker: w}

	router := mux.NewRouter()
	router.HandleFunc("/queue-task", h.queueTask).Methods("POST")

	srv := &http.Server{
		Addr:    httpAddr,
		Handler: router,
	}

	done := make(chan os.Signal, 1)
	signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)

	log.Info("starting http server")
	go func() {
		if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			log.WithError(err).Fatalf("listen failed")
		}
	}()

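	<-done
	log.Info("shutting down http server")

	ctxTimeout, cancel := context.WithTimeout(ctx, graceperiod)
	defer cancel()

	// Shutdown stops accepting new requests and waits for in-flight ones to
	// finish, so once it returns successfully no handler can call QueueTask.
	if err := srv.Shutdown(ctxTimeout); err != nil {
		// log.Fatalf would call os.Exit and skip deferred calls, leaving the
		// workers running, so log the error and continue instead.
		log.WithError(err).Error("http server shutdown failed")
	}
	log.Info("http server stopped")

	// stop the background workers only after the HTTP server is down.
	w.Stop()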
}

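The queueTask HTTP handler receives the request, decodes the task ID and work duration from the JSON body, and queues the task in the background task manager. A successfully queued task gets 202 Accepted; if the queue is full, the handler responds with 503 Service Unavailable and a Retry-After header so that the client knows when to try again.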

type handler struct {
	worker workers.WorkerIface
}

func (h *handler) queueTask(w http.ResponseWriter, r *http.Request) {
	var input queueTaskInput
	if err := json.NewDecoder(r.Body).Decode(&input); err != nil {
		log.WithError(err).Info("failed to read POST body")
		renderResponse(w, http.StatusBadRequest, `{"error": "failed to read POST body"}`)
		return
	}
	defer r.Body.Close()

	// parse the work duration from the request body.
	workDuration, errParse := time.ParseDuration(input.WorkDuration)
	if errParse != nil {
		log.WithError(errParse).Info("failed to parse work duration in request")
		renderResponse(w, http.StatusBadRequest, `{"error": "failed to parse work duration in request"}`)
		return
	}

	// queue the task in background task manager.
	if err := h.worker.QueueTask(input.TaskID, workDuration); err != nil {
		log.WithError(err).Info("failed to queue task")
		if err == workers.ErrWorkerBusy {
			w.Header().Set("Retry-After", "60")
			renderResponse(w, http.StatusServiceUnavailable, `{"error": "workers are busy, try again later"}`)
			return
		}
		renderResponse(w, http.StatusInternalServerError, `{"error": "failed to queue task"}`)
		return
	}

	renderResponse(w, http.StatusAccepted, `{"status": "task queued successfully"}`)
}

func renderResponse(w http.ResponseWriter, status int, message string) {
	w.Header().Set("content-type", "application/json")
	w.WriteHeader(status)
	w.Write([]byte(message))
}

type queueTaskInput struct {
	TaskID       string `json:"task_id"`
	WorkDuration string `json:"work_duration"`
}

Building and running the server, and then queueing a task from another terminal with HTTPie (the http command), produces output like this:

  go build .
  ./background-workers 
2021/12/03 02:01:11  info starting workers         
2021/12/03 02:01:11  info starting http server     
2021/12/03 02:01:20  info do some work now...       task=hello world
2021/12/03 02:02:20  info work completed!           task=hello world

  http POST "http://localhost:8000/queue-task" task_id="hello world" work_duration="60s"
HTTP/1.1 202 Accepted
Content-Length: 38
Content-Type: application/json
Date: Thu, 02 Dec 2021 20:31:20 GMT

{
    "status": "task queued successfully"
}

The full source code is available in the GitHub repository: https://github.com/abvarun226/blog-source-code/tree/master/background-tasks-in-go

I hope this was helpful. If you find any errors or issues in this post, please let me know in the comments section below.

If you would like to be notified about more articles like this, please subscribe to my Substack.
