go-func/stream/collect.go

51 lines
1.2 KiB
Go
Raw Permalink Normal View History

2017-07-14 20:42:20 +08:00
package stream
import "runtime"
// Collector 收集器接口,对于所有的并发流,定义打包行为
type Collector interface {
Supplier() func() interface{}
Accumulator() func(interface{}, interface{}) interface{}
Combiner() func(interface{}, interface{}) interface{}
Finisher() func(interface{}) interface{}
}
// Collect 收集器,对于所有的并发流,打包
func (s *Stream) Collect(collector Collector) interface{} {
identity := collector.Supplier()()
if s.isParallel {
count := runtime.NumCPU()
ch1 := make(chan interface{}, count)
ch2 := make(chan interface{})
for i := 0; i < count; i++ {
go func() {
result := collector.Supplier()()
for true {
if value, ok := <-ch1; !ok {
break
} else {
result = collector.Accumulator()(result, value)
}
}
ch2 <- result
}()
}
go func() {
for _, v := range s.list {
ch1 <- v
}
close(ch1)
}()
for i := 0; i < count; i++ {
tmp := <-ch2
identity = collector.Combiner()(identity, tmp)
}
} else {
for _, v := range s.list {
identity = collector.Accumulator()(identity, v)
}
}
identity = collector.Finisher()(identity)
return identity
}