-rw-r--r--  content/pipelines.article      306
-rw-r--r--  content/pipelines/bounded.go   121
-rw-r--r--  content/pipelines/parallel.go  109
-rw-r--r--  content/pipelines/serial.go     53
-rw-r--r--  content/pipelines/sqbuffer.go   73
-rw-r--r--  content/pipelines/sqdone1.go    81
-rw-r--r--  content/pipelines/sqdone2.go    78
-rw-r--r--  content/pipelines/sqdone3.go    88
-rw-r--r--  content/pipelines/sqfan.go      73
-rw-r--r--  content/pipelines/sqleak.go     75
-rw-r--r--  content/pipelines/square.go     38
-rw-r--r--  content/pipelines/square2.go    35
12 files changed, 1130 insertions, 0 deletions
diff --git a/content/pipelines.article b/content/pipelines.article
new file mode 100644
index 0000000..ab5d1ce
--- /dev/null
+++ b/content/pipelines.article
@@ -0,0 +1,306 @@
+Go Concurrency Patterns: Pipelines and cancellation
+13 Mar 2014
+Tags: concurrency, pipelines, cancellation
+
+Sameer Ajmani
+
+* Introduction
+
+Go's concurrency primitives make it easy to construct streaming data pipelines
+that make efficient use of I/O and multiple CPUs. This article presents
+examples of such pipelines, highlights subtleties that arise when operations fail,
+and introduces techniques for dealing with failures cleanly.
+
+* What is a pipeline?
+
+There's no formal definition of a pipeline in Go; it's just one of many kinds of
+concurrent programs. Informally, a pipeline is a series of _stages_ connected
+by channels, where each stage is a group of goroutines running the same
+function. In each stage, the goroutines
+
+- receive values from _upstream_ via _inbound_ channels
+- perform some function on that data, usually producing new values
+- send values _downstream_ via _outbound_ channels
+
+Each stage has any number of inbound and outbound channels, except the
+first and last stages, which have only outbound or inbound channels,
+respectively. The first stage is sometimes called the _source_ or
+_producer_; the last stage, the _sink_ or _consumer_.
+
+We'll begin with a simple example pipeline to explain the ideas and techniques.
+Later, we'll present a more realistic example.
+
+* Squaring numbers
+
+Consider a pipeline with three stages.
+
+The first stage, `gen`, is a function that converts a list of integers to a
+channel that emits the integers in the list. The `gen` function starts a
+goroutine that sends the integers on the channel and closes the channel when all
+the values have been sent:
+
+.code pipelines/square.go /func gen/,/^}/
+
+The second stage, `sq`, receives integers from a channel and returns a
+channel that emits the square of each received integer. After the
+inbound channel is closed and this stage has sent all the values
+downstream, it closes the outbound channel:
+
+.code pipelines/square.go /func sq/,/^}/
+
+The `main` function sets up the pipeline and runs the final stage: it receives
+values from the second stage and prints each one, until the channel is closed:
+
+.code pipelines/square.go /func main/,/^}/
+
+Since `sq` has the same type for its inbound and outbound channels, we
+can compose it any number of times. We can also rewrite `main` as a
+range loop, like the other stages:
+
+.code pipelines/square2.go /func main/,/^}/
+
+* Fan-out, fan-in
+
+Multiple functions can read from the same channel until that channel is closed;
+this is called _fan-out_. This provides a way to distribute work amongst a group
+of workers to parallelize CPU use and I/O.
+
+A function can read from multiple inputs and proceed until all are closed by
+multiplexing the input channels onto a single channel that's closed when all the
+inputs are closed. This is called _fan-in_.
+
+We can change our pipeline to run two instances of `sq`, each reading from the
+same input channel. We introduce a new function, _merge_, to fan in the
+results:
+
+.code pipelines/sqfan.go /func main/,/^}/
+
+The `merge` function converts a list of channels to a single channel by starting
+a goroutine for each inbound channel that copies the values to the sole outbound
+channel. Once all the `output` goroutines have been started, `merge` starts one
+more goroutine to close the outbound channel after all sends on that channel are
+done.
+
+Sends on a closed channel panic, so it's important to ensure all sends
+are done before calling close. The
+[[http://golang.org/pkg/sync/#WaitGroup][`sync.WaitGroup`]] type
+provides a simple way to arrange this synchronization:
+
+.code pipelines/sqfan.go /func merge/,/^}/
+
+* Stopping short
+
+There is a pattern to our pipeline functions:
+
+- stages close their outbound channels when all the send operations are done.
+- stages keep receiving values from inbound channels until those channels are closed.
+
+This pattern allows each receiving stage to be written as a `range` loop and
+ensures that all goroutines exit once all values have been successfully sent
+downstream.
+
+But in real pipelines, stages don't always receive all the inbound
+values. Sometimes this is by design: the receiver may only need a
+subset of values to make progress. More often, a stage exits early
+because an inbound value represents an error in an earlier stage. In
+either case the receiver should not have to wait for the remaining
+values to arrive, and we want earlier stages to stop producing values
+that later stages don't need.
+
+In our example pipeline, if a stage fails to consume all the inbound values, the
+goroutines attempting to send those values will block indefinitely:
+
+.code pipelines/sqleak.go /first value/,/^}/
+
+This is a resource leak: goroutines consume memory and runtime resources, and
+heap references in goroutine stacks keep data from being garbage collected.
+Goroutines are not garbage collected; they must exit on their own.
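+
+One rough way to observe the leak (a hypothetical check, not part of sqleak.go)
+is to count goroutines after `main` stops receiving:
+
+	// Hypothetical addition at the end of main; needs the "runtime" and "time" imports.
+	time.Sleep(10 * time.Millisecond)   // let the remaining sender reach its blocked send
+	fmt.Println(runtime.NumGoroutine()) // still counts the goroutine stuck sending to out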
+
+We need to arrange for the upstream stages of our pipeline to exit even when the
+downstream stages fail to receive all the inbound values. One way to do this is
+to change the outbound channels to have a buffer. A buffer can hold a fixed
+number of values; send operations complete immediately if there's room in the
+buffer:
+
+ c := make(chan int, 2) // buffer size 2
+ c <- 1 // succeeds immediately
+ c <- 2 // succeeds immediately
+ c <- 3 // blocks until another goroutine does <-c and receives 1
+
+When the number of values to be sent is known at channel creation time, a buffer
+can simplify the code. For example, we can rewrite `gen` to copy the list of
+integers into a buffered channel and avoid creating a new goroutine:
+
+.code pipelines/sqbuffer.go /func gen/,/^}/
+
+Returning to the blocked goroutines in our pipeline, we might consider adding a
+buffer to the outbound channel returned by `merge`:
+
+.code pipelines/sqbuffer.go /func merge/,/unchanged/
+
+While this fixes the blocked goroutine in this program, this is bad code. The
+choice of buffer size of 1 here depends on knowing the number of values `merge`
+will receive and the number of values downstream stages will consume. This is
+fragile: if we pass an additional value to `gen`, or if the downstream stage
+reads any fewer values, we will again have blocked goroutines.
+
+Instead, we need to provide a way for downstream stages to indicate to the
+senders that they will stop accepting input.
+
+* Explicit cancellation
+
+When `main` decides to exit without receiving all the values from
+`out`, it must tell the goroutines in the upstream stages to abandon
+the values they're trying to send. It does so by sending values on a
+channel called `done`. It sends two values since there are
+potentially two blocked senders:
+
+.code pipelines/sqdone1.go /func main/,/^}/
+
+The sending goroutines replace their send operation with a `select`
+statement that proceeds either when the send on `out` happens or when
+they receive a value from `done`. The value type of `done` is the
+empty struct because the value doesn't matter: it is the receive event
+that indicates the send on `out` should be abandoned. The `output`
+goroutines continue looping on their inbound channel, `c`, so the
+upstream stages are not blocked:
+
+.code pipelines/sqdone1.go /func merge/,/unchanged/
+
+But this approach has a problem: the downstream receiver needs to know the
+number of potentially blocked upstream senders. Keeping track of these counts
+is tedious and error-prone.
+
+We need a way to tell an unknown and unbounded number of goroutines to
+stop sending their values downstream. In Go, we can do this by
+closing a channel, because
+[[http://golang.org/ref/spec#Receive_operator][a receive operation on a closed channel can always proceed immediately, yielding the element type's zero value.]]
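+
+For illustration (a small snippet, separate from the pipeline code), once a
+channel is closed, every receive on it proceeds at once:
+
+	done := make(chan struct{})
+	close(done)
+	<-done          // proceeds immediately
+	_, ok := <-done // proceeds again; ok is false because done is closed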
+
+This means that `main` can unblock all the senders simply by closing
+the `done` channel. This close is effectively a broadcast signal to
+the senders. We extend each of our pipeline functions to accept
+`done` as a parameter and arrange for the close to happen via a
+`defer` statement, so that all return paths from `main` will signal
+the pipeline stages to exit.
+
+.code pipelines/sqdone3.go /func main/,/^}/
+
+Each of our pipeline stages is now free to return early: `sq` can return from
+the middle of its loop since we know that if `done` is closed here, it is also
+closed for the upstream `gen` stage. `sq` ensures its `out` channel is closed
+on all return paths via a `defer` statement.
+
+.code pipelines/sqdone3.go /func sq/,/^}/
+
+Here are the guidelines for pipeline construction:
+
+- stages close their outbound channels when all the send operations are done.
+- stages keep receiving values from inbound channels until those channels are closed or the senders are unblocked.
+
+Pipelines unblock senders either by ensuring there's enough buffer for all the
+values that are sent or by explicitly signalling senders when the receiver may
+abandon the channel.
+
+* Digesting a tree
+
+Let's consider a more realistic pipeline.
+
+MD5 is a message-digest algorithm that's useful as a file checksum. The command
+line utility `md5sum` prints digest values for a list of files.
+
+ % md5sum *.go
+ d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go
+ ee869afd31f83cbb2d10ee81b2b831dc parallel.go
+ b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
+
+Our example program is like `md5sum` but instead takes a single directory as an
+argument and prints the digest values for each regular file under that
+directory, sorted by path name.
+
+ % go run serial.go .
+ d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go
+ ee869afd31f83cbb2d10ee81b2b831dc parallel.go
+ b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
+
+The main function of our program invokes a helper function `MD5All`, which
+returns a map from path name to digest value, then sorts and prints the results:
+
+.code pipelines/serial.go /func main/,/^}/
+
+The `MD5All` function is the focus of our discussion. In
+[[pipelines/serial.go][serial.go]], the implementation uses no concurrency and
+simply reads and sums each file as it walks the tree.
+
+.code pipelines/serial.go /MD5All/,/^}/
+
+* Parallel digestion
+
+In [[pipelines/parallel.go][parallel.go]], we split `MD5All` into a two-stage
+pipeline. The first stage, `sumFiles`, walks the tree, digests each file in
+a new goroutine, and sends the results on a channel with value type `result`:
+
+.code pipelines/parallel.go /type result/,/}/ HLresult
+
+`sumFiles` returns two channels: one for the `results` and another for the error
+returned by `filepath.Walk`. The walk function starts a new goroutine to
+process each regular file, then checks `done`. If `done` is closed, the walk
+stops immediately:
+
+.code pipelines/parallel.go /func sumFiles/,/^}/
+
+`MD5All` receives the digest values from `c`. `MD5All` returns early on error,
+closing `done` via a `defer`:
+
+.code pipelines/parallel.go /func MD5All/,/^}/ HLdone
+
+* Bounded parallelism
+
+The `MD5All` implementation in [[pipelines/parallel.go][parallel.go]]
+starts a new goroutine for each file. In a directory with many large
+files, this may allocate more memory than is available on the machine.
+
+We can limit these allocations by bounding the number of files read in
+parallel. In [[pipelines/bounded.go][bounded.go]], we do this by
+creating a fixed number of goroutines for reading files. Our pipeline
+now has three stages: walk the tree, read and digest the files, and
+collect the digests.
+
+The first stage, `walkFiles`, emits the paths of regular files in the tree:
+
+.code pipelines/bounded.go /func walkFiles/,/^}/
+
+The middle stage starts a fixed number of `digester` goroutines that receive
+file names from `paths` and send `results` on channel `c`:
+
+.code pipelines/bounded.go /func digester/,/^}/ HLpaths
+
+Unlike our previous examples, `digester` does not close its output channel, as
+multiple goroutines are sending on a shared channel. Instead, code in `MD5All`
+arranges for the channel to be closed when all the `digesters` are done:
+
+.code pipelines/bounded.go /fixed number/,/End of pipeline/ HLc
+
+We could instead have each digester create and return its own output
+channel, but then we would need additional goroutines to fan-in the
+results.
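+
+A sketch of that alternative (hypothetical code, not part of bounded.go): each
+digester would own and close its channel, and `MD5All` would then need a
+merge-style fan-in over the returned channels:
+
+	// Hypothetical variant: each digester owns its output channel.
+	func digester(done <-chan struct{}, paths <-chan string) <-chan result {
+		c := make(chan result)
+		go func() {
+			defer close(c) // safe: this goroutine is the only sender on c
+			for path := range paths {
+				data, err := ioutil.ReadFile(path)
+				select {
+				case c <- result{path, md5.Sum(data), err}:
+				case <-done:
+					return
+				}
+			}
+		}()
+		return c
+	}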
+
+The final stage receives all the `results` from `c` then checks the
+error from `errc`. This check cannot happen any earlier, since before
+this point, `walkFiles` may block sending values downstream:
+
+.code pipelines/bounded.go /m := make/,/^}/ HLerrc
+
+* Conclusion
+
+This article has presented techniques for constructing streaming data pipelines
+in Go. Dealing with failures in such pipelines is tricky, since each stage in
+the pipeline may block attempting to send values downstream, and the downstream
+stages may no longer care about the incoming data. We showed how closing a
+channel can broadcast a "done" signal to all the goroutines started by a
+pipeline and defined guidelines for constructing pipelines correctly.
+
+Further reading:
+
+- [[http://talks.golang.org/2012/concurrency.slide#1][Go Concurrency Patterns]] ([[https://www.youtube.com/watch?v=f6kdp27TYZs][video]]) presents the basics of Go's concurrency primitives and several ways to apply them.
+- [[http://blog.golang.org/advanced-go-concurrency-patterns][Advanced Go Concurrency Patterns]] ([[http://www.youtube.com/watch?v=QDDwwePbDtw][video]]) covers more complex uses of Go's primitives, especially `select`.
+- Douglas McIlroy's paper [[http://swtch.com/~rsc/thread/squint.pdf][Squinting at Power Series]] shows how Go-like concurrency provides elegant support for complex calculations.
diff --git a/content/pipelines/bounded.go b/content/pipelines/bounded.go
new file mode 100644
index 0000000..90eed08
--- /dev/null
+++ b/content/pipelines/bounded.go
@@ -0,0 +1,121 @@
+package main
+
+import (
+ "crypto/md5"
+ "errors"
+ "fmt"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "sort"
+ "sync"
+)
+
+// walkFiles starts a goroutine to walk the directory tree at root and send the
+// path of each regular file on the string channel. It sends the result of the
+// walk on the error channel. If done is closed, walkFiles abandons its work.
+func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
+ paths := make(chan string)
+ errc := make(chan error, 1)
+ go func() { // HL
+ // Close the paths channel after Walk returns.
+ defer close(paths) // HL
+ // No select needed for this send, since errc is buffered.
+ errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error { // HL
+ if err != nil {
+ return err
+ }
+ if info.IsDir() {
+ return nil
+ }
+ select {
+ case paths <- path: // HL
+ case <-done: // HL
+ return errors.New("walk canceled")
+ }
+ return nil
+ })
+ }()
+ return paths, errc
+}
+
+// A result is the product of reading and summing a file using MD5.
+type result struct {
+ path string
+ sum [md5.Size]byte
+ err error
+}
+
+// digester reads path names from paths and sends digests of the corresponding
+// files on c until either paths or done is closed.
+func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
+ for path := range paths { // HLpaths
+ data, err := ioutil.ReadFile(path)
+ select {
+ case c <- result{path, md5.Sum(data), err}:
+ case <-done:
+ return
+ }
+ }
+}
+
+// MD5All reads all the files in the file tree rooted at root and returns a map
+// from file path to the MD5 sum of the file's contents. If the directory walk
+// fails or any read operation fails, MD5All returns an error. In that case,
+// MD5All does not wait for inflight read operations to complete.
+func MD5All(root string) (map[string][md5.Size]byte, error) {
+ // MD5All closes the done channel when it returns; it may do so before
+ // receiving all the values from c and errc.
+ done := make(chan struct{})
+ defer close(done)
+
+ paths, errc := walkFiles(done, root)
+
+ // Start a fixed number of goroutines to read and digest files.
+ c := make(chan result) // HLc
+ var wg sync.WaitGroup
+ const numDigesters = 20
+ wg.Add(numDigesters)
+ for i := 0; i < numDigesters; i++ {
+ go func() {
+ digester(done, paths, c) // HLc
+ wg.Done()
+ }()
+ }
+ go func() {
+ wg.Wait()
+ close(c) // HLc
+ }()
+ // End of pipeline. OMIT
+
+ m := make(map[string][md5.Size]byte)
+ for r := range c {
+ if r.err != nil {
+ return nil, r.err
+ }
+ m[r.path] = r.sum
+ }
+ // Check whether the Walk failed.
+ if err := <-errc; err != nil { // HLerrc
+ return nil, err
+ }
+ return m, nil
+}
+
+func main() {
+ // Calculate the MD5 sum of all files under the specified directory,
+ // then print the results sorted by path name.
+ m, err := MD5All(os.Args[1])
+ if err != nil {
+ fmt.Println(err)
+ return
+ }
+ var paths []string
+ for path := range m {
+ paths = append(paths, path)
+ }
+ sort.Strings(paths)
+ for _, path := range paths {
+ fmt.Printf("%x %s\n", m[path], path)
+ }
+}
diff --git a/content/pipelines/parallel.go b/content/pipelines/parallel.go
new file mode 100644
index 0000000..7edee1d
--- /dev/null
+++ b/content/pipelines/parallel.go
@@ -0,0 +1,109 @@
+package main
+
+import (
+ "crypto/md5"
+ "errors"
+ "fmt"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "sort"
+ "sync"
+)
+
+// A result is the product of reading and summing a file using MD5.
+type result struct {
+ path string
+ sum [md5.Size]byte
+ err error
+}
+
+// sumFiles starts goroutines to walk the directory tree at root and digest each
+// regular file. These goroutines send the results of the digests on the result
+// channel and send the result of the walk on the error channel. If done is
+// closed, sumFiles abandons its work.
+func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
+ // For each regular file, start a goroutine that sums the file and sends
+ // the result on c. Send the result of the walk on errc.
+ c := make(chan result)
+ errc := make(chan error, 1)
+ go func() { // HL
+ var wg sync.WaitGroup
+ err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
+ if err != nil {
+ return err
+ }
+ if info.IsDir() {
+ return nil
+ }
+ wg.Add(1)
+ go func() { // HL
+ data, err := ioutil.ReadFile(path)
+ select {
+ case c <- result{path, md5.Sum(data), err}: // HL
+ case <-done: // HL
+ }
+ wg.Done()
+ }()
+ // Abort the walk if done is closed.
+ select {
+ case <-done: // HL
+ return errors.New("walk canceled")
+ default:
+ return nil
+ }
+ })
+ // Walk has returned, so all calls to wg.Add are done. Start a
+ // goroutine to close c once all the sends are done.
+ go func() { // HL
+ wg.Wait()
+ close(c) // HL
+ }()
+ // No select needed here, since errc is buffered.
+ errc <- err // HL
+ }()
+ return c, errc
+}
+
+// MD5All reads all the files in the file tree rooted at root and returns a map
+// from file path to the MD5 sum of the file's contents. If the directory walk
+// fails or any read operation fails, MD5All returns an error. In that case,
+// MD5All does not wait for inflight read operations to complete.
+func MD5All(root string) (map[string][md5.Size]byte, error) {
+ // MD5All closes the done channel when it returns; it may do so before
+ // receiving all the values from c and errc.
+ done := make(chan struct{}) // HLdone
+ defer close(done) // HLdone
+
+ c, errc := sumFiles(done, root) // HLdone
+
+ m := make(map[string][md5.Size]byte)
+ for r := range c { // HLrange
+ if r.err != nil {
+ return nil, r.err
+ }
+ m[r.path] = r.sum
+ }
+ if err := <-errc; err != nil {
+ return nil, err
+ }
+ return m, nil
+}
+
+func main() {
+ // Calculate the MD5 sum of all files under the specified directory,
+ // then print the results sorted by path name.
+ m, err := MD5All(os.Args[1])
+ if err != nil {
+ fmt.Println(err)
+ return
+ }
+ var paths []string
+ for path := range m {
+ paths = append(paths, path)
+ }
+ sort.Strings(paths)
+ for _, path := range paths {
+ fmt.Printf("%x %s\n", m[path], path)
+ }
+}
diff --git a/content/pipelines/serial.go b/content/pipelines/serial.go
new file mode 100644
index 0000000..7a44ca1
--- /dev/null
+++ b/content/pipelines/serial.go
@@ -0,0 +1,53 @@
+package main
+
+import (
+ "crypto/md5"
+ "fmt"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "sort"
+)
+
+// MD5All reads all the files in the file tree rooted at root and returns a map
+// from file path to the MD5 sum of the file's contents. If the directory walk
+// fails or any read operation fails, MD5All returns an error.
+func MD5All(root string) (map[string][md5.Size]byte, error) {
+ m := make(map[string][md5.Size]byte)
+ err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { // HL
+ if err != nil {
+ return err
+ }
+ if info.IsDir() {
+ return nil
+ }
+ data, err := ioutil.ReadFile(path) // HL
+ if err != nil {
+ return err
+ }
+ m[path] = md5.Sum(data) // HL
+ return nil
+ })
+ if err != nil {
+ return nil, err
+ }
+ return m, nil
+}
+
+func main() {
+ // Calculate the MD5 sum of all files under the specified directory,
+ // then print the results sorted by path name.
+ m, err := MD5All(os.Args[1]) // HL
+ if err != nil {
+ fmt.Println(err)
+ return
+ }
+ var paths []string
+ for path := range m {
+ paths = append(paths, path)
+ }
+ sort.Strings(paths) // HL
+ for _, path := range paths {
+ fmt.Printf("%x %s\n", m[path], path)
+ }
+}
diff --git a/content/pipelines/sqbuffer.go b/content/pipelines/sqbuffer.go
new file mode 100644
index 0000000..45e7f6f
--- /dev/null
+++ b/content/pipelines/sqbuffer.go
@@ -0,0 +1,73 @@
+package main
+
+import (
+ "fmt"
+ "sync"
+)
+
+// gen sends the values in nums on the returned channel, then closes it.
+func gen(nums ...int) <-chan int {
+ out := make(chan int, len(nums))
+ for _, n := range nums {
+ out <- n
+ }
+ close(out)
+ return out
+}
+
+// sq receives values from in, squares them, and sends them on the returned
+// channel, until in is closed. Then sq closes the returned channel.
+func sq(in <-chan int) <-chan int {
+ out := make(chan int)
+ go func() {
+ for n := range in {
+ out <- n * n
+ }
+ close(out)
+ }()
+ return out
+}
+
+// merge receives values from each input channel and sends them on the returned
+// channel. merge closes the returned channel after all the input values have
+// been sent.
+func merge(cs ...<-chan int) <-chan int {
+ var wg sync.WaitGroup
+ out := make(chan int, 1) // enough space for the unread inputs
+ // ... the rest is unchanged ...
+
+ // Start an output goroutine for each input channel in cs. output
+ // copies values from c to out until c is closed, then calls wg.Done.
+ output := func(c <-chan int) {
+ for n := range c {
+ out <- n
+ }
+ wg.Done()
+ }
+ wg.Add(len(cs))
+ for _, c := range cs {
+ go output(c)
+ }
+
+ // Start a goroutine to close out once all the output goroutines are
+ // done. This must start after the wg.Add call.
+ go func() {
+ wg.Wait()
+ close(out)
+ }()
+ return out
+}
+
+func main() {
+ in := gen(2, 3)
+
+ // Distribute the sq work across two goroutines that both read from in.
+ c1 := sq(in)
+ c2 := sq(in)
+
+ // Consume the first value from output.
+ out := merge(c1, c2)
+ fmt.Println(<-out) // 4 or 9
+ return
+ // The second value is sent into out's buffer, and all goroutines exit.
+}
diff --git a/content/pipelines/sqdone1.go b/content/pipelines/sqdone1.go
new file mode 100644
index 0000000..5d47514
--- /dev/null
+++ b/content/pipelines/sqdone1.go
@@ -0,0 +1,81 @@
+package main
+
+import (
+ "fmt"
+ "sync"
+)
+
+// gen sends the values in nums on the returned channel, then closes it.
+func gen(nums ...int) <-chan int {
+ out := make(chan int, len(nums))
+ for _, n := range nums {
+ out <- n
+ }
+ close(out)
+ return out
+}
+
+// sq receives values from in, squares them, and sends them on the returned
+// channel, until in is closed. Then sq closes the returned channel.
+func sq(in <-chan int) <-chan int {
+ out := make(chan int)
+ go func() {
+ for n := range in {
+ out <- n * n
+ }
+ close(out)
+ }()
+ return out
+}
+
+// merge receives values from each input channel and sends them on the returned
+// channel. merge closes the returned channel after all the input values have
+// been sent.
+func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
+ var wg sync.WaitGroup
+ out := make(chan int)
+
+ // Start an output goroutine for each input channel in cs. output
+ // copies values from c to out until c is closed or it receives a value
+ // from done, then output calls wg.Done.
+ output := func(c <-chan int) {
+ for n := range c {
+ select {
+ case out <- n:
+ case <-done: // HL
+ }
+ }
+ wg.Done()
+ }
+ // ... the rest is unchanged ...
+
+ wg.Add(len(cs))
+ for _, c := range cs {
+ go output(c)
+ }
+
+ // Start a goroutine to close out once all the output goroutines are
+ // done. This must start after the wg.Add call.
+ go func() {
+ wg.Wait()
+ close(out)
+ }()
+ return out
+}
+
+func main() {
+ in := gen(2, 3)
+
+ // Distribute the sq work across two goroutines that both read from in.
+ c1 := sq(in)
+ c2 := sq(in)
+
+ // Consume the first value from output.
+ done := make(chan struct{}, 2) // HL
+ out := merge(done, c1, c2)
+ fmt.Println(<-out) // 4 or 9
+
+ // Tell the remaining senders we're leaving.
+ done <- struct{}{} // HL
+ done <- struct{}{} // HL
+}
diff --git a/content/pipelines/sqdone2.go b/content/pipelines/sqdone2.go
new file mode 100644
index 0000000..86b2487
--- /dev/null
+++ b/content/pipelines/sqdone2.go
@@ -0,0 +1,78 @@
+package main
+
+import (
+ "fmt"
+ "sync"
+)
+
+// gen sends the values in nums on the returned channel, then closes it.
+func gen(nums ...int) <-chan int {
+ out := make(chan int, len(nums))
+ for _, n := range nums {
+ out <- n
+ }
+ close(out)
+ return out
+}
+
+// sq receives values from in, squares them, and sends them on the returned
+// channel, until in is closed. Then sq closes the returned channel.
+func sq(in <-chan int) <-chan int {
+ out := make(chan int)
+ go func() {
+ for n := range in {
+ out <- n * n
+ }
+ close(out)
+ }()
+ return out
+}
+
+// merge receives values from each input channel and sends them on the returned
+// channel. merge closes the returned channel after all the input values have
+// been sent.
+func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
+ var wg sync.WaitGroup
+ out := make(chan int)
+
+ // Start an output goroutine for each input channel in cs. output
+ // copies values from c to out until c or done is closed, then calls
+ // wg.Done.
+ output := func(c <-chan int) {
+ for n := range c {
+ select {
+ case out <- n:
+ case <-done: // HL
+ }
+ }
+ wg.Done()
+ }
+ wg.Add(len(cs))
+ for _, c := range cs {
+ go output(c)
+ }
+
+ // Start a goroutine to close out once all the output goroutines are
+ // done. This must start after the wg.Add call.
+ go func() {
+ wg.Wait()
+ close(out)
+ }()
+ return out
+}
+
+func main() {
+ in := gen(2, 3)
+
+ // Distribute the sq work across two goroutines that both read from in.
+ c1 := sq(in)
+ c2 := sq(in)
+
+ // Consume the first value from output.
+ done := make(chan struct{}) // HL
+ out := merge(done, c1, c2) // HL
+ fmt.Println(<-out) // 4 or 9
+
+ // Tell the remaining senders we're leaving.
+ close(done) // HL
+}
diff --git a/content/pipelines/sqdone3.go b/content/pipelines/sqdone3.go
new file mode 100644
index 0000000..63a2269
--- /dev/null
+++ b/content/pipelines/sqdone3.go
@@ -0,0 +1,88 @@
+package main
+
+import (
+ "fmt"
+ "sync"
+)
+
+// gen sends the values in nums on the returned channel, then closes it.
+func gen(done <-chan struct{}, nums ...int) <-chan int {
+ out := make(chan int, len(nums))
+ for _, n := range nums {
+ // We ignore done here because these sends cannot block.
+ out <- n
+ }
+ close(out)
+ return out
+}
+
+// sq receives values from in, squares them, and sends them on the returned
+// channel, until in or done is closed. Then sq closes the returned channel.
+func sq(done <-chan struct{}, in <-chan int) <-chan int {
+ out := make(chan int)
+ go func() {
+ defer close(out) // HL
+ for n := range in {
+ select {
+ case out <- n * n:
+ case <-done:
+ return
+ }
+ }
+ }()
+ return out
+}
+
+// merge receives values from each input channel and sends them on the returned
+// channel. merge closes the returned channel after all the input values have
+// been sent or after done is closed.
+func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
+ var wg sync.WaitGroup
+ out := make(chan int)
+
+ // Start an output goroutine for each input channel in cs. output
+ // copies values from c to out until c or done is closed, then calls
+ // wg.Done.
+ output := func(c <-chan int) {
+ defer wg.Done()
+ for n := range c {
+ select {
+ case out <- n:
+ case <-done:
+ return
+ }
+ }
+ }
+ wg.Add(len(cs))
+ for _, c := range cs {
+ go output(c)
+ }
+
+ // Start a goroutine to close out once all the output goroutines are
+ // done. This must start after the wg.Add call.
+ go func() {
+ wg.Wait()
+ close(out)
+ }()
+ return out
+}
+
+func main() {
+ // Set up a done channel that's shared by the whole pipeline,
+ // and close that channel when this pipeline exits, as a signal
+ // for all the goroutines we started to exit.
+ done := make(chan struct{}) // HL
+ defer close(done) // HL
+
+ in := gen(done, 2, 3)
+
+ // Distribute the sq work across two goroutines that both read from in.
+ c1 := sq(done, in)
+ c2 := sq(done, in)
+
+ // Consume the first value from output.
+ out := merge(done, c1, c2)
+ fmt.Println(<-out) // 4 or 9
+
+ // done will be closed by the deferred call. // HL
+}
diff --git a/content/pipelines/sqfan.go b/content/pipelines/sqfan.go
new file mode 100644
index 0000000..b5702cd
--- /dev/null
+++ b/content/pipelines/sqfan.go
@@ -0,0 +1,73 @@
+package main
+
+import (
+ "fmt"
+ "sync"
+)
+
+// gen sends the values in nums on the returned channel, then closes it.
+func gen(nums ...int) <-chan int {
+ out := make(chan int)
+ go func() {
+ for _, n := range nums {
+ out <- n
+ }
+ close(out)
+ }()
+ return out
+}
+
+// sq receives values from in, squares them, and sends them on the returned
+// channel, until in is closed. Then sq closes the returned channel.
+func sq(in <-chan int) <-chan int {
+ out := make(chan int)
+ go func() {
+ for n := range in {
+ out <- n * n
+ }
+ close(out)
+ }()
+ return out
+}
+
+// merge receives values from each input channel and sends them on the returned
+// channel. merge closes the returned channel after all the input values have
+// been sent.
+func merge(cs ...<-chan int) <-chan int {
+ var wg sync.WaitGroup // HL
+ out := make(chan int)
+
+ // Start an output goroutine for each input channel in cs. output
+ // copies values from c to out until c is closed, then calls wg.Done.
+ output := func(c <-chan int) {
+ for n := range c {
+ out <- n
+ }
+ wg.Done() // HL
+ }
+ wg.Add(len(cs)) // HL
+ for _, c := range cs {
+ go output(c)
+ }
+
+ // Start a goroutine to close out once all the output goroutines are
+ // done. This must start after the wg.Add call.
+ go func() {
+ wg.Wait() // HL
+ close(out)
+ }()
+ return out
+}
+
+func main() {
+ in := gen(2, 3)
+
+ // Distribute the sq work across two goroutines that both read from in.
+ c1 := sq(in)
+ c2 := sq(in)
+
+ // Consume the merged output from c1 and c2.
+ for n := range merge(c1, c2) {
+ fmt.Println(n) // 4 then 9, or 9 then 4
+ }
+}
diff --git a/content/pipelines/sqleak.go b/content/pipelines/sqleak.go
new file mode 100644
index 0000000..a0d22ee
--- /dev/null
+++ b/content/pipelines/sqleak.go
@@ -0,0 +1,75 @@
+package main
+
+import (
+ "fmt"
+ "sync"
+)
+
+// gen sends the values in nums on the returned channel, then closes it.
+func gen(nums ...int) <-chan int {
+ out := make(chan int)
+ go func() {
+ for _, n := range nums {
+ out <- n
+ }
+ close(out)
+ }()
+ return out
+}
+
+// sq receives values from in, squares them, and sends them on the returned
+// channel, until in is closed. Then sq closes the returned channel.
+func sq(in <-chan int) <-chan int {
+ out := make(chan int)
+ go func() {
+ for n := range in {
+ out <- n * n
+ }
+ close(out)
+ }()
+ return out
+}
+
+// merge receives values from each input channel and sends them on the returned
+// channel. merge closes the returned channel after all the input values have
+// been sent.
+func merge(cs ...<-chan int) <-chan int {
+ var wg sync.WaitGroup
+ out := make(chan int)
+
+ // Start an output goroutine for each input channel in cs. output
+ // copies values from c to out until c is closed, then calls wg.Done.
+ output := func(c <-chan int) {
+ for n := range c {
+ out <- n
+ }
+ wg.Done()
+ }
+ wg.Add(len(cs))
+ for _, c := range cs {
+ go output(c)
+ }
+
+ // Start a goroutine to close out once all the output goroutines are
+ // done. This must start after the wg.Add call.
+ go func() {
+ wg.Wait()
+ close(out)
+ }()
+ return out
+}
+
+func main() {
+ in := gen(2, 3)
+
+ // Distribute the sq work across two goroutines that both read from in.
+ c1 := sq(in)
+ c2 := sq(in)
+
+ // Consume the first value from output.
+ out := merge(c1, c2)
+ fmt.Println(<-out) // 4 or 9
+ return
+ // Since we didn't receive the second value from out,
+ // one of the output goroutines is hung attempting to send it.
+}
diff --git a/content/pipelines/square.go b/content/pipelines/square.go
new file mode 100644
index 0000000..07c9523
--- /dev/null
+++ b/content/pipelines/square.go
@@ -0,0 +1,38 @@
+package main
+
+import "fmt"
+
+// gen sends the values in nums on the returned channel, then closes it.
+func gen(nums ...int) <-chan int {
+ out := make(chan int)
+ go func() {
+ for _, n := range nums {
+ out <- n
+ }
+ close(out)
+ }()
+ return out
+}
+
+// sq receives values from in, squares them, and sends them on the returned
+// channel, until in is closed. Then sq closes the returned channel.
+func sq(in <-chan int) <-chan int {
+ out := make(chan int)
+ go func() {
+ for n := range in {
+ out <- n * n
+ }
+ close(out)
+ }()
+ return out
+}
+
+func main() {
+ // Set up the pipeline.
+ c := gen(2, 3)
+ out := sq(c)
+
+ // Consume the output.
+ fmt.Println(<-out) // 4
+ fmt.Println(<-out) // 9
+}
diff --git a/content/pipelines/square2.go b/content/pipelines/square2.go
new file mode 100644
index 0000000..4530fea
--- /dev/null
+++ b/content/pipelines/square2.go
@@ -0,0 +1,35 @@
+package main
+
+import "fmt"
+
+// gen sends the values in nums on the returned channel, then closes it.
+func gen(nums ...int) <-chan int {
+ out := make(chan int)
+ go func() {
+ for _, n := range nums {
+ out <- n
+ }
+ close(out)
+ }()
+ return out
+}
+
+// sq receives values from in, squares them, and sends them on the returned
+// channel, until in is closed. Then sq closes the returned channel.
+func sq(in <-chan int) <-chan int {
+ out := make(chan int)
+ go func() {
+ for n := range in {
+ out <- n * n
+ }
+ close(out)
+ }()
+ return out
+}
+
+func main() {
+ // Set up the pipeline and consume the output.
+ for n := range sq(sq(gen(2, 3))) {
+ fmt.Println(n) // 16 then 81
+ }
+}