diff options
author | Sameer Ajmani <sameer@golang.org> | 2014-03-12 23:43:55 -0400 |
---|---|---|
committer | Sameer Ajmani <sameer@golang.org> | 2014-03-12 23:43:55 -0400 |
commit | 9aa2e1143c55cfa6a0b8651067b34c61da8d5646 (patch) | |
tree | f0eb469d9b435c085c77a943225c9eef72d84eb3 /content/pipelines | |
parent | 401e4f4a7cd251187c7e3b18b1dc2c583e6d2e45 (diff) |
go.blog: pipelines and cancellation.
R=adg, r, rsc, ken, bcmills
CC=adonovan, campoy, david.crawshaw, gmlewis, golang-codereviews
https://golang.org/cl/71070047
Diffstat (limited to 'content/pipelines')
-rw-r--r-- | content/pipelines/bounded.go | 121 | ||||
-rw-r--r-- | content/pipelines/parallel.go | 109 | ||||
-rw-r--r-- | content/pipelines/serial.go | 53 | ||||
-rw-r--r-- | content/pipelines/sqbuffer.go | 73 | ||||
-rw-r--r-- | content/pipelines/sqdone1.go | 81 | ||||
-rw-r--r-- | content/pipelines/sqdone2.go | 78 | ||||
-rw-r--r-- | content/pipelines/sqdone3.go | 88 | ||||
-rw-r--r-- | content/pipelines/sqfan.go | 73 | ||||
-rw-r--r-- | content/pipelines/sqleak.go | 75 | ||||
-rw-r--r-- | content/pipelines/square.go | 38 | ||||
-rw-r--r-- | content/pipelines/square2.go | 35 |
11 files changed, 824 insertions, 0 deletions
diff --git a/content/pipelines/bounded.go b/content/pipelines/bounded.go new file mode 100644 index 0000000..90eed08 --- /dev/null +++ b/content/pipelines/bounded.go @@ -0,0 +1,121 @@ +package main + +import ( + "crypto/md5" + "errors" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "sort" + "sync" +) + +// walkFiles starts a goroutine to walk the directory tree at root and send the +// path of each regular file on the string channel. It sends the result of the +// walk on the error channel. If done is closed, walkFiles abandons its work. +func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) { + paths := make(chan string) + errc := make(chan error, 1) + go func() { // HL + // Close the paths channel after Walk returns. + defer close(paths) // HL + // No select needed for this send, since errc is buffered. + errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error { // HL + if err != nil { + return err + } + if info.IsDir() { + return nil + } + select { + case paths <- path: // HL + case <-done: // HL + return errors.New("walk canceled") + } + return nil + }) + }() + return paths, errc +} + +// A result is the product of reading and summing a file using MD5. +type result struct { + path string + sum [md5.Size]byte + err error +} + +// digester reads path names from paths and sends digests of the corresponding +// files on c until either paths or done is closed. +func digester(done <-chan struct{}, paths <-chan string, c chan<- result) { + for path := range paths { // HLpaths + data, err := ioutil.ReadFile(path) + select { + case c <- result{path, md5.Sum(data), err}: + case <-done: + return + } + } +} + +// MD5All reads all the files in the file tree rooted at root and returns a map +// from file path to the MD5 sum of the file's contents. If the directory walk +// fails or any read operation fails, MD5All returns an error. In that case, +// MD5All does not wait for inflight read operations to complete. +func MD5All(root string) (map[string][md5.Size]byte, error) { + // MD5All closes the done channel when it returns; it may do so before + // receiving all the values from c and errc. + done := make(chan struct{}) + defer close(done) + + paths, errc := walkFiles(done, root) + + // Start a fixed number of goroutines to read and digest files. + c := make(chan result) // HLc + var wg sync.WaitGroup + const numDigesters = 20 + wg.Add(numDigesters) + for i := 0; i < numDigesters; i++ { + go func() { + digester(done, paths, c) // HLc + wg.Done() + }() + } + go func() { + wg.Wait() + close(c) // HLc + }() + // End of pipeline. OMIT + + m := make(map[string][md5.Size]byte) + for r := range c { + if r.err != nil { + return nil, r.err + } + m[r.path] = r.sum + } + // Check whether the Walk failed. + if err := <-errc; err != nil { // HLerrc + return nil, err + } + return m, nil +} + +func main() { + // Calculate the MD5 sum of all files under the specified directory, + // then print the results sorted by path name. + m, err := MD5All(os.Args[1]) + if err != nil { + fmt.Println(err) + return + } + var paths []string + for path := range m { + paths = append(paths, path) + } + sort.Strings(paths) + for _, path := range paths { + fmt.Printf("%x %s\n", m[path], path) + } +} diff --git a/content/pipelines/parallel.go b/content/pipelines/parallel.go new file mode 100644 index 0000000..7edee1d --- /dev/null +++ b/content/pipelines/parallel.go @@ -0,0 +1,109 @@ +package main + +import ( + "crypto/md5" + "errors" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "sort" + "sync" +) + +// A result is the product of reading and summing a file using MD5. +type result struct { + path string + sum [md5.Size]byte + err error +} + +// sumFiles starts goroutines to walk the directory tree at root and digest each +// regular file. These goroutines send the results of the digests on the result +// channel and send the result of the walk on the error channel. If done is +// closed, sumFiles abandons its work. +func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) { + // For each regular file, start a goroutine that sums the file and sends + // the result on c. Send the result of the walk on errc. + c := make(chan result) + errc := make(chan error, 1) + go func() { // HL + var wg sync.WaitGroup + err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if info.IsDir() { + return nil + } + wg.Add(1) + go func() { // HL + data, err := ioutil.ReadFile(path) + select { + case c <- result{path, md5.Sum(data), err}: // HL + case <-done: // HL + } + wg.Done() + }() + // Abort the walk if done is closed. + select { + case <-done: // HL + return errors.New("walk canceled") + default: + return nil + } + }) + // Walk has returned, so all calls to wg.Add are done. Start a + // goroutine to close c once all the sends are done. + go func() { // HL + wg.Wait() + close(c) // HL + }() + // No select needed here, since errc is buffered. + errc <- err // HL + }() + return c, errc +} + +// MD5All reads all the files in the file tree rooted at root and returns a map +// from file path to the MD5 sum of the file's contents. If the directory walk +// fails or any read operation fails, MD5All returns an error. In that case, +// MD5All does not wait for inflight read operations to complete. +func MD5All(root string) (map[string][md5.Size]byte, error) { + // MD5All closes the done channel when it returns; it may do so before + // receiving all the values from c and errc. + done := make(chan struct{}) // HLdone + defer close(done) // HLdone + + c, errc := sumFiles(done, root) // HLdone + + m := make(map[string][md5.Size]byte) + for r := range c { // HLrange + if r.err != nil { + return nil, r.err + } + m[r.path] = r.sum + } + if err := <-errc; err != nil { + return nil, err + } + return m, nil +} + +func main() { + // Calculate the MD5 sum of all files under the specified directory, + // then print the results sorted by path name. + m, err := MD5All(os.Args[1]) + if err != nil { + fmt.Println(err) + return + } + var paths []string + for path := range m { + paths = append(paths, path) + } + sort.Strings(paths) + for _, path := range paths { + fmt.Printf("%x %s\n", m[path], path) + } +} diff --git a/content/pipelines/serial.go b/content/pipelines/serial.go new file mode 100644 index 0000000..7a44ca1 --- /dev/null +++ b/content/pipelines/serial.go @@ -0,0 +1,53 @@ +package main + +import ( + "crypto/md5" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "sort" +) + +// MD5All reads all the files in the file tree rooted at root and returns a map +// from file path to the MD5 sum of the file's contents. If the directory walk +// fails or any read operation fails, MD5All returns an error. +func MD5All(root string) (map[string][md5.Size]byte, error) { + m := make(map[string][md5.Size]byte) + err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { // HL + if err != nil { + return err + } + if info.IsDir() { + return nil + } + data, err := ioutil.ReadFile(path) // HL + if err != nil { + return err + } + m[path] = md5.Sum(data) // HL + return nil + }) + if err != nil { + return nil, err + } + return m, nil +} + +func main() { + // Calculate the MD5 sum of all files under the specified directory, + // then print the results sorted by path name. + m, err := MD5All(os.Args[1]) // HL + if err != nil { + fmt.Println(err) + return + } + var paths []string + for path := range m { + paths = append(paths, path) + } + sort.Strings(paths) // HL + for _, path := range paths { + fmt.Printf("%x %s\n", m[path], path) + } +} diff --git a/content/pipelines/sqbuffer.go b/content/pipelines/sqbuffer.go new file mode 100644 index 0000000..45e7f6f --- /dev/null +++ b/content/pipelines/sqbuffer.go @@ -0,0 +1,73 @@ +package main + +import ( + "fmt" + "sync" +) + +// gen sends the values in nums on the returned channel, then closes it. +func gen(nums ...int) <-chan int { + out := make(chan int, len(nums)) + for _, n := range nums { + out <- n + } + close(out) + return out +} + +// sq receives values from in, squares them, and sends them on the returned +// channel, until in is closed. Then sq closes the returned channel. +func sq(in <-chan int) <-chan int { + out := make(chan int) + go func() { + for n := range in { + out <- n * n + } + close(out) + }() + return out +} + +// merge receives values from each input channel and sends them on the returned +// channel. merge closes the returned channel after all the input values have +// been sent. +func merge(cs ...<-chan int) <-chan int { + var wg sync.WaitGroup + out := make(chan int, 1) // enough space for the unread inputs + // ... the rest is unchanged ... + + // Start an output goroutine for each input channel in cs. output + // copies values from c to out until c is closed, then calls wg.Done. + output := func(c <-chan int) { + for n := range c { + out <- n + } + wg.Done() + } + wg.Add(len(cs)) + for _, c := range cs { + go output(c) + } + + // Start a goroutine to close out once all the output goroutines are + // done. This must start after the wg.Add call. + go func() { + wg.Wait() + close(out) + }() + return out +} + +func main() { + in := gen(2, 3) + + // Distribute the sq work across two goroutines that both read from in. + c1 := sq(in) + c2 := sq(in) + + // Consume the first value from output. + out := merge(c1, c2) + fmt.Println(<-out) // 4 or 9 + return + // The second value is sent into out's buffer, and all goroutines exit. +} diff --git a/content/pipelines/sqdone1.go b/content/pipelines/sqdone1.go new file mode 100644 index 0000000..5d47514 --- /dev/null +++ b/content/pipelines/sqdone1.go @@ -0,0 +1,81 @@ +package main + +import ( + "fmt" + "sync" +) + +// gen sends the values in nums on the returned channel, then closes it. +func gen(nums ...int) <-chan int { + out := make(chan int, len(nums)) + for _, n := range nums { + out <- n + } + close(out) + return out +} + +// sq receives values from in, squares them, and sends them on the returned +// channel, until in is closed. Then sq closes the returned channel. +func sq(in <-chan int) <-chan int { + out := make(chan int) + go func() { + for n := range in { + out <- n * n + } + close(out) + }() + return out +} + +// merge receives values from each input channel and sends them on the returned +// channel. merge closes the returned channel after all the input values have +// been sent. +func merge(done <-chan struct{}, cs ...<-chan int) <-chan int { + var wg sync.WaitGroup + out := make(chan int) + + // Start an output goroutine for each input channel in cs. output + // copies values from c to out until c is closed or it receives a value + // from done, then output calls wg.Done. + output := func(c <-chan int) { + for n := range c { + select { + case out <- n: + case <-done: // HL + } + } + wg.Done() + } + // ... the rest is unchanged ... + + wg.Add(len(cs)) + for _, c := range cs { + go output(c) + } + + // Start a goroutine to close out once all the output goroutines are + // done. This must start after the wg.Add call. + go func() { + wg.Wait() + close(out) + }() + return out +} + +func main() { + in := gen(2, 3) + + // Distribute the sq work across two goroutines that both read from in. + c1 := sq(in) + c2 := sq(in) + + // Consume the first value from output. + done := make(chan struct{}, 2) // HL + out := merge(done, c1, c2) + fmt.Println(<-out) // 4 or 9 + + // Tell the remaining senders we're leaving. + done <- struct{}{} // HL + done <- struct{}{} // HL +} diff --git a/content/pipelines/sqdone2.go b/content/pipelines/sqdone2.go new file mode 100644 index 0000000..86b2487 --- /dev/null +++ b/content/pipelines/sqdone2.go @@ -0,0 +1,78 @@ +package main + +import ( + "fmt" + "sync" +) + +// gen sends the values in nums on the returned channel, then closes it. +func gen(nums ...int) <-chan int { + out := make(chan int, len(nums)) + for _, n := range nums { + out <- n + } + close(out) + return out +} + +// sq receives values from in, squares them, and sends them on the returned +// channel, until in is closed. Then sq closes the returned channel. +func sq(in <-chan int) <-chan int { + out := make(chan int) + go func() { + for n := range in { + out <- n * n + } + close(out) + }() + return out +} + +// merge receives values from each input channel and sends them on the returned +// channel. merge closes the returned channel after all the input values have +// been sent. +func merge(done <-chan struct{}, cs ...<-chan int) <-chan int { + var wg sync.WaitGroup + out := make(chan int) + + // Start an output goroutine for each input channel in cs. output + // copies values from c to out until c or done is closed, then calls + // wg.Done. + output := func(c <-chan int) { + for n := range c { + select { + case out <- n: + case <-done: // HL + } + } + wg.Done() + } + wg.Add(len(cs)) + for _, c := range cs { + go output(c) + } + + // Start a goroutine to close out once all the output goroutines are + // done. This must start after the wg.Add call. + go func() { + wg.Wait() + close(out) + }() + return out +} + +func main() { + in := gen(2, 3) + + // Distribute the sq work across two goroutines that both read from in. + c1 := sq(in) + c2 := sq(in) + + // Consume the first value from output. + done := make(chan struct{}) // HL + out := merge(done, c1, c2) // HL + fmt.Println(<-out) // 4 or 9 + + // Tell the remaining senders we're leaving. + close(done) // HL +} diff --git a/content/pipelines/sqdone3.go b/content/pipelines/sqdone3.go new file mode 100644 index 0000000..63a2269 --- /dev/null +++ b/content/pipelines/sqdone3.go @@ -0,0 +1,88 @@ +package main + +import ( + "fmt" + "sync" +) + +// gen sends the values in nums on the returned channel, then closes it. +func gen(done <-chan struct{}, nums ...int) <-chan int { + out := make(chan int, len(nums)) + for _, n := range nums { + // We ignore done here because these sends cannot block. + out <- n + } + close(out) + return out +} + +// sq receives values from in, squares them, and sends them on the returned +// channel, until in or done is closed. Then sq closes the returned channel. +func sq(done <-chan struct{}, in <-chan int) <-chan int { + out := make(chan int) + go func() { + defer close(out) // HL + for n := range in { + select { + case out <- n * n: + case <-done: + return + } + } + }() + return out +} + +// merge receives values from each input channel and sends them on the returned +// channel. merge closes the returned channel after all the input values have +// been sent or after done is closed. +func merge(done <-chan struct{}, cs ...<-chan int) <-chan int { + var wg sync.WaitGroup + out := make(chan int) + + // Start an output goroutine for each input channel in cs. output + // copies values from c to out until c or done is closed, then calls + // wg.Done. + output := func(c <-chan int) { + defer wg.Done() + for n := range c { + select { + case out <- n: + case <-done: + return + } + } + } + wg.Add(len(cs)) + for _, c := range cs { + go output(c) + } + + // Start a goroutine to close out once all the output goroutines are + // done. This must start after the wg.Add call. + go func() { + wg.Wait() + close(out) + }() + return out +} + +func main() { + // Set up a done channel that's shared by the whole pipeline, + // and close that channel when this pipeline exits, as a signal + // for all the goroutines we started to exit. + done := make(chan struct{}) // HL + defer close(done) // HL + + in := gen(done, 2, 3) + + // Distribute the sq work across two goroutines that both read from in. + c1 := sq(done, in) + c2 := sq(done, in) + + // Consume the first value from output. + out := merge(done, c1, c2) + fmt.Println(<-out) // 4 or 9 + + // done will be closed by the deferred call. // HL +} diff --git a/content/pipelines/sqfan.go b/content/pipelines/sqfan.go new file mode 100644 index 0000000..b5702cd --- /dev/null +++ b/content/pipelines/sqfan.go @@ -0,0 +1,73 @@ +package main + +import ( + "fmt" + "sync" +) + +// gen sends the values in nums on the returned channel, then closes it. +func gen(nums ...int) <-chan int { + out := make(chan int) + go func() { + for _, n := range nums { + out <- n + } + close(out) + }() + return out +} + +// sq receives values from in, squares them, and sends them on the returned +// channel, until in is closed. Then sq closes the returned channel. +func sq(in <-chan int) <-chan int { + out := make(chan int) + go func() { + for n := range in { + out <- n * n + } + close(out) + }() + return out +} + +// merge receives values from each input channel and sends them on the returned +// channel. merge closes the returned channel after all the input values have +// been sent. +func merge(cs ...<-chan int) <-chan int { + var wg sync.WaitGroup // HL + out := make(chan int) + + // Start an output goroutine for each input channel in cs. output + // copies values from c to out until c is closed, then calls wg.Done. + output := func(c <-chan int) { + for n := range c { + out <- n + } + wg.Done() // HL + } + wg.Add(len(cs)) // HL + for _, c := range cs { + go output(c) + } + + // Start a goroutine to close out once all the output goroutines are + // done. This must start after the wg.Add call. + go func() { + wg.Wait() // HL + close(out) + }() + return out +} + +func main() { + in := gen(2, 3) + + // Distribute the sq work across two goroutines that both read from in. + c1 := sq(in) + c2 := sq(in) + + // Consume the merged output from c1 and c2. + for n := range merge(c1, c2) { + fmt.Println(n) // 4 then 9, or 9 then 4 + } +} diff --git a/content/pipelines/sqleak.go b/content/pipelines/sqleak.go new file mode 100644 index 0000000..a0d22ee --- /dev/null +++ b/content/pipelines/sqleak.go @@ -0,0 +1,75 @@ +package main + +import ( + "fmt" + "sync" +) + +// gen sends the values in nums on the returned channel, then closes it. +func gen(nums ...int) <-chan int { + out := make(chan int) + go func() { + for _, n := range nums { + out <- n + } + close(out) + }() + return out +} + +// sq receives values from in, squares them, and sends them on the returned +// channel, until in is closed. Then sq closes the returned channel. +func sq(in <-chan int) <-chan int { + out := make(chan int) + go func() { + for n := range in { + out <- n * n + } + close(out) + }() + return out +} + +// merge receives values from each input channel and sends them on the returned +// channel. merge closes the returned channel after all the input values have +// been sent. +func merge(cs ...<-chan int) <-chan int { + var wg sync.WaitGroup + out := make(chan int) + + // Start an output goroutine for each input channel in cs. output + // copies values from c to out until c is closed, then calls wg.Done. + output := func(c <-chan int) { + for n := range c { + out <- n + } + wg.Done() + } + wg.Add(len(cs)) + for _, c := range cs { + go output(c) + } + + // Start a goroutine to close out once all the output goroutines are + // done. This must start after the wg.Add call. + go func() { + wg.Wait() + close(out) + }() + return out +} + +func main() { + in := gen(2, 3) + + // Distribute the sq work across two goroutines that both read from in. + c1 := sq(in) + c2 := sq(in) + + // Consume the first value from output. + out := merge(c1, c2) + fmt.Println(<-out) // 4 or 9 + return + // Since we didn't receive the second value from out, + // one of the output goroutines is hung attempting to send it. +} diff --git a/content/pipelines/square.go b/content/pipelines/square.go new file mode 100644 index 0000000..07c9523 --- /dev/null +++ b/content/pipelines/square.go @@ -0,0 +1,38 @@ +package main + +import "fmt" + +// gen sends the values in nums on the returned channel, then closes it. +func gen(nums ...int) <-chan int { + out := make(chan int) + go func() { + for _, n := range nums { + out <- n + } + close(out) + }() + return out +} + +// sq receives values from in, squares them, and sends them on the returned +// channel, until in is closed. Then sq closes the returned channel. +func sq(in <-chan int) <-chan int { + out := make(chan int) + go func() { + for n := range in { + out <- n * n + } + close(out) + }() + return out +} + +func main() { + // Set up the pipeline. + c := gen(2, 3) + out := sq(c) + + // Consume the output. + fmt.Println(<-out) // 4 + fmt.Println(<-out) // 9 +} diff --git a/content/pipelines/square2.go b/content/pipelines/square2.go new file mode 100644 index 0000000..4530fea --- /dev/null +++ b/content/pipelines/square2.go @@ -0,0 +1,35 @@ +package main + +import "fmt" + +// gen sends the values in nums on the returned channel, then closes it. +func gen(nums ...int) <-chan int { + out := make(chan int) + go func() { + for _, n := range nums { + out <- n + } + close(out) + }() + return out +} + +// sq receives values from in, squares them, and sends them on the returned +// channel, until in is closed. Then sq closes the returned channel. +func sq(in <-chan int) <-chan int { + out := make(chan int) + go func() { + for n := range in { + out <- n * n + } + close(out) + }() + return out +} + +func main() { + // Set up the pipeline and consume the output. + for n := range sq(sq(gen(2, 3))) { + fmt.Println(n) // 16 then 81 + } +} |