aboutsummaryrefslogtreecommitdiff
path: root/content/pipelines/sqbuffer.go
diff options
context:
space:
mode:
authorSameer Ajmani <sameer@golang.org>2014-03-12 23:43:55 -0400
committerSameer Ajmani <sameer@golang.org>2014-03-12 23:43:55 -0400
commit9aa2e1143c55cfa6a0b8651067b34c61da8d5646 (patch)
treef0eb469d9b435c085c77a943225c9eef72d84eb3 /content/pipelines/sqbuffer.go
parent401e4f4a7cd251187c7e3b18b1dc2c583e6d2e45 (diff)
go.blog: pipelines and cancellation.
R=adg, r, rsc, ken, bcmills CC=adonovan, campoy, david.crawshaw, gmlewis, golang-codereviews https://golang.org/cl/71070047
Diffstat (limited to 'content/pipelines/sqbuffer.go')
-rw-r--r--content/pipelines/sqbuffer.go73
1 files changed, 73 insertions, 0 deletions
diff --git a/content/pipelines/sqbuffer.go b/content/pipelines/sqbuffer.go
new file mode 100644
index 0000000..45e7f6f
--- /dev/null
+++ b/content/pipelines/sqbuffer.go
@@ -0,0 +1,73 @@
+package main
+
+import (
+ "fmt"
+ "sync"
+)
+
+// gen sends the values in nums on the returned channel, then closes it.
+func gen(nums ...int) <-chan int {
+ out := make(chan int, len(nums))
+ for _, n := range nums {
+ out <- n
+ }
+ close(out)
+ return out
+}
+
+// sq receives values from in, squares them, and sends them on the returned
+// channel, until in is closed. Then sq closes the returned channel.
+func sq(in <-chan int) <-chan int {
+ out := make(chan int)
+ go func() {
+ for n := range in {
+ out <- n * n
+ }
+ close(out)
+ }()
+ return out
+}
+
+// merge receives values from each input channel and sends them on the returned
+// channel. merge closes the returned channel after all the input values have
+// been sent.
+func merge(cs ...<-chan int) <-chan int {
+ var wg sync.WaitGroup
+ out := make(chan int, 1) // enough space for the unread inputs
+ // ... the rest is unchanged ...
+
+ // Start an output goroutine for each input channel in cs. output
+ // copies values from c to out until c is closed, then calls wg.Done.
+ output := func(c <-chan int) {
+ for n := range c {
+ out <- n
+ }
+ wg.Done()
+ }
+ wg.Add(len(cs))
+ for _, c := range cs {
+ go output(c)
+ }
+
+ // Start a goroutine to close out once all the output goroutines are
+ // done. This must start after the wg.Add call.
+ go func() {
+ wg.Wait()
+ close(out)
+ }()
+ return out
+}
+
+func main() {
+ in := gen(2, 3)
+
+ // Distribute the sq work across two goroutines that both read from in.
+ c1 := sq(in)
+ c2 := sq(in)
+
+ // Consume the first value from output.
+ out := merge(c1, c2)
+ fmt.Println(<-out) // 4 or 9
+ return
+ // The second value is sent into out's buffer, and all goroutines exit.
+}