-rw-r--r-- | content/pipelines.article | 306
-rw-r--r-- | content/pipelines/bounded.go | 121
-rw-r--r-- | content/pipelines/parallel.go | 109
-rw-r--r-- | content/pipelines/serial.go | 53
-rw-r--r-- | content/pipelines/sqbuffer.go | 73
-rw-r--r-- | content/pipelines/sqdone1.go | 81
-rw-r--r-- | content/pipelines/sqdone2.go | 78
-rw-r--r-- | content/pipelines/sqdone3.go | 88
-rw-r--r-- | content/pipelines/sqfan.go | 73
-rw-r--r-- | content/pipelines/sqleak.go | 75
-rw-r--r-- | content/pipelines/square.go | 38
-rw-r--r-- | content/pipelines/square2.go | 35
12 files changed, 1130 insertions, 0 deletions
diff --git a/content/pipelines.article b/content/pipelines.article new file mode 100644 index 0000000..ab5d1ce --- /dev/null +++ b/content/pipelines.article @@ -0,0 +1,306 @@ +Go Concurrency Patterns: Pipelines and cancellation
+13 Mar 2014
+Tags: concurrency, pipelines, cancellation
+
+Sameer Ajmani
+
+* Introduction
+
+Go's concurrency primitives make it easy to construct streaming data pipelines
+that make efficient use of I/O and multiple CPUs. This article presents
+examples of such pipelines, highlights subtleties that arise when operations fail,
+and introduces techniques for dealing with failures cleanly.
+
+* What is a pipeline?
+
+There's no formal definition of a pipeline in Go; it's just one of many kinds of
+concurrent programs. Informally, a pipeline is a series of _stages_ connected
+by channels, where each stage is a group of goroutines running the same
+function. In each stage, the goroutines
+
+- receive values from _upstream_ via _inbound_ channels
+- perform some function on that data, usually producing new values
+- send values _downstream_ via _outbound_ channels
+
+Each stage has any number of inbound and outbound channels, except the
+first and last stages, which have only outbound or inbound channels,
+respectively. The first stage is sometimes called the _source_ or
+_producer_; the last stage, the _sink_ or _consumer_.
+
+We'll begin with a simple example pipeline to explain the ideas and techniques.
+Later, we'll present a more realistic example.
+
+* Squaring numbers
+
+Consider a pipeline with three stages.
+
+The first stage, `gen`, is a function that converts a list of integers to a
+channel that emits the integers in the list. The `gen` function starts a
+goroutine that sends the integers on the channel and closes the channel when all
+the values have been sent:
+
+.code pipelines/square.go /func gen/,/^}/
+
+The second stage, `sq`, receives integers from a channel and returns a
+channel that emits the square of each received integer.
After the
+inbound channel is closed and this stage has sent all the values
+downstream, it closes the outbound channel:
+
+.code pipelines/square.go /func sq/,/^}/
+
+The `main` function sets up the pipeline and runs the final stage: it receives
+values from the second stage and prints each one, until the channel is closed:
+
+.code pipelines/square.go /func main/,/^}/
+
+Since `sq` has the same type for its inbound and outbound channels, we
+can compose it any number of times. We can also rewrite `main` as a
+range loop, like the other stages:
+
+.code pipelines/square2.go /func main/,/^}/
+
+* Fan-out, fan-in
+
+Multiple functions can read from the same channel until that channel is closed;
+this is called _fan-out_. This provides a way to distribute work amongst a group
+of workers to parallelize CPU use and I/O.
+
+A function can read from multiple inputs and proceed until all are closed by
+multiplexing the input channels onto a single channel that's closed when all the
+inputs are closed. This is called _fan-in_.
+
+We can change our pipeline to run two instances of `sq`, each reading from the
+same input channel. We introduce a new function, `merge`, to fan in the
+results:
+
+.code pipelines/sqfan.go /func main/,/^}/
+
+The `merge` function converts a list of channels to a single channel by starting
+a goroutine for each inbound channel that copies the values to the sole outbound
+channel. Once all the `output` goroutines have been started, `merge` starts one
+more goroutine to close the outbound channel after all sends on that channel are
+done.
+
+Sends on a closed channel panic, so it's important to ensure all sends
+are done before calling close.
The
+[[http://golang.org/pkg/sync/#WaitGroup][`sync.WaitGroup`]] type
+provides a simple way to arrange this synchronization:
+
+.code pipelines/sqfan.go /func merge/,/^}/
+
+* Stopping short
+
+There is a pattern to our pipeline functions:
+
+- stages close their outbound channels when all the send operations are done.
+- stages keep receiving values from inbound channels until those channels are closed.
+
+This pattern allows each receiving stage to be written as a `range` loop and
+ensures that all goroutines exit once all values have been successfully sent
+downstream.
+
+But in real pipelines, stages don't always receive all the inbound
+values. Sometimes this is by design: the receiver may only need a
+subset of values to make progress. More often, a stage exits early
+because an inbound value represents an error in an earlier stage. In
+either case the receiver should not have to wait for the remaining
+values to arrive, and we want earlier stages to stop producing values
+that later stages don't need.
+
+In our example pipeline, if a stage fails to consume all the inbound values, the
+goroutines attempting to send those values will block indefinitely:
+
+.code pipelines/sqleak.go /first value/,/^}/
+
+This is a resource leak: goroutines consume memory and runtime resources, and
+heap references in goroutine stacks keep data from being garbage collected.
+Goroutines are not garbage collected; they must exit on their own.
+
+We need to arrange for the upstream stages of our pipeline to exit even when the
+downstream stages fail to receive all the inbound values. One way to do this is
+to change the outbound channels to have a buffer.
A buffer can hold a fixed
+number of values; send operations complete immediately if there's room in the
+buffer:
+
+	c := make(chan int, 2) // buffer size 2
+	c <- 1  // succeeds immediately
+	c <- 2  // succeeds immediately
+	c <- 3  // blocks until another goroutine does <-c and receives 1
+
+When the number of values to be sent is known at channel creation time, a buffer
+can simplify the code. For example, we can rewrite `gen` to copy the list of
+integers into a buffered channel and avoid creating a new goroutine:
+
+.code pipelines/sqbuffer.go /func gen/,/^}/
+
+Returning to the blocked goroutines in our pipeline, we might consider adding a
+buffer to the outbound channel returned by `merge`:
+
+.code pipelines/sqbuffer.go /func merge/,/unchanged/
+
+While this fixes the blocked goroutine in this program, this is bad code. The
+choice of buffer size of 1 here depends on knowing the number of values `merge`
+will receive and the number of values downstream stages will consume. This is
+fragile: if we pass an additional value to `gen`, or if the downstream stage
+reads any fewer values, we will again have blocked goroutines.
+
+Instead, we need to provide a way for downstream stages to indicate to the
+senders that they will stop accepting input.
+
+* Explicit cancellation
+
+When `main` decides to exit without receiving all the values from
+`out`, it must tell the goroutines in the upstream stages to abandon
+the values they're trying to send. It does so by sending values on a
+channel called `done`. It sends two values since there are
+potentially two blocked senders:
+
+.code pipelines/sqdone1.go /func main/,/^}/
+
+The sending goroutines replace their send operation with a `select`
+statement that proceeds either when the send on `out` happens or when
+they receive a value from `done`. The value type of `done` is the
+empty struct because the value doesn't matter: it is the receive event
+that indicates the send on `out` should be abandoned.
The `output`
+goroutines continue looping on their inbound channel, `c`, so the
+upstream stages are not blocked:
+
+.code pipelines/sqdone1.go /func merge/,/unchanged/
+
+But this approach has a problem: the downstream receiver needs to know the
+number of potentially blocked upstream senders. Keeping track of these counts
+is tedious and error-prone.
+
+We need a way to tell an unknown and unbounded number of goroutines to
+stop sending their values downstream. In Go, we can do this by
+closing a channel, because
+[[http://golang.org/ref/spec#Receive_operator][a receive operation on a closed channel can always proceed immediately, yielding the element type's zero value.]]
+
+This means that `main` can unblock all the senders simply by closing
+the `done` channel. This close is effectively a broadcast signal to
+the senders. We extend each of our pipeline functions to accept
+`done` as a parameter and arrange for the close to happen via a
+`defer` statement, so that all return paths from `main` will signal
+the pipeline stages to exit.
+
+.code pipelines/sqdone3.go /func main/,/^}/
+
+Each of our pipeline stages is now free to return early: `sq` can return from
+the middle of its loop since we know that if `done` is closed here, it is also
+closed for the upstream `gen` stage. `sq` ensures its `out` channel is closed
+on all return paths via a `defer` statement.
+
+.code pipelines/sqdone3.go /func sq/,/^}/
+
+Here are the guidelines for pipeline construction:
+
+- stages close their outbound channels when all the send operations are done.
+- stages keep receiving values from inbound channels until those channels are closed or the senders are unblocked.
+
+Pipelines unblock senders either by ensuring there's enough buffer for all the
+values that are sent or by explicitly signalling senders when the receiver may
+abandon the channel.
+
+* Digesting a tree
+
+Let's consider a more realistic pipeline.
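Before the walkthrough, it may help to see in isolation the one standard-library call all of the following programs lean on: `md5.Sum` returns a fixed-size `[md5.Size]byte` array (not a slice), which the examples format with `%x`. This is an illustrative sketch; the `digestHex` helper is invented for the example and does not appear in the files below:

```go
package main

import (
	"crypto/md5"
	"fmt"
)

// digestHex returns the hex-encoded MD5 digest of data, using the same
// %x formatting the example programs use when printing results.
func digestHex(data []byte) string {
	sum := md5.Sum(data) // sum is a [md5.Size]byte value, not a slice
	return fmt.Sprintf("%x", sum)
}

func main() {
	// The MD5 of the empty input is a well-known constant.
	fmt.Println(digestHex([]byte(""))) // d41d8cd98f00b204e9800998ecf8427e
}
```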
+ +MD5 is a message-digest algorithm that's useful as a file checksum. The command +line utility `md5sum` prints digest values for a list of files. + + % md5sum *.go + d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go + ee869afd31f83cbb2d10ee81b2b831dc parallel.go + b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go + +Our example program is like `md5sum` but instead takes a single directory as an +argument and prints the digest values for each regular file under that +directory, sorted by path name. + + % go run serial.go . + d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go + ee869afd31f83cbb2d10ee81b2b831dc parallel.go + b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go + +The main function of our program invokes a helper function `MD5All`, which +returns a map from path name to digest value, then sorts and prints the results: + +.code pipelines/serial.go /func main/,/^}/ + +The `MD5All` function is the focus of our discussion. In +[[pipelines/serial.go][serial.go]], the implementation uses no concurrency and +simply reads and sums each file as it walks the tree. + +.code pipelines/serial.go /MD5All/,/^}/ + +* Parallel digestion + +In [[pipelines/parallel.go][parallel.go]], we split `MD5All` into a two-stage +pipeline. The first stage, `sumFiles`, walks the tree, digests each file in +a new goroutine, and sends the results on a channel with value type `result`: + +.code pipelines/parallel.go /type result/,/}/ HLresult + +`sumFiles` returns two channels: one for the `results` and another for the error +returned by `filepath.Walk`. The walk function starts a new goroutine to +process each regular file, then checks `done`. If `done` is closed, the walk +stops immediately: + +.code pipelines/parallel.go /func sumFiles/,/^}/ + +`MD5All` receives the digest values from `c`. 
`MD5All` returns early on error, +closing `done` via a `defer`: + +.code pipelines/parallel.go /func MD5All/,/^}/ HLdone + +* Bounded parallelism + +The `MD5All` implementation in [[pipelines/parallel.go][parallel.go]] +starts a new goroutine for each file. In a directory with many large +files, this may allocate more memory than is available on the machine. + +We can limit these allocations by bounding the number of files read in +parallel. In [[pipelines/bounded.go][bounded.go]], we do this by +creating a fixed number of goroutines for reading files. Our pipeline +now has three stages: walk the tree, read and digest the files, and +collect the digests. + +The first stage, `walkFiles`, emits the paths of regular files in the tree: + +.code pipelines/bounded.go /func walkFiles/,/^}/ + +The middle stage starts a fixed number of `digester` goroutines that receive +file names from `paths` and send `results` on channel `c`: + +.code pipelines/bounded.go /func digester/,/^}/ HLpaths + +Unlike our previous examples, `digester` does not close its output channel, as +multiple goroutines are sending on a shared channel. Instead, code in `MD5All` +arranges for the channel to be closed when all the `digesters` are done: + +.code pipelines/bounded.go /fixed number/,/End of pipeline/ HLc + +We could instead have each digester create and return its own output +channel, but then we would need additional goroutines to fan-in the +results. + +The final stage receives all the `results` from `c` then checks the +error from `errc`. This check cannot happen any earlier, since before +this point, `walkFiles` may block sending values downstream: + +.code pipelines/bounded.go /m := make/,/^}/ HLerrc + +* Conclusion + +This article has presented techniques for constructing streaming data pipelines +in Go. 
Dealing with failures in such pipelines is tricky, since each stage in +the pipeline may block attempting to send values downstream, and the downstream +stages may no longer care about the incoming data. We showed how closing a +channel can broadcast a "done" signal to all the goroutines started by a +pipeline and defined guidelines for constructing pipelines correctly. + +Further reading: + +- [[http://talks.golang.org/2012/concurrency.slide#1][Go Concurrency Patterns]] ([[https://www.youtube.com/watch?v=f6kdp27TYZs][video]]) presents the basics of Go's concurrency primitives and several ways to apply them. +- [[http://blog.golang.org/advanced-go-concurrency-patterns][Advanced Go Concurrency Patterns]] ([[http://www.youtube.com/watch?v=QDDwwePbDtw][video]]) covers more complex uses of Go's primitives, especially `select`. +- Douglas McIlroy's paper [[http://swtch.com/~rsc/thread/squint.pdf][Squinting at Power Series]] shows how Go-like concurrency provides elegant support for complex calculations. diff --git a/content/pipelines/bounded.go b/content/pipelines/bounded.go new file mode 100644 index 0000000..90eed08 --- /dev/null +++ b/content/pipelines/bounded.go @@ -0,0 +1,121 @@ +package main + +import ( + "crypto/md5" + "errors" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "sort" + "sync" +) + +// walkFiles starts a goroutine to walk the directory tree at root and send the +// path of each regular file on the string channel. It sends the result of the +// walk on the error channel. If done is closed, walkFiles abandons its work. +func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) { + paths := make(chan string) + errc := make(chan error, 1) + go func() { // HL + // Close the paths channel after Walk returns. + defer close(paths) // HL + // No select needed for this send, since errc is buffered. 
+ errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error { // HL + if err != nil { + return err + } + if info.IsDir() { + return nil + } + select { + case paths <- path: // HL + case <-done: // HL + return errors.New("walk canceled") + } + return nil + }) + }() + return paths, errc +} + +// A result is the product of reading and summing a file using MD5. +type result struct { + path string + sum [md5.Size]byte + err error +} + +// digester reads path names from paths and sends digests of the corresponding +// files on c until either paths or done is closed. +func digester(done <-chan struct{}, paths <-chan string, c chan<- result) { + for path := range paths { // HLpaths + data, err := ioutil.ReadFile(path) + select { + case c <- result{path, md5.Sum(data), err}: + case <-done: + return + } + } +} + +// MD5All reads all the files in the file tree rooted at root and returns a map +// from file path to the MD5 sum of the file's contents. If the directory walk +// fails or any read operation fails, MD5All returns an error. In that case, +// MD5All does not wait for inflight read operations to complete. +func MD5All(root string) (map[string][md5.Size]byte, error) { + // MD5All closes the done channel when it returns; it may do so before + // receiving all the values from c and errc. + done := make(chan struct{}) + defer close(done) + + paths, errc := walkFiles(done, root) + + // Start a fixed number of goroutines to read and digest files. + c := make(chan result) // HLc + var wg sync.WaitGroup + const numDigesters = 20 + wg.Add(numDigesters) + for i := 0; i < numDigesters; i++ { + go func() { + digester(done, paths, c) // HLc + wg.Done() + }() + } + go func() { + wg.Wait() + close(c) // HLc + }() + // End of pipeline. OMIT + + m := make(map[string][md5.Size]byte) + for r := range c { + if r.err != nil { + return nil, r.err + } + m[r.path] = r.sum + } + // Check whether the Walk failed. 
+ if err := <-errc; err != nil { // HLerrc + return nil, err + } + return m, nil +} + +func main() { + // Calculate the MD5 sum of all files under the specified directory, + // then print the results sorted by path name. + m, err := MD5All(os.Args[1]) + if err != nil { + fmt.Println(err) + return + } + var paths []string + for path := range m { + paths = append(paths, path) + } + sort.Strings(paths) + for _, path := range paths { + fmt.Printf("%x %s\n", m[path], path) + } +} diff --git a/content/pipelines/parallel.go b/content/pipelines/parallel.go new file mode 100644 index 0000000..7edee1d --- /dev/null +++ b/content/pipelines/parallel.go @@ -0,0 +1,109 @@ +package main + +import ( + "crypto/md5" + "errors" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "sort" + "sync" +) + +// A result is the product of reading and summing a file using MD5. +type result struct { + path string + sum [md5.Size]byte + err error +} + +// sumFiles starts goroutines to walk the directory tree at root and digest each +// regular file. These goroutines send the results of the digests on the result +// channel and send the result of the walk on the error channel. If done is +// closed, sumFiles abandons its work. +func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) { + // For each regular file, start a goroutine that sums the file and sends + // the result on c. Send the result of the walk on errc. + c := make(chan result) + errc := make(chan error, 1) + go func() { // HL + var wg sync.WaitGroup + err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if info.IsDir() { + return nil + } + wg.Add(1) + go func() { // HL + data, err := ioutil.ReadFile(path) + select { + case c <- result{path, md5.Sum(data), err}: // HL + case <-done: // HL + } + wg.Done() + }() + // Abort the walk if done is closed. 
+ select { + case <-done: // HL + return errors.New("walk canceled") + default: + return nil + } + }) + // Walk has returned, so all calls to wg.Add are done. Start a + // goroutine to close c once all the sends are done. + go func() { // HL + wg.Wait() + close(c) // HL + }() + // No select needed here, since errc is buffered. + errc <- err // HL + }() + return c, errc +} + +// MD5All reads all the files in the file tree rooted at root and returns a map +// from file path to the MD5 sum of the file's contents. If the directory walk +// fails or any read operation fails, MD5All returns an error. In that case, +// MD5All does not wait for inflight read operations to complete. +func MD5All(root string) (map[string][md5.Size]byte, error) { + // MD5All closes the done channel when it returns; it may do so before + // receiving all the values from c and errc. + done := make(chan struct{}) // HLdone + defer close(done) // HLdone + + c, errc := sumFiles(done, root) // HLdone + + m := make(map[string][md5.Size]byte) + for r := range c { // HLrange + if r.err != nil { + return nil, r.err + } + m[r.path] = r.sum + } + if err := <-errc; err != nil { + return nil, err + } + return m, nil +} + +func main() { + // Calculate the MD5 sum of all files under the specified directory, + // then print the results sorted by path name. 
+ m, err := MD5All(os.Args[1]) + if err != nil { + fmt.Println(err) + return + } + var paths []string + for path := range m { + paths = append(paths, path) + } + sort.Strings(paths) + for _, path := range paths { + fmt.Printf("%x %s\n", m[path], path) + } +} diff --git a/content/pipelines/serial.go b/content/pipelines/serial.go new file mode 100644 index 0000000..7a44ca1 --- /dev/null +++ b/content/pipelines/serial.go @@ -0,0 +1,53 @@ +package main + +import ( + "crypto/md5" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "sort" +) + +// MD5All reads all the files in the file tree rooted at root and returns a map +// from file path to the MD5 sum of the file's contents. If the directory walk +// fails or any read operation fails, MD5All returns an error. +func MD5All(root string) (map[string][md5.Size]byte, error) { + m := make(map[string][md5.Size]byte) + err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { // HL + if err != nil { + return err + } + if info.IsDir() { + return nil + } + data, err := ioutil.ReadFile(path) // HL + if err != nil { + return err + } + m[path] = md5.Sum(data) // HL + return nil + }) + if err != nil { + return nil, err + } + return m, nil +} + +func main() { + // Calculate the MD5 sum of all files under the specified directory, + // then print the results sorted by path name. + m, err := MD5All(os.Args[1]) // HL + if err != nil { + fmt.Println(err) + return + } + var paths []string + for path := range m { + paths = append(paths, path) + } + sort.Strings(paths) // HL + for _, path := range paths { + fmt.Printf("%x %s\n", m[path], path) + } +} diff --git a/content/pipelines/sqbuffer.go b/content/pipelines/sqbuffer.go new file mode 100644 index 0000000..45e7f6f --- /dev/null +++ b/content/pipelines/sqbuffer.go @@ -0,0 +1,73 @@ +package main + +import ( + "fmt" + "sync" +) + +// gen sends the values in nums on the returned channel, then closes it. 
+func gen(nums ...int) <-chan int { + out := make(chan int, len(nums)) + for _, n := range nums { + out <- n + } + close(out) + return out +} + +// sq receives values from in, squares them, and sends them on the returned +// channel, until in is closed. Then sq closes the returned channel. +func sq(in <-chan int) <-chan int { + out := make(chan int) + go func() { + for n := range in { + out <- n * n + } + close(out) + }() + return out +} + +// merge receives values from each input channel and sends them on the returned +// channel. merge closes the returned channel after all the input values have +// been sent. +func merge(cs ...<-chan int) <-chan int { + var wg sync.WaitGroup + out := make(chan int, 1) // enough space for the unread inputs + // ... the rest is unchanged ... + + // Start an output goroutine for each input channel in cs. output + // copies values from c to out until c is closed, then calls wg.Done. + output := func(c <-chan int) { + for n := range c { + out <- n + } + wg.Done() + } + wg.Add(len(cs)) + for _, c := range cs { + go output(c) + } + + // Start a goroutine to close out once all the output goroutines are + // done. This must start after the wg.Add call. + go func() { + wg.Wait() + close(out) + }() + return out +} + +func main() { + in := gen(2, 3) + + // Distribute the sq work across two goroutines that both read from in. + c1 := sq(in) + c2 := sq(in) + + // Consume the first value from output. + out := merge(c1, c2) + fmt.Println(<-out) // 4 or 9 + return + // The second value is sent into out's buffer, and all goroutines exit. +} diff --git a/content/pipelines/sqdone1.go b/content/pipelines/sqdone1.go new file mode 100644 index 0000000..5d47514 --- /dev/null +++ b/content/pipelines/sqdone1.go @@ -0,0 +1,81 @@ +package main + +import ( + "fmt" + "sync" +) + +// gen sends the values in nums on the returned channel, then closes it. 
+func gen(nums ...int) <-chan int { + out := make(chan int, len(nums)) + for _, n := range nums { + out <- n + } + close(out) + return out +} + +// sq receives values from in, squares them, and sends them on the returned +// channel, until in is closed. Then sq closes the returned channel. +func sq(in <-chan int) <-chan int { + out := make(chan int) + go func() { + for n := range in { + out <- n * n + } + close(out) + }() + return out +} + +// merge receives values from each input channel and sends them on the returned +// channel. merge closes the returned channel after all the input values have +// been sent. +func merge(done <-chan struct{}, cs ...<-chan int) <-chan int { + var wg sync.WaitGroup + out := make(chan int) + + // Start an output goroutine for each input channel in cs. output + // copies values from c to out until c is closed or it receives a value + // from done, then output calls wg.Done. + output := func(c <-chan int) { + for n := range c { + select { + case out <- n: + case <-done: // HL + } + } + wg.Done() + } + // ... the rest is unchanged ... + + wg.Add(len(cs)) + for _, c := range cs { + go output(c) + } + + // Start a goroutine to close out once all the output goroutines are + // done. This must start after the wg.Add call. + go func() { + wg.Wait() + close(out) + }() + return out +} + +func main() { + in := gen(2, 3) + + // Distribute the sq work across two goroutines that both read from in. + c1 := sq(in) + c2 := sq(in) + + // Consume the first value from output. + done := make(chan struct{}, 2) // HL + out := merge(done, c1, c2) + fmt.Println(<-out) // 4 or 9 + + // Tell the remaining senders we're leaving. 
+ done <- struct{}{} // HL + done <- struct{}{} // HL +} diff --git a/content/pipelines/sqdone2.go b/content/pipelines/sqdone2.go new file mode 100644 index 0000000..86b2487 --- /dev/null +++ b/content/pipelines/sqdone2.go @@ -0,0 +1,78 @@ +package main + +import ( + "fmt" + "sync" +) + +// gen sends the values in nums on the returned channel, then closes it. +func gen(nums ...int) <-chan int { + out := make(chan int, len(nums)) + for _, n := range nums { + out <- n + } + close(out) + return out +} + +// sq receives values from in, squares them, and sends them on the returned +// channel, until in is closed. Then sq closes the returned channel. +func sq(in <-chan int) <-chan int { + out := make(chan int) + go func() { + for n := range in { + out <- n * n + } + close(out) + }() + return out +} + +// merge receives values from each input channel and sends them on the returned +// channel. merge closes the returned channel after all the input values have +// been sent. +func merge(done <-chan struct{}, cs ...<-chan int) <-chan int { + var wg sync.WaitGroup + out := make(chan int) + + // Start an output goroutine for each input channel in cs. output + // copies values from c to out until c or done is closed, then calls + // wg.Done. + output := func(c <-chan int) { + for n := range c { + select { + case out <- n: + case <-done: // HL + } + } + wg.Done() + } + wg.Add(len(cs)) + for _, c := range cs { + go output(c) + } + + // Start a goroutine to close out once all the output goroutines are + // done. This must start after the wg.Add call. + go func() { + wg.Wait() + close(out) + }() + return out +} + +func main() { + in := gen(2, 3) + + // Distribute the sq work across two goroutines that both read from in. + c1 := sq(in) + c2 := sq(in) + + // Consume the first value from output. + done := make(chan struct{}) // HL + out := merge(done, c1, c2) // HL + fmt.Println(<-out) // 4 or 9 + + // Tell the remaining senders we're leaving. 
+ close(done) // HL +} diff --git a/content/pipelines/sqdone3.go b/content/pipelines/sqdone3.go new file mode 100644 index 0000000..63a2269 --- /dev/null +++ b/content/pipelines/sqdone3.go @@ -0,0 +1,88 @@ +package main + +import ( + "fmt" + "sync" +) + +// gen sends the values in nums on the returned channel, then closes it. +func gen(done <-chan struct{}, nums ...int) <-chan int { + out := make(chan int, len(nums)) + for _, n := range nums { + // We ignore done here because these sends cannot block. + out <- n + } + close(out) + return out +} + +// sq receives values from in, squares them, and sends them on the returned +// channel, until in or done is closed. Then sq closes the returned channel. +func sq(done <-chan struct{}, in <-chan int) <-chan int { + out := make(chan int) + go func() { + defer close(out) // HL + for n := range in { + select { + case out <- n * n: + case <-done: + return + } + } + }() + return out +} + +// merge receives values from each input channel and sends them on the returned +// channel. merge closes the returned channel after all the input values have +// been sent or after done is closed. +func merge(done <-chan struct{}, cs ...<-chan int) <-chan int { + var wg sync.WaitGroup + out := make(chan int) + + // Start an output goroutine for each input channel in cs. output + // copies values from c to out until c or done is closed, then calls + // wg.Done. + output := func(c <-chan int) { + defer wg.Done() + for n := range c { + select { + case out <- n: + case <-done: + return + } + } + } + wg.Add(len(cs)) + for _, c := range cs { + go output(c) + } + + // Start a goroutine to close out once all the output goroutines are + // done. This must start after the wg.Add call. + go func() { + wg.Wait() + close(out) + }() + return out +} + +func main() { + // Set up a done channel that's shared by the whole pipeline, + // and close that channel when this pipeline exits, as a signal + // for all the goroutines we started to exit. 
+ done := make(chan struct{}) // HL + defer close(done) // HL + + in := gen(done, 2, 3) + + // Distribute the sq work across two goroutines that both read from in. + c1 := sq(done, in) + c2 := sq(done, in) + + // Consume the first value from output. + out := merge(done, c1, c2) + fmt.Println(<-out) // 4 or 9 + + // done will be closed by the deferred call. // HL +} diff --git a/content/pipelines/sqfan.go b/content/pipelines/sqfan.go new file mode 100644 index 0000000..b5702cd --- /dev/null +++ b/content/pipelines/sqfan.go @@ -0,0 +1,73 @@ +package main + +import ( + "fmt" + "sync" +) + +// gen sends the values in nums on the returned channel, then closes it. +func gen(nums ...int) <-chan int { + out := make(chan int) + go func() { + for _, n := range nums { + out <- n + } + close(out) + }() + return out +} + +// sq receives values from in, squares them, and sends them on the returned +// channel, until in is closed. Then sq closes the returned channel. +func sq(in <-chan int) <-chan int { + out := make(chan int) + go func() { + for n := range in { + out <- n * n + } + close(out) + }() + return out +} + +// merge receives values from each input channel and sends them on the returned +// channel. merge closes the returned channel after all the input values have +// been sent. +func merge(cs ...<-chan int) <-chan int { + var wg sync.WaitGroup // HL + out := make(chan int) + + // Start an output goroutine for each input channel in cs. output + // copies values from c to out until c is closed, then calls wg.Done. + output := func(c <-chan int) { + for n := range c { + out <- n + } + wg.Done() // HL + } + wg.Add(len(cs)) // HL + for _, c := range cs { + go output(c) + } + + // Start a goroutine to close out once all the output goroutines are + // done. This must start after the wg.Add call. + go func() { + wg.Wait() // HL + close(out) + }() + return out +} + +func main() { + in := gen(2, 3) + + // Distribute the sq work across two goroutines that both read from in. 
+ c1 := sq(in) + c2 := sq(in) + + // Consume the merged output from c1 and c2. + for n := range merge(c1, c2) { + fmt.Println(n) // 4 then 9, or 9 then 4 + } +} diff --git a/content/pipelines/sqleak.go b/content/pipelines/sqleak.go new file mode 100644 index 0000000..a0d22ee --- /dev/null +++ b/content/pipelines/sqleak.go @@ -0,0 +1,75 @@ +package main + +import ( + "fmt" + "sync" +) + +// gen sends the values in nums on the returned channel, then closes it. +func gen(nums ...int) <-chan int { + out := make(chan int) + go func() { + for _, n := range nums { + out <- n + } + close(out) + }() + return out +} + +// sq receives values from in, squares them, and sends them on the returned +// channel, until in is closed. Then sq closes the returned channel. +func sq(in <-chan int) <-chan int { + out := make(chan int) + go func() { + for n := range in { + out <- n * n + } + close(out) + }() + return out +} + +// merge receives values from each input channel and sends them on the returned +// channel. merge closes the returned channel after all the input values have +// been sent. +func merge(cs ...<-chan int) <-chan int { + var wg sync.WaitGroup + out := make(chan int) + + // Start an output goroutine for each input channel in cs. output + // copies values from c to out until c is closed, then calls wg.Done. + output := func(c <-chan int) { + for n := range c { + out <- n + } + wg.Done() + } + wg.Add(len(cs)) + for _, c := range cs { + go output(c) + } + + // Start a goroutine to close out once all the output goroutines are + // done. This must start after the wg.Add call. + go func() { + wg.Wait() + close(out) + }() + return out +} + +func main() { + in := gen(2, 3) + + // Distribute the sq work across two goroutines that both read from in. + c1 := sq(in) + c2 := sq(in) + + // Consume the first value from output. 
+ out := merge(c1, c2) + fmt.Println(<-out) // 4 or 9 + return + // Since we didn't receive the second value from out, + // one of the output goroutines is hung attempting to send it. +} diff --git a/content/pipelines/square.go b/content/pipelines/square.go new file mode 100644 index 0000000..07c9523 --- /dev/null +++ b/content/pipelines/square.go @@ -0,0 +1,38 @@ +package main + +import "fmt" + +// gen sends the values in nums on the returned channel, then closes it. +func gen(nums ...int) <-chan int { + out := make(chan int) + go func() { + for _, n := range nums { + out <- n + } + close(out) + }() + return out +} + +// sq receives values from in, squares them, and sends them on the returned +// channel, until in is closed. Then sq closes the returned channel. +func sq(in <-chan int) <-chan int { + out := make(chan int) + go func() { + for n := range in { + out <- n * n + } + close(out) + }() + return out +} + +func main() { + // Set up the pipeline. + c := gen(2, 3) + out := sq(c) + + // Consume the output. + fmt.Println(<-out) // 4 + fmt.Println(<-out) // 9 +} diff --git a/content/pipelines/square2.go b/content/pipelines/square2.go new file mode 100644 index 0000000..4530fea --- /dev/null +++ b/content/pipelines/square2.go @@ -0,0 +1,35 @@ +package main + +import "fmt" + +// gen sends the values in nums on the returned channel, then closes it. +func gen(nums ...int) <-chan int { + out := make(chan int) + go func() { + for _, n := range nums { + out <- n + } + close(out) + }() + return out +} + +// sq receives values from in, squares them, and sends them on the returned +// channel, until in is closed. Then sq closes the returned channel. +func sq(in <-chan int) <-chan int { + out := make(chan int) + go func() { + for n := range in { + out <- n * n + } + close(out) + }() + return out +} + +func main() { + // Set up the pipeline and consume the output. + for n := range sq(sq(gen(2, 3))) { + fmt.Println(n) // 16 then 81 + } +} |