diff options
Diffstat (limited to 'content')
-rw-r--r-- | content/pipelines.article | 36 | ||||
-rw-r--r-- | content/pipelines/sqdone3.go | 8 |
2 files changed, 26 insertions, 18 deletions
diff --git a/content/pipelines.article b/content/pipelines.article index 952766f..05772c9 100644 --- a/content/pipelines.article +++ b/content/pipelines.article @@ -157,19 +157,19 @@ potentially two blocked senders: .code pipelines/sqdone1.go /func main/,/^}/ -The sending goroutines replace their send operation with a `select` -statement that proceeds either when the send on `out` happens or when -they receive a value from `done`. The value type of `done` is the -empty struct because the value doesn't matter: it is the receive event -that indicates the send on `out` should be abandoned. The `output` -goroutines continue looping on their inbound channel, `c`, so the -upstream stages are not blocked: +The sending goroutines replace their send operation with a `select` statement +that proceeds either when the send on `out` happens or when they receive a value +from `done`. The value type of `done` is the empty struct because the value +doesn't matter: it is the receive event that indicates the send on `out` should +be abandoned. The `output` goroutines continue looping on their inbound +channel, `c`, so the upstream stages are not blocked. (We'll discuss in a moment +how to allow this loop to return early.) .code pipelines/sqdone1.go /func merge/,/unchanged/ -But this approach has a problem: the downstream receiver needs to know the -number of potentially blocked upstream senders. Keeping track of these counts -is tedious and error-prone. +This approach has a problem: _each_ downstream receiver needs to know the number +of potentially blocked upstream senders and arrange to signal those senders on +early return. Keeping track of these counts is tedious and error-prone. We need a way to tell an unknown and unbounded number of goroutines to stop sending their values downstream. In Go, we can do this by @@ -178,17 +178,23 @@ closing a channel, because This means that `main` can unblock all the senders simply by closing the `done` channel. This close is effectively a broadcast signal to -the senders. We extend each of our pipeline functions to accept +the senders. We extend _each_ of our pipeline functions to accept `done` as a parameter and arrange for the close to happen via a `defer` statement, so that all return paths from `main` will signal the pipeline stages to exit. .code pipelines/sqdone3.go /func main/,/^}/ -Each of our pipeline stages is now free to return early: `sq` can return from -the middle of its loop since we know that if `done` is closed here, it is also -closed for the upstream `gen` stage. `sq` ensures its `out` channel is closed -on all return paths via a `defer` statement. +Each of our pipeline stages is now free to return as soon as `done` is closed. +The `output` routine in `merge` can return without draining its inbound channel, +since it knows the upstream sender, `sq`, will stop attempting to send when +`done` is closed. `output` ensures `wg.Done` is called on all return paths via +a `defer` statement: + +.code pipelines/sqdone3.go /func merge/,/unchanged/ + +Similarly, `sq` can return as soon as `done` is closed. `sq` ensures its `out` +channel is closed on all return paths via a `defer` statement: .code pipelines/sqdone3.go /func sq/,/^}/ diff --git a/content/pipelines/sqdone3.go b/content/pipelines/sqdone3.go index 63a2269..7000744 100644 --- a/content/pipelines/sqdone3.go +++ b/content/pipelines/sqdone3.go @@ -26,7 +26,7 @@ func sq(done <-chan struct{}, in <-chan int) <-chan int { select { case out <- n * n: case <-done: - return + return // HL } } }() @@ -44,15 +44,17 @@ func merge(done <-chan struct{}, cs ...<-chan int) <-chan int { // copies values from c to out until c or done is closed, then calls // wg.Done. output := func(c <-chan int) { - defer wg.Done() + defer wg.Done() // HL for n := range c { select { case out <- n: case <-done: - return + return // HL } } } + // ... the rest is unchanged ... + wg.Add(len(cs)) for _, c := range cs { go output(c) |