From 00fee0cdddd58c6b261c9de49fe9986ae909bf94 Mon Sep 17 00:00:00 2001 From: "Zane.Y" Date: Sat, 15 Jul 2017 11:53:28 +0800 Subject: [PATCH] ForEach function --- main/main.go | 6 +++++- stream/forEach.go | 37 +++++++++++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 1 deletion(-) create mode 100644 stream/forEach.go diff --git a/main/main.go b/main/main.go index 16211d4..84a24f9 100644 --- a/main/main.go +++ b/main/main.go @@ -1,9 +1,10 @@ package main import ( + "fmt" + . "github.com/go-func/stream" "github.com/go-func/stream/collectors" - "fmt" ) func main() { @@ -47,6 +48,9 @@ func main() { .Collect(&collectors.SummarizingFloat64{Mapper: func(item interface{}) float64 { return float64(item.(int)) }}) + .ForEach(func(item interface{}) { + fmt.Println(item) + }) */ fmt.Println(tmp) } diff --git a/stream/forEach.go b/stream/forEach.go new file mode 100644 index 0000000..b22e553 --- /dev/null +++ b/stream/forEach.go @@ -0,0 +1,37 @@ +package stream + +import "runtime" + +//ForEach 对任意数组操作,针对每个元素执行一遍指定的行为 +func (s *Stream) ForEach(action func(interface{})) { + if s.isParallel { + count := runtime.NumCPU() + ch1 := make(chan interface{}, count) + ch2 := make(chan interface{}) + for i := 0; i < count; i++ { + go func() { + for true { + if value, ok := <-ch1; !ok { + break + } else { + action(value) + } + } + ch2 <- 0 + }() + } + go func() { + for _, v := range s.list { + ch1 <- v + } + close(ch1) + }() + for i := 0; i < count; i++ { + <-ch2 + } + } else { + for _, v := range s.list { + action(v) + } + } +}