Go concurrency the right way (i.e. my way)

Racing to the end

My take on Go concurrency is that there are 3 things that matter, and we can create a robust pattern using … 3 technologies to help us out. None of this is really new, but I haven’t seen the material presented from this particular viewpoint. With that, and the additional caveat that this isn’t a beginner’s guide, let’s jump straight in.

What matters

  1. Startup
  2. Shutdown
  3. Throughput

Once everything is _working_, it’s not all that complicated (*) to do concurrency in Go (or any other programming language). Data comes in, it gets farmed out to wherever it needs to go, and you have some synchronization method to prevent corruption.

(*) Yeah that’s a lie 😉

The real challenge is bootstrapping such a system in the first place, and then being able to tear it down correctly. Doing this correctly means 1) you’re not leaking resources, 2) you have your signaling right, and 3) you have the building blocks to scale up/down as needed. I guess everything comes in threes in this blog post.

I’ve ignored throughput for the moment but we’ll come back to it.

Startup

Creating concurrent workers in Go is about managing resources and creating a rendezvous point to send data & receive results later. The resources we need to deal with are threads and memory (mostly Go channels). My first suggestion is to use context.Context as the bookkeeping device for handling startup. Context is great because it lets you group stuff together, and it’s important later for shutdown: you can selectively shut down the whole group, or a nested sub-group of your program.

The first thing we need to manage is the threads we create. Threads need to end sometime, and we want to know when they end. Extending a common Go pattern, let’s use a wait group to track the number of open threads. But for a small twist, we’re going to throw it into the context so it’s easy to interact with anywhere in our program:

package root_context

import (
  "context"
  "fmt"
  "os"
  "os/signal"
  "sync"
  "syscall"
  "time"
)

// state carries the WaitGroup and cancel function for a context tree.
type state struct {
  wg     *sync.WaitGroup
  cancel context.CancelFunc
}
type key int

// New wraps parent in a cancelable context that carries a WaitGroup, and
// cancels itself on SIGINT/SIGTERM. If the binary still hasn't exited 30
// seconds after a signal, it panics rather than hang forever.
func New(parent context.Context) context.Context {
  ctx, cancel := context.WithCancel(parent)
  ctx = context.WithValue(ctx, key(0), &state{wg: &sync.WaitGroup{}, cancel: cancel})
  interruptChannel := make(chan os.Signal, 1)
  signal.Notify(interruptChannel, syscall.SIGINT)
  signal.Notify(interruptChannel, syscall.SIGTERM)
  go func() {
    select {
    case sig := <-interruptChannel:
      cancel()
      go func() {
        time.Sleep(time.Second * 30)
        panic(fmt.Errorf("received an interrupt (%s) message but the binary did not gracefully exit", sig.String()))
      }()
    case <-ctx.Done():
    }
  }()
  return ctx
}

// Add registers delta new threads with the group's WaitGroup.
func Add(ctx context.Context, delta int) {
  state := get(ctx)
  state.wg.Add(delta)
}

// Done marks one thread as finished.
func Done(ctx context.Context) {
  state := get(ctx)
  state.wg.Done()
}

// Cancel signals the whole group that it's time to shut down.
func Cancel(ctx context.Context) {
  state := get(ctx)
  state.cancel()
}

// Wait blocks until every thread registered with Add has called Done.
func Wait(ctx context.Context) {
  state := get(ctx)
  state.wg.Wait()
}

func get(ctx context.Context) *state {
  value := ctx.Value(key(0))
  return value.(*state)
}

Some of this matters more when we shut down, but the main idea here is that whenever we create a new thread – go func, sorry – we can keep track of it:

func main() {
  ctx := root_context.New(context.Background())
  root_context.Add(ctx, 1)
  go func() {
    defer root_context.Done(ctx)
    // do stuff
  }()
  root_context.Wait(ctx) // without this, main could exit before the goroutine finishes
}

and voilà, we are now “managing” our threads for the grouping that we care about. Okay, “managing” is a bit of a stretch: the BEAM (Erlang/Elixir) is so much better at this and actually manages them (e.g. handles exceptions, restarts, etc.). But I digress; we’re stuck with Go instead, and at least we can keep track of when the thread ends.

Okay, now we have threads. Now what? We need to provide a mechanism to consume data (with Go channels, of course).

func createWorker(ctx context.Context, incoming <-chan int) <-chan int {
  out := make(chan int)
  root_context.Add(ctx, 1)
  go func() {
    defer root_context.Done(ctx)
    defer close(out) // out lives exactly as long as the worker
    for i := range incoming {
      out <- i * 2
    }
  }()
  return out
}

This is pretty straightforward go code, but some important things to consider:

  • This worker is a downstream worker: it takes data from incoming and processes it.
  • To express this dependency, the upstream channel must be created beforehand (solving discovery issues).
  • The lifetime of the worker is controlled by the upstream channel: once incoming is closed and drained, the downstream worker automatically exits.
  • The lifetime of the resources is fully controlled by this function: the out channel remains open until the worker shuts down, then it is closed; the go func runs until the worker shuts down, then root_context.Done is called.
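To make the dependency direction concrete, here’s a small sketch that chains two copies of createWorker (the variable names are mine). Closing the source tears the whole pipeline down in order:

source := make(chan int)
doubled := createWorker(ctx, source)     // stage 1 reads from source
quadrupled := createWorker(ctx, doubled) // stage 2 reads from stage 1

source <- 10
fmt.Println(<-quadrupled) // prints 40

// Closing source lets stage 1 drain and exit, which closes doubled,
// which lets stage 2 drain and exit, which closes quadrupled.
close(source)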

I consider it a giant red flag for channels to be closed in any other way, such that the other resources aren’t released (or aren’t bound to be released once incoming is drained).
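For illustration, the red-flag version looks something like this sketch, where someone who doesn’t own the channel closes it out from under the worker:

out := make(chan int)
go func() {
  for i := range incoming {
    out <- i * 2 // will eventually panic: send on closed channel
  }
}()
// Elsewhere, outside the worker that owns out:
close(out) // red flag: out's lifetime is no longer tied to incoming being drained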

Throughput (and backpressure)

The worst thing (*) you can do in a go program is to have unbounded resource usage, whether it’s leaked or not.

(*) lying, again

Achieving good performance within bounded constraints is, again, not novel, but it is surprisingly disregarded in a lot of Go libraries I’ve seen. Some simple rules to live by:

Always have a hard bound on the number of threads your program will use. Usually that means having a fixed number of workers, or something like a worker pool. Go funcs may be cheap, but creating them just to create them is lazy and robs you of the opportunity to consider your data flow. Make hundreds of go funcs, but don’t create 10,000 of them.
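As a sketch of what that bound can look like (jobs and handle are hypothetical placeholders here), tie the worker count to the machine rather than to the traffic:

workers := runtime.GOMAXPROCS(0) // a machine-derived bound; a config value works too
for w := 0; w < workers; w++ {
  root_context.Add(ctx, 1)
  go func() {
    defer root_context.Done(ctx)
    for job := range jobs { // jobs is the bounded intake channel
      handle(job) // the per-item work
    }
  }()
}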

(Almost) always use unbuffered channels, and embrace the backpressure that comes with them. For a concrete example, let’s say you have a webserver that needs to send an email when an API endpoint is hit. The v1 code should look something like this:

var emails chan string // package scope, so webHandler can reach it

func main() {
  ctx := root_context.New(context.Background())
  emails = make(chan string)
  for i := 0; i < 5; i++ {
    emailWorker(ctx, emails)
  }
  startWebserver()
  root_context.Wait(ctx)
}

func emailWorker(ctx context.Context, emails <-chan string) {
  root_context.Add(ctx, 1)
  go func() {
    defer root_context.Done(ctx)
    for email := range emails {
      send_email_to_recipient(ctx, email)
    }
  }()
}

func webHandler(writer http.ResponseWriter, request *http.Request) {
  message := getRequestBody(request)
  emails <- message
  fmt.Fprintf(writer, "sent message")
}

This approach correctly manages resources. The webHandler has backpressure. If emails take a long time to send, then once 5 emails are being processed, the 6th one will cause the webHandler to wait until a slot opens up.

However, it’s slow. You can’t send an HTTP response back until the email is sent, and your entire HTTP handling becomes rate limited by the slowest piece. That last sentence is a _feature_, not a bug, btw.

To improve throughput, we’re now asking the correct question – how do we speed up sending emails? Now we can properly adjust our throughput without just doing a go func() { send_email_to_recipient(ctx, email) }() and then wondering why our thread (& socket) count is growing unbounded.

There isn’t a magic pattern here, since it’s actually a hard question: what do you want to do when your program can’t keep up with the incoming work? The answer is a spectrum of tradeoffs, such as:

  • Make the caller deal with it (backpressure)
  • Drop requests
  • Make the slow part faster
  • Scale the slow part horizontally
  • Cache the request and perform the slow part asynchronously.

The key takeaway is that by designing the channels properly, we can see exactly where the slowdown happens and take corrective action. With unbounded incoming data, you just hit the fan with resource exhaustion, without knowing why or being able to take action in the first place.

One other opinion: buffered channels should only be used AFTER profiling has determined that channel contention is your problem.
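If profiling does point there, the change itself is tiny (128 is a made-up number you would tune):

emails = make(chan string, 128) // absorbs bursts; backpressure still kicks in once full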

The pattern I have used most often in my codebases is “drop requests after a timeout”, but it really is domain specific:

select {
case emails <- message:
  fmt.Fprintf(writer, "sent message")
case <-time.After(time.Second * 30):
  fmt.Fprintf(writer, "timeout")
case <-request.Context().Done(): // the handler's request context
  fmt.Fprintf(writer, "context canceled")
}

Shutdown

Crashes, data corruption, and more are all common shutdown problems. Especially with distributed systems, my first question is: “why does it matter?” There are so many failure points anyway that your data logic should be transactional – it either all happens or none of it happens. In that case, why not just os.Exit(1) and be done with it? I was taken aback when I worked on Microsoft Windows and learned that’s exactly how explorer.exe did things (used to, anyway). But the point still stands: if your data is idempotent or transactional or otherwise robust to failures, then maybe shutdown doesn’t matter at all.

That said, “shutdown” can also mean “scale down resources”, in which case we would like to do so without crashing. So I guess there’s a reason to keep writing this section 😉

We’ve already set ourselves up for success with root_context, but let’s talk about exactly how I think shutdown should work.

  1. Indicate that we want to shut down by canceling the context: root_context.Cancel(ctx). Done.
  2. Top-level workers should listen for this, close() out any of their channels, and otherwise clean up
  3. Downstream workers should be in a for thing := range channel {} loop and close out once there’s no more work
  4. The main function can wait for all cleanup to finish with root_context.Wait(ctx) – if there are no threads… there’s no work to be done (see the sketch below)
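Put together, a minimal sketch of that flow, reusing root_context and createWorker from earlier (the producer and drain loops are illustrative):

func main() {
  ctx := root_context.New(context.Background())

  source := make(chan int) // the top of the pipe
  results := createWorker(ctx, source)

  // Top-level producer: stops sending and closes the pipe on cancel (step 2).
  root_context.Add(ctx, 1)
  go func() {
    defer root_context.Done(ctx)
    defer close(source)
    for i := 0; ; i++ {
      select {
      case source <- i:
      case <-ctx.Done():
        return
      }
    }
  }()

  // Drain results until the worker closes it (step 3 happens inside createWorker).
  root_context.Add(ctx, 1)
  go func() {
    defer root_context.Done(ctx)
    for range results {
    }
  }()

  // SIGINT/SIGTERM (or anyone calling root_context.Cancel) kicks off step 1.
  root_context.Wait(ctx) // step 4: no threads left means shutdown is complete
}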

The idea is simple, but there’s lots of ways to mess up.

The easiest way is to rely on the context (which has now been canceled), poisoning your ability to make further progress. For example:

func myWorker(ctx context.Context, incoming <-chan int) {
  root_context.Add(ctx, 1)
  go func() {
    defer root_context.Done(ctx)
    for i := range incoming {
      _ = i // pretend the request body is built from i
      req, _ := http.NewRequestWithContext(ctx, http.MethodPost, "http://example.com", nil)
      resp, err := http.DefaultClient.Do(req)
      if err != nil {
        continue // once ctx is canceled, this always fails with a context error
      }
      resp.Body.Close()
    }
  }()
}

Now, if we follow the shutdown pattern, this isn’t going to work. The HTTP client checks the context and errors out if it’s been canceled. Maybe that’s a good thing, depending on your workflow. But otherwise you need two contexts, and there are lots of ways to get one. There’s always context.Background(), there’s “cloning” a context however that makes sense to you (this is what we do), or you can leave the main context completely alone and use a secondary shutdown context or some other mechanism to signal the initial shutdown. Up to you.
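As one sketch of the “cloning” idea (assuming Go 1.21+, which added context.WithoutCancel): keep ctx’s values, drop its cancelation, and add a fresh deadline so in-flight work can’t hang during shutdown.

for i := range incoming {
  _ = i // pretend the request body is built from i

  // Keep ctx's values, ignore its cancelation, and bound the call ourselves.
  reqCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 10*time.Second)
  req, _ := http.NewRequestWithContext(reqCtx, http.MethodPost, "http://example.com", nil)
  if resp, err := http.DefaultClient.Do(req); err == nil {
    resp.Body.Close()
  }
  cancel()
}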

Another way to mess up is to forget to process all the work before shutting down. For example:

func myWorker(ctx context.Context, incoming <-chan int) {
  root_context.Add(ctx, 1)
  go func() {
    defer root_context.Done(ctx)
    for {
      select {
      case i := <-incoming:
        _ = i // do work
      case <-ctx.Done():
        // The context is done, time to shutdown
        // P.S. this is wrong, don't do this.
        return
      }
    }
  }()
}

The problem here is that myWorker is responsible for draining incoming. Otherwise that channel is just going to sit there (until the program dies). This is also a data integrity issue, since you’re potentially dropping items on the floor during shutdown. (And as a bonus bug: if incoming is closed while ctx is still live, the receive case happily fires forever with zero values.)

That said, detecting you’re in the middle of shutdown and gracefully dropping work can be a good thing. Just be sure to do it intentionally.
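A sketch of doing it intentionally (process and logDropped are hypothetical placeholders): keep draining incoming so upstream is never blocked, but decide per item whether the work still makes sense.

for i := range incoming {
  select {
  case <-ctx.Done():
    logDropped(i) // shutting down: record the drop instead of silently losing it
  default:
    process(i) // normal operation
  }
}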

Wrapping things up

Toy examples don’t really convey the full picture, so I’ll follow up later with a more concrete example of working with RabbitMQ. But here are the main ideas from this post:

  • Use wait groups to track thread creation, so you can wait for shutdown to complete
  • Thread creation should be bounded by your machine’s characteristics, not the incoming data
  • Data pipelines should have backpressure as high up the pipe as possible
  • Indicate the desire to shut down with some signal (my suggestion is to cancel the context)
  • Stop sending new data into the top of the pipe once you’re shutting down
  • Close up the workers by falling through when the incoming channel closes
  • Finish shutdown by waiting for all the threads to quit