aboutsummaryrefslogtreecommitdiff
path: root/content
diff options
context:
space:
mode:
Diffstat (limited to 'content')
-rw-r--r--content/pipelines.article36
-rw-r--r--content/pipelines/sqdone3.go8
2 files changed, 26 insertions, 18 deletions
diff --git a/content/pipelines.article b/content/pipelines.article
index 952766f..05772c9 100644
--- a/content/pipelines.article
+++ b/content/pipelines.article
@@ -157,19 +157,19 @@ potentially two blocked senders:
.code pipelines/sqdone1.go /func main/,/^}/
-The sending goroutines replace their send operation with a `select`
-statement that proceeds either when the send on `out` happens or when
-they receive a value from `done`. The value type of `done` is the
-empty struct because the value doesn't matter: it is the receive event
-that indicates the send on `out` should be abandoned. The `output`
-goroutines continue looping on their inbound channel, `c`, so the
-upstream stages are not blocked:
+The sending goroutines replace their send operation with a `select` statement
+that proceeds either when the send on `out` happens or when they receive a value
+from `done`. The value type of `done` is the empty struct because the value
+doesn't matter: it is the receive event that indicates the send on `out` should
+be abandoned. The `output` goroutines continue looping on their inbound
+channel, `c`, so the upstream stages are not blocked. (We'll discuss in a moment
+how to allow this loop to return early.)
.code pipelines/sqdone1.go /func merge/,/unchanged/
-But this approach has a problem: the downstream receiver needs to know the
-number of potentially blocked upstream senders. Keeping track of these counts
-is tedious and error-prone.
+This approach has a problem: _each_ downstream receiver needs to know the number
+of potentially blocked upstream senders and arrange to signal those senders on
+early return. Keeping track of these counts is tedious and error-prone.
We need a way to tell an unknown and unbounded number of goroutines to
stop sending their values downstream. In Go, we can do this by
@@ -178,17 +178,23 @@ closing a channel, because
This means that `main` can unblock all the senders simply by closing
the `done` channel. This close is effectively a broadcast signal to
-the senders. We extend each of our pipeline functions to accept
+the senders. We extend _each_ of our pipeline functions to accept
`done` as a parameter and arrange for the close to happen via a
`defer` statement, so that all return paths from `main` will signal
the pipeline stages to exit.
.code pipelines/sqdone3.go /func main/,/^}/
-Each of our pipeline stages is now free to return early: `sq` can return from
-the middle of its loop since we know that if `done` is closed here, it is also
-closed for the upstream `gen` stage. `sq` ensures its `out` channel is closed
-on all return paths via a `defer` statement.
+Each of our pipeline stages is now free to return as soon as `done` is closed.
+The `output` routine in `merge` can return without draining its inbound channel,
+since it knows the upstream sender, `sq`, will stop attempting to send when
+`done` is closed. `output` ensures `wg.Done` is called on all return paths via
+a `defer` statement:
+
+.code pipelines/sqdone3.go /func merge/,/unchanged/
+
+Similarly, `sq` can return as soon as `done` is closed. `sq` ensures its `out`
+channel is closed on all return paths via a `defer` statement:
.code pipelines/sqdone3.go /func sq/,/^}/
diff --git a/content/pipelines/sqdone3.go b/content/pipelines/sqdone3.go
index 63a2269..7000744 100644
--- a/content/pipelines/sqdone3.go
+++ b/content/pipelines/sqdone3.go
@@ -26,7 +26,7 @@ func sq(done <-chan struct{}, in <-chan int) <-chan int {
select {
case out <- n * n:
case <-done:
- return
+ return // HL
}
}
}()
@@ -44,15 +44,17 @@ func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
// copies values from c to out until c or done is closed, then calls
// wg.Done.
output := func(c <-chan int) {
- defer wg.Done()
+ defer wg.Done() // HL
for n := range c {
select {
case out <- n:
case <-done:
- return
+ return // HL
}
}
}
+ // ... the rest is unchanged ...
+
wg.Add(len(cs))
for _, c := range cs {
go output(c)