aboutsummaryrefslogtreecommitdiff
path: root/content/pipelines
diff options
context:
space:
mode:
authorSameer Ajmani <sameer@golang.org>2014-03-12 23:43:55 -0400
committerSameer Ajmani <sameer@golang.org>2014-03-12 23:43:55 -0400
commit9aa2e1143c55cfa6a0b8651067b34c61da8d5646 (patch)
treef0eb469d9b435c085c77a943225c9eef72d84eb3 /content/pipelines
parent401e4f4a7cd251187c7e3b18b1dc2c583e6d2e45 (diff)
go.blog: pipelines and cancellation.
R=adg, r, rsc, ken, bcmills CC=adonovan, campoy, david.crawshaw, gmlewis, golang-codereviews https://golang.org/cl/71070047
Diffstat (limited to 'content/pipelines')
-rw-r--r--content/pipelines/bounded.go121
-rw-r--r--content/pipelines/parallel.go109
-rw-r--r--content/pipelines/serial.go53
-rw-r--r--content/pipelines/sqbuffer.go73
-rw-r--r--content/pipelines/sqdone1.go81
-rw-r--r--content/pipelines/sqdone2.go78
-rw-r--r--content/pipelines/sqdone3.go88
-rw-r--r--content/pipelines/sqfan.go73
-rw-r--r--content/pipelines/sqleak.go75
-rw-r--r--content/pipelines/square.go38
-rw-r--r--content/pipelines/square2.go35
11 files changed, 824 insertions, 0 deletions
diff --git a/content/pipelines/bounded.go b/content/pipelines/bounded.go
new file mode 100644
index 0000000..90eed08
--- /dev/null
+++ b/content/pipelines/bounded.go
@@ -0,0 +1,121 @@
+package main
+
+import (
+ "crypto/md5"
+ "errors"
+ "fmt"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "sort"
+ "sync"
+)
+
+// walkFiles starts a goroutine to walk the directory tree at root and send the
+// path of each regular file on the string channel. It sends the result of the
+// walk on the error channel. If done is closed, walkFiles abandons its work.
+func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
+ paths := make(chan string)
+ errc := make(chan error, 1)
+ go func() { // HL
+ // Close the paths channel after Walk returns.
+ defer close(paths) // HL
+ // No select needed for this send, since errc is buffered.
+ errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error { // HL
+ if err != nil {
+ return err
+ }
+ if info.IsDir() {
+ return nil
+ }
+ select {
+ case paths <- path: // HL
+ case <-done: // HL
+ return errors.New("walk canceled")
+ }
+ return nil
+ })
+ }()
+ return paths, errc
+}
+
+// A result is the product of reading and summing a file using MD5.
+type result struct {
+ path string
+ sum [md5.Size]byte
+ err error
+}
+
+// digester reads path names from paths and sends digests of the corresponding
+// files on c until either paths or done is closed.
+func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
+ for path := range paths { // HLpaths
+ data, err := ioutil.ReadFile(path)
+ select {
+ case c <- result{path, md5.Sum(data), err}:
+ case <-done:
+ return
+ }
+ }
+}
+
+// MD5All reads all the files in the file tree rooted at root and returns a map
+// from file path to the MD5 sum of the file's contents. If the directory walk
+// fails or any read operation fails, MD5All returns an error. In that case,
+// MD5All does not wait for inflight read operations to complete.
+func MD5All(root string) (map[string][md5.Size]byte, error) {
+ // MD5All closes the done channel when it returns; it may do so before
+ // receiving all the values from c and errc.
+ done := make(chan struct{})
+ defer close(done)
+
+ paths, errc := walkFiles(done, root)
+
+ // Start a fixed number of goroutines to read and digest files.
+ c := make(chan result) // HLc
+ var wg sync.WaitGroup
+ const numDigesters = 20
+ wg.Add(numDigesters)
+ for i := 0; i < numDigesters; i++ {
+ go func() {
+ digester(done, paths, c) // HLc
+ wg.Done()
+ }()
+ }
+ go func() {
+ wg.Wait()
+ close(c) // HLc
+ }()
+ // End of pipeline. OMIT
+
+ m := make(map[string][md5.Size]byte)
+ for r := range c {
+ if r.err != nil {
+ return nil, r.err
+ }
+ m[r.path] = r.sum
+ }
+ // Check whether the Walk failed.
+ if err := <-errc; err != nil { // HLerrc
+ return nil, err
+ }
+ return m, nil
+}
+
+func main() {
+ // Calculate the MD5 sum of all files under the specified directory,
+ // then print the results sorted by path name.
+ m, err := MD5All(os.Args[1])
+ if err != nil {
+ fmt.Println(err)
+ return
+ }
+ var paths []string
+ for path := range m {
+ paths = append(paths, path)
+ }
+ sort.Strings(paths)
+ for _, path := range paths {
+ fmt.Printf("%x %s\n", m[path], path)
+ }
+}
diff --git a/content/pipelines/parallel.go b/content/pipelines/parallel.go
new file mode 100644
index 0000000..7edee1d
--- /dev/null
+++ b/content/pipelines/parallel.go
@@ -0,0 +1,109 @@
+package main
+
+import (
+ "crypto/md5"
+ "errors"
+ "fmt"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "sort"
+ "sync"
+)
+
+// A result is the product of reading and summing a file using MD5.
+type result struct {
+ path string
+ sum [md5.Size]byte
+ err error
+}
+
+// sumFiles starts goroutines to walk the directory tree at root and digest each
+// regular file. These goroutines send the results of the digests on the result
+// channel and send the result of the walk on the error channel. If done is
+// closed, sumFiles abandons its work.
+func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
+ // For each regular file, start a goroutine that sums the file and sends
+ // the result on c. Send the result of the walk on errc.
+ c := make(chan result)
+ errc := make(chan error, 1)
+ go func() { // HL
+ var wg sync.WaitGroup
+ err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
+ if err != nil {
+ return err
+ }
+ if info.IsDir() {
+ return nil
+ }
+ wg.Add(1)
+ go func() { // HL
+ data, err := ioutil.ReadFile(path)
+ select {
+ case c <- result{path, md5.Sum(data), err}: // HL
+ case <-done: // HL
+ }
+ wg.Done()
+ }()
+ // Abort the walk if done is closed.
+ select {
+ case <-done: // HL
+ return errors.New("walk canceled")
+ default:
+ return nil
+ }
+ })
+ // Walk has returned, so all calls to wg.Add are done. Start a
+ // goroutine to close c once all the sends are done.
+ go func() { // HL
+ wg.Wait()
+ close(c) // HL
+ }()
+ // No select needed here, since errc is buffered.
+ errc <- err // HL
+ }()
+ return c, errc
+}
+
+// MD5All reads all the files in the file tree rooted at root and returns a map
+// from file path to the MD5 sum of the file's contents. If the directory walk
+// fails or any read operation fails, MD5All returns an error. In that case,
+// MD5All does not wait for inflight read operations to complete.
+func MD5All(root string) (map[string][md5.Size]byte, error) {
+ // MD5All closes the done channel when it returns; it may do so before
+ // receiving all the values from c and errc.
+ done := make(chan struct{}) // HLdone
+ defer close(done) // HLdone
+
+ c, errc := sumFiles(done, root) // HLdone
+
+ m := make(map[string][md5.Size]byte)
+ for r := range c { // HLrange
+ if r.err != nil {
+ return nil, r.err
+ }
+ m[r.path] = r.sum
+ }
+ if err := <-errc; err != nil {
+ return nil, err
+ }
+ return m, nil
+}
+
+func main() {
+ // Calculate the MD5 sum of all files under the specified directory,
+ // then print the results sorted by path name.
+ m, err := MD5All(os.Args[1])
+ if err != nil {
+ fmt.Println(err)
+ return
+ }
+ var paths []string
+ for path := range m {
+ paths = append(paths, path)
+ }
+ sort.Strings(paths)
+ for _, path := range paths {
+ fmt.Printf("%x %s\n", m[path], path)
+ }
+}
diff --git a/content/pipelines/serial.go b/content/pipelines/serial.go
new file mode 100644
index 0000000..7a44ca1
--- /dev/null
+++ b/content/pipelines/serial.go
@@ -0,0 +1,53 @@
+package main
+
+import (
+ "crypto/md5"
+ "fmt"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "sort"
+)
+
+// MD5All reads all the files in the file tree rooted at root and returns a map
+// from file path to the MD5 sum of the file's contents. If the directory walk
+// fails or any read operation fails, MD5All returns an error.
+func MD5All(root string) (map[string][md5.Size]byte, error) {
+ m := make(map[string][md5.Size]byte)
+ err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { // HL
+ if err != nil {
+ return err
+ }
+ if info.IsDir() {
+ return nil
+ }
+ data, err := ioutil.ReadFile(path) // HL
+ if err != nil {
+ return err
+ }
+ m[path] = md5.Sum(data) // HL
+ return nil
+ })
+ if err != nil {
+ return nil, err
+ }
+ return m, nil
+}
+
+func main() {
+ // Calculate the MD5 sum of all files under the specified directory,
+ // then print the results sorted by path name.
+ m, err := MD5All(os.Args[1]) // HL
+ if err != nil {
+ fmt.Println(err)
+ return
+ }
+ var paths []string
+ for path := range m {
+ paths = append(paths, path)
+ }
+ sort.Strings(paths) // HL
+ for _, path := range paths {
+ fmt.Printf("%x %s\n", m[path], path)
+ }
+}
diff --git a/content/pipelines/sqbuffer.go b/content/pipelines/sqbuffer.go
new file mode 100644
index 0000000..45e7f6f
--- /dev/null
+++ b/content/pipelines/sqbuffer.go
@@ -0,0 +1,73 @@
+package main
+
+import (
+ "fmt"
+ "sync"
+)
+
+// gen sends the values in nums on the returned channel, then closes it.
+func gen(nums ...int) <-chan int {
+ out := make(chan int, len(nums))
+ for _, n := range nums {
+ out <- n
+ }
+ close(out)
+ return out
+}
+
+// sq receives values from in, squares them, and sends them on the returned
+// channel, until in is closed. Then sq closes the returned channel.
+func sq(in <-chan int) <-chan int {
+ out := make(chan int)
+ go func() {
+ for n := range in {
+ out <- n * n
+ }
+ close(out)
+ }()
+ return out
+}
+
+// merge receives values from each input channel and sends them on the returned
+// channel. merge closes the returned channel after all the input values have
+// been sent.
+func merge(cs ...<-chan int) <-chan int {
+ var wg sync.WaitGroup
+ out := make(chan int, 1) // enough space for the unread inputs
+ // ... the rest is unchanged ...
+
+ // Start an output goroutine for each input channel in cs. output
+ // copies values from c to out until c is closed, then calls wg.Done.
+ output := func(c <-chan int) {
+ for n := range c {
+ out <- n
+ }
+ wg.Done()
+ }
+ wg.Add(len(cs))
+ for _, c := range cs {
+ go output(c)
+ }
+
+ // Start a goroutine to close out once all the output goroutines are
+ // done. This must start after the wg.Add call.
+ go func() {
+ wg.Wait()
+ close(out)
+ }()
+ return out
+}
+
+func main() {
+ in := gen(2, 3)
+
+ // Distribute the sq work across two goroutines that both read from in.
+ c1 := sq(in)
+ c2 := sq(in)
+
+ // Consume the first value from output.
+ out := merge(c1, c2)
+ fmt.Println(<-out) // 4 or 9
+ return
+ // The second value is sent into out's buffer, and all goroutines exit.
+}
diff --git a/content/pipelines/sqdone1.go b/content/pipelines/sqdone1.go
new file mode 100644
index 0000000..5d47514
--- /dev/null
+++ b/content/pipelines/sqdone1.go
@@ -0,0 +1,81 @@
+package main
+
+import (
+ "fmt"
+ "sync"
+)
+
+// gen sends the values in nums on the returned channel, then closes it.
+func gen(nums ...int) <-chan int {
+ out := make(chan int, len(nums))
+ for _, n := range nums {
+ out <- n
+ }
+ close(out)
+ return out
+}
+
+// sq receives values from in, squares them, and sends them on the returned
+// channel, until in is closed. Then sq closes the returned channel.
+func sq(in <-chan int) <-chan int {
+ out := make(chan int)
+ go func() {
+ for n := range in {
+ out <- n * n
+ }
+ close(out)
+ }()
+ return out
+}
+
+// merge receives values from each input channel and sends them on the returned
+// channel. merge closes the returned channel after all the input values have
+// been sent.
+func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
+ var wg sync.WaitGroup
+ out := make(chan int)
+
+ // Start an output goroutine for each input channel in cs. output
+ // copies values from c to out until c is closed or it receives a value
+ // from done, then output calls wg.Done.
+ output := func(c <-chan int) {
+ for n := range c {
+ select {
+ case out <- n:
+ case <-done: // HL
+ }
+ }
+ wg.Done()
+ }
+ // ... the rest is unchanged ...
+
+ wg.Add(len(cs))
+ for _, c := range cs {
+ go output(c)
+ }
+
+ // Start a goroutine to close out once all the output goroutines are
+ // done. This must start after the wg.Add call.
+ go func() {
+ wg.Wait()
+ close(out)
+ }()
+ return out
+}
+
+func main() {
+ in := gen(2, 3)
+
+ // Distribute the sq work across two goroutines that both read from in.
+ c1 := sq(in)
+ c2 := sq(in)
+
+ // Consume the first value from output.
+ done := make(chan struct{}, 2) // HL
+ out := merge(done, c1, c2)
+ fmt.Println(<-out) // 4 or 9
+
+ // Tell the remaining senders we're leaving.
+ done <- struct{}{} // HL
+ done <- struct{}{} // HL
+}
diff --git a/content/pipelines/sqdone2.go b/content/pipelines/sqdone2.go
new file mode 100644
index 0000000..86b2487
--- /dev/null
+++ b/content/pipelines/sqdone2.go
@@ -0,0 +1,78 @@
+package main
+
+import (
+ "fmt"
+ "sync"
+)
+
+// gen sends the values in nums on the returned channel, then closes it.
+func gen(nums ...int) <-chan int {
+ out := make(chan int, len(nums))
+ for _, n := range nums {
+ out <- n
+ }
+ close(out)
+ return out
+}
+
+// sq receives values from in, squares them, and sends them on the returned
+// channel, until in is closed. Then sq closes the returned channel.
+func sq(in <-chan int) <-chan int {
+ out := make(chan int)
+ go func() {
+ for n := range in {
+ out <- n * n
+ }
+ close(out)
+ }()
+ return out
+}
+
+// merge receives values from each input channel and sends them on the returned
+// channel. merge closes the returned channel after all the input values have
+// been sent.
+func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
+ var wg sync.WaitGroup
+ out := make(chan int)
+
+ // Start an output goroutine for each input channel in cs. output
+ // copies values from c to out until c or done is closed, then calls
+ // wg.Done.
+ output := func(c <-chan int) {
+ for n := range c {
+ select {
+ case out <- n:
+ case <-done: // HL
+ }
+ }
+ wg.Done()
+ }
+ wg.Add(len(cs))
+ for _, c := range cs {
+ go output(c)
+ }
+
+ // Start a goroutine to close out once all the output goroutines are
+ // done. This must start after the wg.Add call.
+ go func() {
+ wg.Wait()
+ close(out)
+ }()
+ return out
+}
+
+func main() {
+ in := gen(2, 3)
+
+ // Distribute the sq work across two goroutines that both read from in.
+ c1 := sq(in)
+ c2 := sq(in)
+
+ // Consume the first value from output.
+ done := make(chan struct{}) // HL
+ out := merge(done, c1, c2) // HL
+ fmt.Println(<-out) // 4 or 9
+
+ // Tell the remaining senders we're leaving.
+ close(done) // HL
+}
diff --git a/content/pipelines/sqdone3.go b/content/pipelines/sqdone3.go
new file mode 100644
index 0000000..63a2269
--- /dev/null
+++ b/content/pipelines/sqdone3.go
@@ -0,0 +1,88 @@
+package main
+
+import (
+ "fmt"
+ "sync"
+)
+
+// gen sends the values in nums on the returned channel, then closes it.
+func gen(done <-chan struct{}, nums ...int) <-chan int {
+ out := make(chan int, len(nums))
+ for _, n := range nums {
+ // We ignore done here because these sends cannot block.
+ out <- n
+ }
+ close(out)
+ return out
+}
+
+// sq receives values from in, squares them, and sends them on the returned
+// channel, until in or done is closed. Then sq closes the returned channel.
+func sq(done <-chan struct{}, in <-chan int) <-chan int {
+ out := make(chan int)
+ go func() {
+ defer close(out) // HL
+ for n := range in {
+ select {
+ case out <- n * n:
+ case <-done:
+ return
+ }
+ }
+ }()
+ return out
+}
+
+// merge receives values from each input channel and sends them on the returned
+// channel. merge closes the returned channel after all the input values have
+// been sent or after done is closed.
+func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
+ var wg sync.WaitGroup
+ out := make(chan int)
+
+ // Start an output goroutine for each input channel in cs. output
+ // copies values from c to out until c or done is closed, then calls
+ // wg.Done.
+ output := func(c <-chan int) {
+ defer wg.Done()
+ for n := range c {
+ select {
+ case out <- n:
+ case <-done:
+ return
+ }
+ }
+ }
+ wg.Add(len(cs))
+ for _, c := range cs {
+ go output(c)
+ }
+
+ // Start a goroutine to close out once all the output goroutines are
+ // done. This must start after the wg.Add call.
+ go func() {
+ wg.Wait()
+ close(out)
+ }()
+ return out
+}
+
+func main() {
+ // Set up a done channel that's shared by the whole pipeline,
+ // and close that channel when this pipeline exits, as a signal
+ // for all the goroutines we started to exit.
+ done := make(chan struct{}) // HL
+ defer close(done) // HL
+
+ in := gen(done, 2, 3)
+
+ // Distribute the sq work across two goroutines that both read from in.
+ c1 := sq(done, in)
+ c2 := sq(done, in)
+
+ // Consume the first value from output.
+ out := merge(done, c1, c2)
+ fmt.Println(<-out) // 4 or 9
+
+ // done will be closed by the deferred call. // HL
+}
diff --git a/content/pipelines/sqfan.go b/content/pipelines/sqfan.go
new file mode 100644
index 0000000..b5702cd
--- /dev/null
+++ b/content/pipelines/sqfan.go
@@ -0,0 +1,73 @@
+package main
+
+import (
+ "fmt"
+ "sync"
+)
+
+// gen sends the values in nums on the returned channel, then closes it.
+func gen(nums ...int) <-chan int {
+ out := make(chan int)
+ go func() {
+ for _, n := range nums {
+ out <- n
+ }
+ close(out)
+ }()
+ return out
+}
+
+// sq receives values from in, squares them, and sends them on the returned
+// channel, until in is closed. Then sq closes the returned channel.
+func sq(in <-chan int) <-chan int {
+ out := make(chan int)
+ go func() {
+ for n := range in {
+ out <- n * n
+ }
+ close(out)
+ }()
+ return out
+}
+
+// merge receives values from each input channel and sends them on the returned
+// channel. merge closes the returned channel after all the input values have
+// been sent.
+func merge(cs ...<-chan int) <-chan int {
+ var wg sync.WaitGroup // HL
+ out := make(chan int)
+
+ // Start an output goroutine for each input channel in cs. output
+ // copies values from c to out until c is closed, then calls wg.Done.
+ output := func(c <-chan int) {
+ for n := range c {
+ out <- n
+ }
+ wg.Done() // HL
+ }
+ wg.Add(len(cs)) // HL
+ for _, c := range cs {
+ go output(c)
+ }
+
+ // Start a goroutine to close out once all the output goroutines are
+ // done. This must start after the wg.Add call.
+ go func() {
+ wg.Wait() // HL
+ close(out)
+ }()
+ return out
+}
+
+func main() {
+ in := gen(2, 3)
+
+ // Distribute the sq work across two goroutines that both read from in.
+ c1 := sq(in)
+ c2 := sq(in)
+
+ // Consume the merged output from c1 and c2.
+ for n := range merge(c1, c2) {
+ fmt.Println(n) // 4 then 9, or 9 then 4
+ }
+}
diff --git a/content/pipelines/sqleak.go b/content/pipelines/sqleak.go
new file mode 100644
index 0000000..a0d22ee
--- /dev/null
+++ b/content/pipelines/sqleak.go
@@ -0,0 +1,75 @@
+package main
+
+import (
+ "fmt"
+ "sync"
+)
+
+// gen sends the values in nums on the returned channel, then closes it.
+func gen(nums ...int) <-chan int {
+ out := make(chan int)
+ go func() {
+ for _, n := range nums {
+ out <- n
+ }
+ close(out)
+ }()
+ return out
+}
+
+// sq receives values from in, squares them, and sends them on the returned
+// channel, until in is closed. Then sq closes the returned channel.
+func sq(in <-chan int) <-chan int {
+ out := make(chan int)
+ go func() {
+ for n := range in {
+ out <- n * n
+ }
+ close(out)
+ }()
+ return out
+}
+
+// merge receives values from each input channel and sends them on the returned
+// channel. merge closes the returned channel after all the input values have
+// been sent.
+func merge(cs ...<-chan int) <-chan int {
+ var wg sync.WaitGroup
+ out := make(chan int)
+
+ // Start an output goroutine for each input channel in cs. output
+ // copies values from c to out until c is closed, then calls wg.Done.
+ output := func(c <-chan int) {
+ for n := range c {
+ out <- n
+ }
+ wg.Done()
+ }
+ wg.Add(len(cs))
+ for _, c := range cs {
+ go output(c)
+ }
+
+ // Start a goroutine to close out once all the output goroutines are
+ // done. This must start after the wg.Add call.
+ go func() {
+ wg.Wait()
+ close(out)
+ }()
+ return out
+}
+
+func main() {
+ in := gen(2, 3)
+
+ // Distribute the sq work across two goroutines that both read from in.
+ c1 := sq(in)
+ c2 := sq(in)
+
+ // Consume the first value from output.
+ out := merge(c1, c2)
+ fmt.Println(<-out) // 4 or 9
+ return
+ // Since we didn't receive the second value from out,
+ // one of the output goroutines is hung attempting to send it.
+}
diff --git a/content/pipelines/square.go b/content/pipelines/square.go
new file mode 100644
index 0000000..07c9523
--- /dev/null
+++ b/content/pipelines/square.go
@@ -0,0 +1,38 @@
+package main
+
+import "fmt"
+
+// gen sends the values in nums on the returned channel, then closes it.
+func gen(nums ...int) <-chan int {
+ out := make(chan int)
+ go func() {
+ for _, n := range nums {
+ out <- n
+ }
+ close(out)
+ }()
+ return out
+}
+
+// sq receives values from in, squares them, and sends them on the returned
+// channel, until in is closed. Then sq closes the returned channel.
+func sq(in <-chan int) <-chan int {
+ out := make(chan int)
+ go func() {
+ for n := range in {
+ out <- n * n
+ }
+ close(out)
+ }()
+ return out
+}
+
+func main() {
+ // Set up the pipeline.
+ c := gen(2, 3)
+ out := sq(c)
+
+ // Consume the output.
+ fmt.Println(<-out) // 4
+ fmt.Println(<-out) // 9
+}
diff --git a/content/pipelines/square2.go b/content/pipelines/square2.go
new file mode 100644
index 0000000..4530fea
--- /dev/null
+++ b/content/pipelines/square2.go
@@ -0,0 +1,35 @@
+package main
+
+import "fmt"
+
+// gen sends the values in nums on the returned channel, then closes it.
+func gen(nums ...int) <-chan int {
+ out := make(chan int)
+ go func() {
+ for _, n := range nums {
+ out <- n
+ }
+ close(out)
+ }()
+ return out
+}
+
+// sq receives values from in, squares them, and sends them on the returned
+// channel, until in is closed. Then sq closes the returned channel.
+func sq(in <-chan int) <-chan int {
+ out := make(chan int)
+ go func() {
+ for n := range in {
+ out <- n * n
+ }
+ close(out)
+ }()
+ return out
+}
+
+func main() {
+ // Set up the pipeline and consume the output.
+ for n := range sq(sq(gen(2, 3))) {
+ fmt.Println(n) // 16 then 81
+ }
+}