aboutsummaryrefslogtreecommitdiff
path: root/content/pipelines.article
diff options
context:
space:
mode:
Diffstat (limited to 'content/pipelines.article')
-rw-r--r--content/pipelines.article67
1 files changed, 34 insertions, 33 deletions
diff --git a/content/pipelines.article b/content/pipelines.article
index 194b26c..f2fe186 100644
--- a/content/pipelines.article
+++ b/content/pipelines.article
@@ -1,26 +1,27 @@
-Go Concurrency Patterns: Pipelines and cancellation
+# Go Concurrency Patterns: Pipelines and cancellation
13 Mar 2014
Tags: concurrency, pipelines, cancellation
+Summary: Go's concurrency primitives make it easy to construct streaming data pipelines that make efficient use of I/O and multiple CPUs. This article presents examples of such pipelines, highlights subtleties that arise when operations fail, and introduces techniques for dealing with failures cleanly.
Sameer Ajmani
-* Introduction
+## Introduction
Go's concurrency primitives make it easy to construct streaming data pipelines
that make efficient use of I/O and multiple CPUs. This article presents
examples of such pipelines, highlights subtleties that arise when operations
fail, and introduces techniques for dealing with failures cleanly.
-* What is a pipeline?
+## What is a pipeline?
There's no formal definition of a pipeline in Go; it's just one of many kinds of
concurrent programs. Informally, a pipeline is a series of _stages_ connected
by channels, where each stage is a group of goroutines running the same
function. In each stage, the goroutines
-- receive values from _upstream_ via _inbound_ channels
-- perform some function on that data, usually producing new values
-- send values _downstream_ via _outbound_ channels
+ - receive values from _upstream_ via _inbound_ channels
+ - perform some function on that data, usually producing new values
+ - send values _downstream_ via _outbound_ channels
Each stage has any number of inbound and outbound channels, except the
first and last stages, which have only outbound or inbound channels,
@@ -30,7 +31,7 @@ _producer_; the last stage, the _sink_ or _consumer_.
We'll begin with a simple example pipeline to explain the ideas and techniques.
Later, we'll present a more realistic example.
-* Squaring numbers
+## Squaring numbers
Consider a pipeline with three stages.
@@ -59,7 +60,7 @@ range loop, like the other stages:
.code pipelines/square2.go /func main/,/^}/
-* Fan-out, fan-in
+## Fan-out, fan-in
Multiple functions can read from the same channel until that channel is closed;
this is called _fan-out_. This provides a way to distribute work amongst a group
@@ -83,17 +84,17 @@ done.
Sends on a closed channel panic, so it's important to ensure all sends
are done before calling close. The
-[[https://golang.org/pkg/sync/#WaitGroup][`sync.WaitGroup`]] type
+[`sync.WaitGroup`](https://golang.org/pkg/sync/#WaitGroup) type
provides a simple way to arrange this synchronization:
.code pipelines/sqfan.go /func merge/,/^}/
-* Stopping short
+## Stopping short
There is a pattern to our pipeline functions:
-- stages close their outbound channels when all the send operations are done.
-- stages keep receiving values from inbound channels until those channels are closed.
+ - stages close their outbound channels when all the send operations are done.
+ - stages keep receiving values from inbound channels until those channels are closed.
This pattern allows each receiving stage to be written as a `range` loop and
ensures that all goroutines exit once all values have been successfully sent
@@ -147,7 +148,7 @@ reads any fewer values, we will again have blocked goroutines.
Instead, we need to provide a way for downstream stages to indicate to the
senders that they will stop accepting input.
-* Explicit cancellation
+## Explicit cancellation
When `main` decides to exit without receiving all the values from
`out`, it must tell the goroutines in the upstream stages to abandon
@@ -174,7 +175,7 @@ early return. Keeping track of these counts is tedious and error-prone.
We need a way to tell an unknown and unbounded number of goroutines to
stop sending their values downstream. In Go, we can do this by
closing a channel, because
-[[https://golang.org/ref/spec#Receive_operator][a receive operation on a closed channel can always proceed immediately, yielding the element type's zero value.]]
+[a receive operation on a closed channel can always proceed immediately, yielding the element type's zero value.](https://golang.org/ref/spec#Receive_operator)
This means that `main` can unblock all the senders simply by closing
the `done` channel. This close is effectively a broadcast signal to
@@ -200,14 +201,14 @@ channel is closed on all return paths via a `defer` statement:
Here are the guidelines for pipeline construction:
-- stages close their outbound channels when all the send operations are done.
-- stages keep receiving values from inbound channels until those channels are closed or the senders are unblocked.
+ - stages close their outbound channels when all the send operations are done.
+ - stages keep receiving values from inbound channels until those channels are closed or the senders are unblocked.
Pipelines unblock senders either by ensuring there's enough buffer for all the
values that are sent or by explicitly signalling senders when the receiver may
abandon the channel.
-* Digesting a tree
+## Digesting a tree
Let's consider a more realistic pipeline.
@@ -234,14 +235,14 @@ returns a map from path name to digest value, then sorts and prints the results:
.code pipelines/serial.go /func main/,/^}/
The `MD5All` function is the focus of our discussion. In
-[[pipelines/serial.go][serial.go]], the implementation uses no concurrency and
+[serial.go](pipelines/serial.go), the implementation uses no concurrency and
simply reads and sums each file as it walks the tree.
.code pipelines/serial.go /MD5All/,/^}/
-* Parallel digestion
+## Parallel digestion
-In [[pipelines/parallel.go][parallel.go]], we split `MD5All` into a two-stage
+In [parallel.go](pipelines/parallel.go), we split `MD5All` into a two-stage
pipeline. The first stage, `sumFiles`, walks the tree, digests each file in
a new goroutine, and sends the results on a channel with value type `result`:
@@ -259,14 +260,14 @@ closing `done` via a `defer`:
.code pipelines/parallel.go /func MD5All/,/^}/ HLdone
-* Bounded parallelism
+## Bounded parallelism
-The `MD5All` implementation in [[pipelines/parallel.go][parallel.go]]
+The `MD5All` implementation in [parallel.go](pipelines/parallel.go)
starts a new goroutine for each file. In a directory with many large
files, this may allocate more memory than is available on the machine.
We can limit these allocations by bounding the number of files read in
-parallel. In [[pipelines/bounded.go][bounded.go]], we do this by
+parallel. In [bounded.go](pipelines/bounded.go), we do this by
creating a fixed number of goroutines for reading files. Our pipeline
now has three stages: walk the tree, read and digest the files, and
collect the digests.
@@ -296,7 +297,7 @@ this point, `walkFiles` may block sending values downstream:
.code pipelines/bounded.go /m := make/,/^}/ HLerrc
-* Conclusion
+## Conclusion
This article has presented techniques for constructing streaming data pipelines
in Go. Dealing with failures in such pipelines is tricky, since each stage in
@@ -307,12 +308,12 @@ pipeline and defined guidelines for constructing pipelines correctly.
Further reading:
-- [[https://talks.golang.org/2012/concurrency.slide#1][Go Concurrency Patterns]]
- ([[https://www.youtube.com/watch?v=f6kdp27TYZs][video]]) presents the basics
- of Go's concurrency primitives and several ways to apply them.
-- [[https://blog.golang.org/advanced-go-concurrency-patterns][Advanced Go Concurrency Patterns]]
- ([[http://www.youtube.com/watch?v=QDDwwePbDtw][video]]) covers more complex
- uses of Go's primitives,
- especially `select`.
-- Douglas McIlroy's paper [[https://swtch.com/~rsc/thread/squint.pdf][Squinting at Power Series]]
- shows how Go-like concurrency provides elegant support for complex calculations.
+ - [Go Concurrency Patterns](https://talks.golang.org/2012/concurrency.slide#1)
+ ([video](https://www.youtube.com/watch?v=f6kdp27TYZs)) presents the basics
+ of Go's concurrency primitives and several ways to apply them.
+ - [Advanced Go Concurrency Patterns](https://blog.golang.org/advanced-go-concurrency-patterns)
+ ([video](http://www.youtube.com/watch?v=QDDwwePbDtw)) covers more complex
+ uses of Go's primitives,
+ especially `select`.
+ - Douglas McIlroy's paper [Squinting at Power Series](https://swtch.com/~rsc/thread/squint.pdf)
+ shows how Go-like concurrency provides elegant support for complex calculations.