The concurrent bug I keep writing

I'm pretty happy using Go in my day-to-day programming. I work on data storage and data retrieval for Lightstep, and our data ingestion and storage systems are large enough that there are at least six services involved, with more than a hundred actual instances. At various places in the pipeline we need fan-out and fan-in for serving requests to analysis tools, the web UI, and our public API.

When I'm writing this fan-out/fan-in pattern, I reach for goroutines and channels by default. They provide a good foundation for building simple and performant request-handling systems.

The Fan-Out/Fan-In Pattern

Here's a simple diagram of a serving frontend, backed by one of our distributed data storage systems:

[Diagram: a serving frontend fanning requests out to the distributed storage servers and fanning their results back in]

For simple queries like "retrieve this distributed trace", we can route the request to exactly one storage server. We have a simple pass-through RPC (remote procedure call):

func (s *server) MakeRequest(ctx context.Context, req MyRequestObject) (MyResponseObject, error) {
	server := storageForRequest(req)

	// The storage client streams results over a channel; this query needs only the first one.
	responseChan := server.MakeRequest(ctx, req)

	select {
	case response := <-responseChan:
		return response.response, response.err
	case <-ctx.Done():
		return MyResponseObject{}, ctx.Err()
	}
}

For more complex queries, we need to send the same query to every storage server and group the results. Each server may send more than one result, and so that we can begin returning results quickly, it streams individual results back. We need to receive and process results from every storage server.

Here's the buggy code I usually write on the first pass:

func (s *server) MakeRequest(
	ctx context.Context,
	req MyRequestObject,
	stream chan<- MyResponseObject,
) error {
	servers := allStorage()

	type responseMsg struct {
		response MyResponseObject
		err      error
	}
	responses := make(chan responseMsg)

	// Get each server's responses.
	for _, server := range servers {
		server := server
		go func() {
			responseChan := server.MakeRequest(ctx, req)
			for {
				select {
				case response, ok := <-responseChan:
					if !ok { // all responses received
						return
					}

					responses <- response
					if response.err != nil {
						return
					}
				case <-ctx.Done():
					return
				}
			}
		}()
	}

	// Consume all responses.
	for {
		select {
		case response, ok := <-responses:
			if !ok {
				return nil
			}

			// You could also choose to log and continue.
			if response.err != nil {
				return response.err
			}

			stream <- response.response
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

Do you see the bug? Most of the time this code works just fine: when all the responses are returned before the context.Context expires, everything goes as planned. But when the context is cancelled, you leak goroutines and channels.

Imagine the context is cancelled while a producer is here:

select {
case response, ok := <-responseChan:
	if !ok { // all responses received
		return
	}

	responses <- response // ***** WAITING TO SEND
	if response.err != nil {
		return
	}
case <-ctx.Done():
	return
}

and the consumer is here:

select { // **** WAITING FOR EITHER CASE
case response, ok := <-responses:
	if !ok {
		return nil
	}

	// You could also choose to log and continue.
	if response.err != nil {
		return response.err
	}

	stream <- response.response
case <-ctx.Done():
	return ctx.Err()
}

When the context is cancelled, the consumer exits. All the producers who were waiting to write to responses will wait forever. The channel, the response objects, and other memory associated with the goroutine all leak. Eventually the heap fills up and your program OOMs.
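
A leak like this can sit unnoticed until a deployment starts OOMing. One cheap guard - my suggestion here, not something from this post's codebase, and the package name is made up - is a leak check on the package's tests with go.uber.org/goleak, which fails any test run that leaves goroutines behind:

package server_test

import (
	"testing"

	"go.uber.org/goleak"
)

// TestMain makes every test in the package fail if goroutines are still
// running when the tests finish - exactly what the buggy MakeRequest
// leaves behind after a cancelled request.
func TestMain(m *testing.M) {
	goleak.VerifyTestMain(m)
}

You'd still need a test that actually cancels a request mid-stream to trigger the bug, but once it exists the leak shows up as a test failure instead of a production pager.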

What's wrong?

The trouble with this code is that it looks right! We all know you need a way to terminate your select statements, and we know you always need to know how your goroutines will terminate.

context.Context seems like a good way to do that! The problem with this buggy code is that there's more than one way the pipeline can shut down: in the "unhappy path", either side can exit first.

A better version of what's written here is:

func (s *server) MakeRequest(
	ctx context.Context,
	req MyRequestObject,
	stream chan<- MyResponseObject,
) error {
	servers := allStorage()

	type responseMsg struct {
		response MyResponseObject
		err      error
	}
	responses := make(chan responseMsg)
	// Producers send to this before exiting.
	done := make(chan struct{})

	// Get each server's responses.
	for _, server := range servers {
		server := server
		go func() {
			defer func() {
				done <- struct{}{}
			}()

			responseChan := server.MakeRequest(ctx, req)
			for {
				select {
				case response, ok := <-responseChan:
					if !ok { // all responses received
						return
					}

					responses <- response
					if response.err != nil {
						return
					}
				case <-ctx.Done():
					return
				}
			}
		}()
	}

	finished := 0
	// Consume all responses. Don't select on ctx.Done() here - the
	// producers handle cancellation.
	for finished < len(servers) {
		select {
		case response, ok := <-responses:
			if !ok {
				return nil
			}

			// You could also choose to exit after this error.
			if response.err != nil {
				logError(response.err)
				continue
			}

			stream <- response.response
		case <-done:
			finished++
		}
	}

	if err := ctx.Err(); err != nil {
		return err
	}
	return nil
}

In this code the pipeline producers are responsible for exiting on cancellation. The consumer doesn't listen for context cancellation - it relies on the producers to write to done when they're finished. Because responses is unbuffered, a producer's send to done can only happen after its last send to responses has been received, so by the time the consumer counts a producer as finished it can't be missing any of that producer's responses.
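
A closely related shape - a sketch on my part, not code from our services - is to have the producers share a sync.WaitGroup and close the merged channel once they've all returned, so the consumer can simply range over it. It needs only the context and sync packages, plus Go 1.18+ for the type parameter:

// fanIn merges several response streams into one channel. The output is
// closed only after every producer goroutine has exited, so a consumer can
// range over it without a separate done channel.
func fanIn[T any](ctx context.Context, sources []<-chan T) <-chan T {
	out := make(chan T)

	var wg sync.WaitGroup
	for _, src := range sources {
		src := src
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range src {
				select {
				case out <- v:
				case <-ctx.Done():
					return
				}
			}
		}()
	}

	// Close out once every producer has finished.
	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

One caveat for this sketch: the caller has to cancel ctx when it stops reading from the returned channel, or the producers will block on their sends just like the buggy version.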

There are also much simpler ways to write this correctly - though with poorer performance:

func MakeRequest(ctx context.Context, req MyRequestObject) ([]MyResponseObject, error) {
	var responses []MyResponseObject
	for _, s := range allStorage() {
		responseChan := s.MakeRequest(ctx, req)
	receive:
		for {
			select {
			case response, ok := <-responseChan:
				if !ok { // this server has sent everything
					break receive
				}
				if response.err != nil {
					return nil, response.err
				}
				responses = append(responses, response.response)
			case <-ctx.Done():
				return nil, ctx.Err()
			}
		}
	}

	return responses, nil
}

Make sure you actually need the parallel processing before you reach for it!

There is also another pattern you can use to implement fan-out/fan-in pipelines without leaking: on context cancellation, the consumer can read and throw away any remaining responses before returning, which prevents producers from blocking on a channel send. I prefer the pattern above because it saves writing an additional loop inside the consumer's select.
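
For reference, here's roughly what that looks like spliced into the consumer loop of the corrected version above. This is a sketch rather than code we run; responses, done, finished, servers, stream, and logError are the names from that example:

for finished < len(servers) {
	select {
	case response := <-responses:
		if response.err != nil {
			logError(response.err)
			continue
		}
		stream <- response.response
	case <-done:
		finished++
	case <-ctx.Done():
		// Drain: keep receiving so no producer blocks on its send,
		// but throw the remaining responses away.
		for finished < len(servers) {
			select {
			case <-responses:
			case <-done:
				finished++
			}
		}
		return ctx.Err()
	}
}

That inner loop is the extra code the done-counting version avoids.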

If I Could Only Tell You Two Things

They would be:

In pipelines, make sure only one end controls exit. This should usually be the input end of the pipeline - the producers in this example. They can signal that work is done by closing a channel (or channels), or by some other mechanism. The key is that the pipeline stops producing new inputs and that all outstanding inputs are processed before it exits. Even if that means throwing inputs away during processing, it's critical that the pipeline is empty.

Don't reach for concurrency unless you need it. Goroutines and channels are tricky to get right, like almost any low-level concurrency primitive. They're powerful, but you have to think through every case where blocking can occur to avoid a deadlock or a leak.

Good luck to you, go build some software!

Interested in joining our team? See our open positions here.

September 8, 2020
4 min read
Engineering

About the author

Joe Blubaugh
