I'm pretty happy using Go in my day-to-day programming. I work on data storage and retrieval for Lightstep, and our data ingestion and storage systems are large enough that there are at least six services involved, with more than a hundred actual instances. At various places in the pipeline we need fan-out and fan-in for serving requests to analysis tools, the web UI, and our public API.
When I'm writing this fan-out/fan-in pattern, I reach for goroutines and channels by default. They provide a good foundation for building simple and performant request-handling systems.
The Fan-Out/Fan-In Pattern
Here's a simple diagram of a serving frontend, backed by one of our distributed data storage systems:
For simple queries like "retrieve this distributed trace", we can route the request to exactly one storage server. We have a simple pass-through RPC (remote procedure call):
func (s *server) MakeRequest(ctx context.Context, req MyRequestObject) (MyResponseObject, error) {
	server := storageForRequest(req)
	select {
	// the storage server's MakeRequest returns a channel of results
	case msg := <-server.MakeRequest(ctx, req):
		return msg.response, msg.err
	case <-ctx.Done():
		return MyResponseObject{}, ctx.Err()
	}
}
For more complex queries, we need to send the same query to every storage server and group the results. Each server may send more than one result, and to begin returning results quickly, each server streams individual results back. We need to receive and process results from each storage server.
Here's the buggy code I usually write on the first pass:
func (s *server) MakeRequest(
	ctx context.Context,
	req MyRequestObject,
	stream chan<- MyResponseObject,
) error {
	servers := allStorage()
	type responseMsg struct {
		response MyResponseObject
		err      error
	}
	responses := make(chan responseMsg)
	// Get each server's responses.
	for _, server := range servers {
		server := server
		go func() {
			responseChan := server.MakeRequest(ctx, req)
			for {
				select {
				case response, ok := <-responseChan:
					if !ok { // all responses received
						return
					}
					responses <- response
					if response.err != nil {
						return
					}
				case <-ctx.Done():
					return
				}
			}
		}()
	}
	// Consume all responses
	for {
		select {
		case response, ok := <-responses:
			if !ok {
				return nil
			}
			// you could also choose to log and
			// continue
			if response.err != nil {
				return response.err
			}
			stream <- response.response
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
Do you see the bug? Most of the time this code works just fine. When all the responses come back before the context.Context expires, everything goes as planned. But when the context is cancelled, you leak goroutines and channels.
Imagine the context is cancelled while a producer is here:
	select {
	case response, ok := <-responseChan:
		if !ok { // all responses received
			return
		}
		responses <- response // ***** WAITING TO SEND
		if response.err != nil {
			return
		}
	case <-ctx.Done():
		return
	}
and the consumer is here:
	select { // ***** WAITING FOR EITHER CASE
	case response, ok := <-responses:
		if !ok {
			return nil
		}
		// you could also choose to log and
		// continue
		if response.err != nil {
			return response.err
		}
		stream <- response.response
	case <-ctx.Done():
		return ctx.Err()
	}
When the context is cancelled, the consumer exits. All the producers that were waiting to write to responses will wait forever. The channel, the response objects, and the other memory associated with each goroutine all leak. Eventually the heap fills up and your program OOMs.
What's wrong?
The trouble with this code is that it looks right! We all know you need a way to terminate your select statements. We know you always need to know how your goroutines will terminate. context.Context seems like a good way to do that! The problem with this buggy code is that there's more than one way the pipeline can shut down. In the "unhappy path", either side can exit first.
A better version of what's written here is:
func (s *server) MakeRequest(
	ctx context.Context,
	req MyRequestObject,
	stream chan<- MyResponseObject,
) error {
	servers := allStorage()
	type responseMsg struct {
		response MyResponseObject
		err      error
	}
	responses := make(chan responseMsg)
	// Producers send to this before exiting
	done := make(chan struct{})
	// Get each server's responses.
	for _, server := range servers {
		server := server
		go func() {
			defer func() {
				done <- struct{}{}
			}()
			responseChan := server.MakeRequest(ctx, req)
			for {
				select {
				case response, ok := <-responseChan:
					if !ok { // all responses received
						return
					}
					responses <- response
					if response.err != nil {
						return
					}
				case <-ctx.Done():
					return
				}
			}
		}()
	}
	finished := 0
	// Consume all responses
	for finished < len(servers) {
		select {
		case response := <-responses:
			// You could also choose to exit after this error.
			if response.err != nil {
				logError(response.err)
				continue
			}
			stream <- response.response
		case <-done:
			finished++
		}
		// Don't select on ctx.Done() - the producers handle cancellation
	}
	if err := ctx.Err(); err != nil {
		return err
	}
	return nil
}
In this code the pipeline producers are responsible for exiting on cancellation. The consumer doesn't listen for context cancellation - it relies on the producers to write to done when they're finished. Because each producer writes to done only after it will send no more responses, and responses is unbuffered, the consumer can't miss anything: every response has already been handed off before its producer signals done.
There are also much simpler ways to write this correctly - though with poorer performance:
func MakeRequest(ctx context.Context, req MyRequestObject) ([]MyResponseObject, error) {
	var responses []MyResponseObject
	for _, server := range allStorage() {
		responseChan := server.MakeRequest(ctx, req)
	recv:
		for {
			select {
			case response, ok := <-responseChan:
				if !ok { // this server is finished
					break recv
				}
				if response.err != nil {
					return nil, response.err
				}
				responses = append(responses, response.response)
			case <-ctx.Done():
				return nil, ctx.Err()
			}
		}
	}
	return responses, nil
}
Make sure you actually need the parallel processing before you reach for it!
There is also another pattern that you can use to implement fan-out/fan-in pipelines without leaking: On context cancellation, the consumer can read and throw away any responses before returning. This will prevent producers from blocking on a channel send. I prefer the pattern above because it saves writing an additional loop inside the consumer's select.
If I Could Only Tell You Two Things
They would be:
In pipelines, make sure only one end controls exit. This should usually be the input end of the pipeline - the producers in this example. They can handle this by closing a channel, or by some other signal that work is done. The key is that the pipeline stops producing new inputs, and that every input already in flight is processed before it exits. Even if that means throwing inputs away during processing, it's critical that the pipeline is empty.
Don't reach for concurrency unless you need it. Goroutines and channels are tricky to get right, like almost any low-level concurrency primitive. They're powerful, but you have to think through every case where blocking can occur to avoid a deadlock or a leak.
Good luck to you, go build some software!