diff options
Diffstat (limited to 'content/pipelines.article')
-rw-r--r-- | content/pipelines.article | 67 |
1 files changed, 34 insertions, 33 deletions
diff --git a/content/pipelines.article b/content/pipelines.article index 194b26c..f2fe186 100644 --- a/content/pipelines.article +++ b/content/pipelines.article @@ -1,26 +1,27 @@ -Go Concurrency Patterns: Pipelines and cancellation +# Go Concurrency Patterns: Pipelines and cancellation 13 Mar 2014 Tags: concurrency, pipelines, cancellation +Summary: Go's concurrency primitives make it easy to construct streaming data pipelines that make efficient use of I/O and multiple CPUs. This article presents examples of such pipelines, highlights subtleties that arise when operations fail, and introduces techniques for dealing with failures cleanly. Sameer Ajmani -* Introduction +## Introduction Go's concurrency primitives make it easy to construct streaming data pipelines that make efficient use of I/O and multiple CPUs. This article presents examples of such pipelines, highlights subtleties that arise when operations fail, and introduces techniques for dealing with failures cleanly. -* What is a pipeline? +## What is a pipeline? There's no formal definition of a pipeline in Go; it's just one of many kinds of concurrent programs. Informally, a pipeline is a series of _stages_ connected by channels, where each stage is a group of goroutines running the same function. In each stage, the goroutines -- receive values from _upstream_ via _inbound_ channels -- perform some function on that data, usually producing new values -- send values _downstream_ via _outbound_ channels + - receive values from _upstream_ via _inbound_ channels + - perform some function on that data, usually producing new values + - send values _downstream_ via _outbound_ channels Each stage has any number of inbound and outbound channels, except the first and last stages, which have only outbound or inbound channels, @@ -30,7 +31,7 @@ _producer_; the last stage, the _sink_ or _consumer_. We'll begin with a simple example pipeline to explain the ideas and techniques. Later, we'll present a more realistic example. -* Squaring numbers +## Squaring numbers Consider a pipeline with three stages. @@ -59,7 +60,7 @@ range loop, like the other stages: .code pipelines/square2.go /func main/,/^}/ -* Fan-out, fan-in +## Fan-out, fan-in Multiple functions can read from the same channel until that channel is closed; this is called _fan-out_. This provides a way to distribute work amongst a group @@ -83,17 +84,17 @@ done. Sends on a closed channel panic, so it's important to ensure all sends are done before calling close. The -[[https://golang.org/pkg/sync/#WaitGroup][`sync.WaitGroup`]] type +[`sync.WaitGroup`](https://golang.org/pkg/sync/#WaitGroup) type provides a simple way to arrange this synchronization: .code pipelines/sqfan.go /func merge/,/^}/ -* Stopping short +## Stopping short There is a pattern to our pipeline functions: -- stages close their outbound channels when all the send operations are done. -- stages keep receiving values from inbound channels until those channels are closed. + - stages close their outbound channels when all the send operations are done. + - stages keep receiving values from inbound channels until those channels are closed. This pattern allows each receiving stage to be written as a `range` loop and ensures that all goroutines exit once all values have been successfully sent @@ -147,7 +148,7 @@ reads any fewer values, we will again have blocked goroutines. Instead, we need to provide a way for downstream stages to indicate to the senders that they will stop accepting input. -* Explicit cancellation +## Explicit cancellation When `main` decides to exit without receiving all the values from `out`, it must tell the goroutines in the upstream stages to abandon @@ -174,7 +175,7 @@ early return. Keeping track of these counts is tedious and error-prone. We need a way to tell an unknown and unbounded number of goroutines to stop sending their values downstream. In Go, we can do this by closing a channel, because -[[https://golang.org/ref/spec#Receive_operator][a receive operation on a closed channel can always proceed immediately, yielding the element type's zero value.]] +[a receive operation on a closed channel can always proceed immediately, yielding the element type's zero value.](https://golang.org/ref/spec#Receive_operator) This means that `main` can unblock all the senders simply by closing the `done` channel. This close is effectively a broadcast signal to @@ -200,14 +201,14 @@ channel is closed on all return paths via a `defer` statement: Here are the guidelines for pipeline construction: -- stages close their outbound channels when all the send operations are done. -- stages keep receiving values from inbound channels until those channels are closed or the senders are unblocked. + - stages close their outbound channels when all the send operations are done. + - stages keep receiving values from inbound channels until those channels are closed or the senders are unblocked. Pipelines unblock senders either by ensuring there's enough buffer for all the values that are sent or by explicitly signalling senders when the receiver may abandon the channel. -* Digesting a tree +## Digesting a tree Let's consider a more realistic pipeline. @@ -234,14 +235,14 @@ returns a map from path name to digest value, then sorts and prints the results: .code pipelines/serial.go /func main/,/^}/ The `MD5All` function is the focus of our discussion. In -[[pipelines/serial.go][serial.go]], the implementation uses no concurrency and +[serial.go](pipelines/serial.go), the implementation uses no concurrency and simply reads and sums each file as it walks the tree. .code pipelines/serial.go /MD5All/,/^}/ -* Parallel digestion +## Parallel digestion -In [[pipelines/parallel.go][parallel.go]], we split `MD5All` into a two-stage +In [parallel.go](pipelines/parallel.go), we split `MD5All` into a two-stage pipeline. The first stage, `sumFiles`, walks the tree, digests each file in a new goroutine, and sends the results on a channel with value type `result`: @@ -259,14 +260,14 @@ closing `done` via a `defer`: .code pipelines/parallel.go /func MD5All/,/^}/ HLdone -* Bounded parallelism +## Bounded parallelism -The `MD5All` implementation in [[pipelines/parallel.go][parallel.go]] +The `MD5All` implementation in [parallel.go](pipelines/parallel.go) starts a new goroutine for each file. In a directory with many large files, this may allocate more memory than is available on the machine. We can limit these allocations by bounding the number of files read in -parallel. In [[pipelines/bounded.go][bounded.go]], we do this by +parallel. In [bounded.go](pipelines/bounded.go), we do this by creating a fixed number of goroutines for reading files. Our pipeline now has three stages: walk the tree, read and digest the files, and collect the digests. @@ -296,7 +297,7 @@ this point, `walkFiles` may block sending values downstream: .code pipelines/bounded.go /m := make/,/^}/ HLerrc -* Conclusion +## Conclusion This article has presented techniques for constructing streaming data pipelines in Go. Dealing with failures in such pipelines is tricky, since each stage in @@ -307,12 +308,12 @@ pipeline and defined guidelines for constructing pipelines correctly. Further reading: -- [[https://talks.golang.org/2012/concurrency.slide#1][Go Concurrency Patterns]] - ([[https://www.youtube.com/watch?v=f6kdp27TYZs][video]]) presents the basics - of Go's concurrency primitives and several ways to apply them. -- [[https://blog.golang.org/advanced-go-concurrency-patterns][Advanced Go Concurrency Patterns]] - ([[http://www.youtube.com/watch?v=QDDwwePbDtw][video]]) covers more complex - uses of Go's primitives, - especially `select`. -- Douglas McIlroy's paper [[https://swtch.com/~rsc/thread/squint.pdf][Squinting at Power Series]] - shows how Go-like concurrency provides elegant support for complex calculations. + - [Go Concurrency Patterns](https://talks.golang.org/2012/concurrency.slide#1) + ([video](https://www.youtube.com/watch?v=f6kdp27TYZs)) presents the basics + of Go's concurrency primitives and several ways to apply them. + - [Advanced Go Concurrency Patterns](https://blog.golang.org/advanced-go-concurrency-patterns) + ([video](http://www.youtube.com/watch?v=QDDwwePbDtw)) covers more complex + uses of Go's primitives, + especially `select`. + - Douglas McIlroy's paper [Squinting at Power Series](https://swtch.com/~rsc/thread/squint.pdf) + shows how Go-like concurrency provides elegant support for complex calculations. |