From feddf287257e0d7d8d755daab3a591185ca5844a Mon Sep 17 00:00:00 2001 From: ZaneYork Date: Fri, 14 Jul 2017 20:42:20 +0800 Subject: [PATCH] v1.0 --- main/main.go | 52 ++++++++++++++++ stream/collect.go | 50 ++++++++++++++++ stream/collectors/ToMap.go | 46 ++++++++++++++ stream/collectors/averagingFloat64.go | 47 +++++++++++++++ stream/collectors/counting.go | 33 ++++++++++ stream/collectors/groupingBy.go | 63 +++++++++++++++++++ stream/collectors/joining.go | 63 +++++++++++++++++++ stream/collectors/partitioningBy.go | 63 +++++++++++++++++++ stream/collectors/summarizingFloat64.go | 80 +++++++++++++++++++++++++ stream/filter.go | 51 ++++++++++++++++ stream/map.go | 39 ++++++++++++ stream/reduce.go | 9 +++ stream/stream.go | 47 +++++++++++++++ 13 files changed, 643 insertions(+) create mode 100644 main/main.go create mode 100644 stream/collect.go create mode 100644 stream/collectors/ToMap.go create mode 100644 stream/collectors/averagingFloat64.go create mode 100644 stream/collectors/counting.go create mode 100644 stream/collectors/groupingBy.go create mode 100644 stream/collectors/joining.go create mode 100644 stream/collectors/partitioningBy.go create mode 100644 stream/collectors/summarizingFloat64.go create mode 100644 stream/filter.go create mode 100644 stream/map.go create mode 100644 stream/reduce.go create mode 100644 stream/stream.go diff --git a/main/main.go b/main/main.go new file mode 100644 index 0000000..b4540c0 --- /dev/null +++ b/main/main.go @@ -0,0 +1,52 @@ +package main + +import ( + . "github.com/go-func/stream" + "github.com/go-func/stream/collectors" + "fmt" +) + +func main() { + list := make([]interface{}, 100) + for i := 0; i < 100; i++ { + list[i] = i * 2 + } + tmp := NewParallelStream(list).Filter(func(item interface{}) bool { + if item.(int) < 100 { + return false + } + return true + }).Map(func(item interface{}) interface{} { + value := item.(int) + value /= 2 + return value + }).Collect(&collectors.GroupingBy{Classifier: func(item interface{}) interface{} { + return item.(int)%2 == 0 + }, DownStream: &collectors.PartitioningBy{Predicate: func(item interface{}) bool { + return item.(int)%3 == 0 + }, DownStream: &collectors.SummarizingFloat64{Mapper: func(item interface{}) float64 { + return float64(item.(int)) + }}}}) + /* + .Collect(&collectors.PartitioningBy{Predicate: func(item interface{}) bool { + return item.(int)%3 == 0 + }}) + .Collect(&collectors.GroupingBy{Classifier: func(item interface{}) interface{} { + return item.(int)%3 == 0 + }}) + .Collect(&collectors.Joining{Prefix: "[", Suffix: "]", Delimiter: ","}) + .Collect(&collectors.Counting{}) + .Collect(&collectors.AveragingFloat64{Mapper: func(item interface{}) float64 { + return float64(item.(int)) + }}) + .Collect(&collectors.ToMap{KeyMapper: func(item interface{}) interface{} { + return item.(int) * 2 + }, ValueMapper: func(item interface{}) interface{} { + return item + }}) + .Collect(&collectors.SummarizingFloat64{Mapper: func(item interface{}) float64 { + return float64(item.(int)) + }}) + */ + fmt.Println(tmp) +} diff --git a/stream/collect.go b/stream/collect.go new file mode 100644 index 0000000..9c934ea --- /dev/null +++ b/stream/collect.go @@ -0,0 +1,50 @@ +package stream + +import "runtime" + +// Collector 收集器接口,对于所有的并发流,定义打包行为 +type Collector interface { + Supplier() func() interface{} + Accumulator() func(interface{}, interface{}) interface{} + Combiner() func(interface{}, interface{}) interface{} + Finisher() func(interface{}) interface{} +} + +// Collect 收集器,对于所有的并发流,打包 +func (s *Stream) Collect(collector Collector) interface{} { + identity := collector.Supplier()() + if s.isParallel { + count := runtime.NumCPU() + ch1 := make(chan interface{}, count) + ch2 := make(chan interface{}) + for i := 0; i < count; i++ { + go func() { + result := collector.Supplier()() + for true { + if value, ok := <-ch1; !ok { + break + } else { + result = collector.Accumulator()(result, value) + } + } + ch2 <- result + }() + } + go func() { + for _, v := range s.list { + ch1 <- v + } + close(ch1) + }() + for i := 0; i < count; i++ { + tmp := <-ch2 + identity = collector.Combiner()(identity, tmp) + } + } else { + for _, v := range s.list { + identity = collector.Accumulator()(identity, v) + } + } + identity = collector.Finisher()(identity) + return identity +} diff --git a/stream/collectors/ToMap.go b/stream/collectors/ToMap.go new file mode 100644 index 0000000..bbf0890 --- /dev/null +++ b/stream/collectors/ToMap.go @@ -0,0 +1,46 @@ +package collectors + +// ToMap 按照指定的规则组装为Map +type ToMap struct { + KeyMapper func(interface{}) interface{} + ValueMapper func(interface{}) interface{} +} + +// Supplier 提供容器 +func (tm *ToMap) Supplier() func() interface{} { + return func() interface{} { + return make(map[interface{}]interface{}) + } +} + +// Accumulator 处理函数 +func (tm *ToMap) Accumulator() func(interface{}, interface{}) interface{} { + return func(identity interface{}, element interface{}) interface{} { + ret := identity.(map[interface{}]interface{}) + k := tm.KeyMapper(element) + v := tm.ValueMapper(element) + ret[k] = v + return ret + } +} + +// Combiner 组装结果 +func (tm *ToMap) Combiner() func(interface{}, interface{}) interface{} { + return func(a interface{}, b interface{}) interface{} { + mapA := a.(map[interface{}]interface{}) + mapB := b.(map[interface{}]interface{}) + for k, v := range mapB { + if _, ok := mapA[k]; !ok { + mapA[k] = v + } + } + return mapA + } +} + +// Finisher 收尾处理 +func (tm *ToMap) Finisher() func(interface{}) interface{} { + return func(identity interface{}) interface{} { + return identity + } +} diff --git a/stream/collectors/averagingFloat64.go b/stream/collectors/averagingFloat64.go new file mode 100644 index 0000000..13e4658 --- /dev/null +++ b/stream/collectors/averagingFloat64.go @@ -0,0 +1,47 @@ +package collectors + +// AveragingFloat64 统计结果集平均值 +type AveragingFloat64 struct { + Mapper func(interface{}) float64 +} + +type aveMidResult struct { + Sum float64 + Count int +} + +// Supplier 提供容器 +func (ave *AveragingFloat64) Supplier() func() interface{} { + return func() interface{} { + return aveMidResult{Sum: 0, Count: 0} + } +} + +// Accumulator 处理函数 +func (ave *AveragingFloat64) Accumulator() func(interface{}, interface{}) interface{} { + return func(identity interface{}, element interface{}) interface{} { + mid := identity.(aveMidResult) + mid.Count++ + mid.Sum += ave.Mapper(element) + return mid + } +} + +// Combiner 组装结果 +func (ave *AveragingFloat64) Combiner() func(interface{}, interface{}) interface{} { + return func(a interface{}, b interface{}) interface{} { + midA := a.(aveMidResult) + midB := b.(aveMidResult) + midA.Sum += midB.Sum + midA.Count += midB.Count + return midA + } +} + +// Finisher 收尾处理 +func (ave *AveragingFloat64) Finisher() func(interface{}) interface{} { + return func(identity interface{}) interface{} { + mid := identity.(aveMidResult) + return mid.Sum / float64(mid.Count) + } +} diff --git a/stream/collectors/counting.go b/stream/collectors/counting.go new file mode 100644 index 0000000..5abdc0b --- /dev/null +++ b/stream/collectors/counting.go @@ -0,0 +1,33 @@ +package collectors + +// Counting 统计结果集数量 +type Counting struct { +} + +// Supplier 提供容器 +func (c *Counting) Supplier() func() interface{} { + return func() interface{} { + return int(0) + } +} + +// Accumulator 处理函数 +func (c *Counting) Accumulator() func(interface{}, interface{}) interface{} { + return func(identity interface{}, element interface{}) interface{} { + return identity.(int) + 1 + } +} + +// Combiner 组装结果 +func (c *Counting) Combiner() func(interface{}, interface{}) interface{} { + return func(a interface{}, b interface{}) interface{} { + return a.(int) + b.(int) + } +} + +// Finisher 收尾处理 +func (c *Counting) Finisher() func(interface{}) interface{} { + return func(identity interface{}) interface{} { + return identity + } +} diff --git a/stream/collectors/groupingBy.go b/stream/collectors/groupingBy.go new file mode 100644 index 0000000..8dc65a2 --- /dev/null +++ b/stream/collectors/groupingBy.go @@ -0,0 +1,63 @@ +package collectors + +import . "github.com/go-func/stream" + +// GroupingBy 按照指定的规则分组 +type GroupingBy struct { + Classifier func(interface{}) interface{} + DownStream Collector +} + +// Supplier 提供容器 +func (g *GroupingBy) Supplier() func() interface{} { + return func() interface{} { + return make(map[interface{}][]interface{}) + } +} + +// Accumulator 处理函数 +func (g *GroupingBy) Accumulator() func(interface{}, interface{}) interface{} { + return func(identity interface{}, element interface{}) interface{} { + ret := identity.(map[interface{}][]interface{}) + k := g.Classifier(element) + if value, ok := ret[k]; ok { + ret[k] = append(value, element) + } else { + list := make([]interface{}, 1) + list[0] = element + ret[k] = list + } + return ret + } +} + +// Combiner 组装结果 +func (g *GroupingBy) Combiner() func(interface{}, interface{}) interface{} { + return func(a interface{}, b interface{}) interface{} { + mapA := a.(map[interface{}][]interface{}) + mapB := b.(map[interface{}][]interface{}) + for k, v := range mapB { + if value, ok := mapA[k]; ok { + mapA[k] = append(value, v...) + } else { + mapA[k] = v + } + } + return mapA + } +} + +// Finisher 收尾处理 +func (g *GroupingBy) Finisher() func(interface{}) interface{} { + return func(identity interface{}) interface{} { + if g.DownStream != nil { + result := identity.(map[interface{}][]interface{}) + downStreamResult := make(map[interface{}]interface{}) + for k, v := range result { + downStreamResult[k] = NewStream(v).Collect(g.DownStream) + } + return downStreamResult + } + return identity + } +} diff --git a/stream/collectors/joining.go b/stream/collectors/joining.go new file mode 100644 index 0000000..82d1476 --- /dev/null +++ b/stream/collectors/joining.go @@ -0,0 +1,63 @@ +package collectors + +import ( + "bytes" + "fmt" + "strconv" +) + +// Joining 按照指定的连接成字符串 +type Joining struct { + Delimiter string + Suffix string + Prefix string +} + +// Supplier 提供容器 +func (j *Joining) Supplier() func() interface{} { + return func() interface{} { + var buffer bytes.Buffer + return buffer + } +} + +// Accumulator 处理函数 +func (j *Joining) Accumulator() func(interface{}, interface{}) interface{} { + return func(identity interface{}, element interface{}) interface{} { + buffer := identity.(bytes.Buffer) + switch element.(type) { + case string: + buffer.WriteString(element.(string)) + case int: + buffer.WriteString(strconv.Itoa(element.(int))) + default: + str2 := fmt.Sprintf("%v", element) + buffer.WriteString(str2) + } + buffer.WriteString(j.Delimiter) + return buffer + } +} + +// Combiner 组装结果 +func (j *Joining) Combiner() func(interface{}, interface{}) interface{} { + return func(a interface{}, b interface{}) interface{} { + bufferA := a.(bytes.Buffer) + bufferB := b.(bytes.Buffer) + bufferA.WriteString(bufferB.String()) + return bufferA + } +} + +// Finisher 收尾处理 +func (j *Joining) Finisher() func(interface{}) interface{} { + return func(identity interface{}) interface{} { + buffer := identity.(bytes.Buffer) + str := buffer.String() + if len(str) > 0 { + str = str[:len(str)-len(j.Delimiter)] + } + ret := j.Prefix + str + j.Suffix + return ret + } +} diff --git a/stream/collectors/partitioningBy.go b/stream/collectors/partitioningBy.go new file mode 100644 index 0000000..4689225 --- /dev/null +++ b/stream/collectors/partitioningBy.go @@ -0,0 +1,63 @@ +package collectors + +import . "github.com/go-func/stream" + +// PartitioningBy 按照指定的分区 +type PartitioningBy struct { + Predicate func(interface{}) bool + DownStream Collector +} + +// Supplier 提供容器 +func (p *PartitioningBy) Supplier() func() interface{} { + return func() interface{} { + return make(map[bool][]interface{}) + } +} + +// Accumulator 处理函数 +func (p *PartitioningBy) Accumulator() func(interface{}, interface{}) interface{} { + return func(identity interface{}, element interface{}) interface{} { + ret := identity.(map[bool][]interface{}) + k := p.Predicate(element) + if value, ok := ret[k]; ok { + ret[k] = append(value, element) + } else { + list := make([]interface{}, 1) + list[0] = element + ret[k] = list + } + return ret + } +} + +// Combiner 组装结果 +func (p *PartitioningBy) Combiner() func(interface{}, interface{}) interface{} { + return func(a interface{}, b interface{}) interface{} { + mapA := a.(map[bool][]interface{}) + mapB := b.(map[bool][]interface{}) + for k, v := range mapB { + if value, ok := mapA[k]; ok { + mapA[k] = append(value, v...) + } else { + mapA[k] = v + } + } + return mapA + } +} + +// Finisher 收尾处理 +func (p *PartitioningBy) Finisher() func(interface{}) interface{} { + return func(identity interface{}) interface{} { + if p.DownStream != nil { + result := identity.(map[bool][]interface{}) + downStreamResult := make(map[bool]interface{}) + for k, v := range result { + downStreamResult[k] = NewStream(v).Collect(p.DownStream) + } + return downStreamResult + } + return identity + } +} diff --git a/stream/collectors/summarizingFloat64.go b/stream/collectors/summarizingFloat64.go new file mode 100644 index 0000000..30d6967 --- /dev/null +++ b/stream/collectors/summarizingFloat64.go @@ -0,0 +1,80 @@ +package collectors + +// SummarizingFloat64 对数据集进行统计 +type SummarizingFloat64 struct { + Mapper func(interface{}) float64 +} + +// Float64SummaryStatistics 数据集统计结果_Float64 +type Float64SummaryStatistics struct { + Min float64 + Max float64 + Average float64 + Sum float64 + Count int +} + +// Combine 拼装两个结果集 +func (summary *Float64SummaryStatistics) Combine(other *Float64SummaryStatistics) { + if summary.Count == 0 { + summary.Max = other.Max + summary.Min = other.Max + } else if other.Count != 0 { + if summary.Max < other.Max { + summary.Max = other.Max + } + if summary.Min > other.Min { + summary.Min = other.Min + } + } + summary.Sum += other.Sum + summary.Count += other.Count +} + +// Supplier 提供容器 +func (summary *SummarizingFloat64) Supplier() func() interface{} { + return func() interface{} { + return Float64SummaryStatistics{Min: 0, Max: 0, Average: 0, Sum: 0, Count: 0} + } +} + +// Accumulator 处理函数 +func (summary *SummarizingFloat64) Accumulator() func(interface{}, interface{}) interface{} { + return func(identity interface{}, element interface{}) interface{} { + mid := identity.(Float64SummaryStatistics) + value := summary.Mapper(element) + if mid.Count == 0 { + mid.Max = value + mid.Min = value + } else { + if mid.Max < value { + mid.Max = value + } + if mid.Min > value { + mid.Min = value + } + } + mid.Sum += value + mid.Count++ + return mid + } +} + +// Combiner 组装结果 +func (summary *SummarizingFloat64) Combiner() func(interface{}, interface{}) interface{} { + return func(a interface{}, b interface{}) interface{} { + midA := a.(Float64SummaryStatistics) + midB := b.(Float64SummaryStatistics) + midA.Combine(&midB) + return midA + } +} + +// Finisher 收尾处理 +func (summary *SummarizingFloat64) Finisher() func(interface{}) interface{} { + return func(identity interface{}) interface{} { + mid := identity.(Float64SummaryStatistics) + mid.Average = mid.Sum / float64(mid.Count) + return mid + } +} diff --git a/stream/filter.go b/stream/filter.go new file mode 100644 index 0000000..ce0ba7b --- /dev/null +++ b/stream/filter.go @@ -0,0 +1,51 @@ +package stream + +import "runtime" + +// Filter 对任意数组操作,将不合格的元素踢出去 +func (s *Stream) Filter(predicate func(interface{}) bool) *Stream { + ret := make([]interface{}, len(s.list)) + size := 0 + if s.isParallel { + count := runtime.NumCPU() + ch1 := make(chan interface{}, count) + ch2 := make(chan interface{}) + for i := 0; i < count; i++ { + go func() { + for true { + if value, ok := <-ch1; !ok { + break + } else { + if predicate(value) { + ch2 <- value + } else { + ch2 <- nil + } + } + } + }() + } + go func() { + for _, v := range s.list { + ch1 <- v + } + close(ch1) + }() + for i := 0; i < len(s.list); i++ { + value := <-ch2 + if value != nil { + ret[size] = value + size++ + } + } + } else { + for _, v := range s.list { + if predicate(v) { + ret[size] = v + size++ + } + } + } + s.list = ret[:size] + return s +} diff --git a/stream/map.go b/stream/map.go new file mode 100644 index 0000000..9897199 --- /dev/null +++ b/stream/map.go @@ -0,0 +1,39 @@ +package stream + +import "runtime" + +//Map 对任意数组操作,将A包装成B +func (s *Stream) Map(mappper func(interface{}) interface{}) *Stream { + ret := make([]interface{}, len(s.list)) + if s.isParallel { + count := runtime.NumCPU() + ch1 := make(chan interface{}, count) + ch2 := make(chan interface{}) + for i := 0; i < count; i++ { + go func() { + for true { + if value, ok := <-ch1; !ok { + break + } else { + ch2 <- mappper(value) + } + } + }() + } + go func() { + for _, v := range s.list { + ch1 <- v + } + close(ch1) + }() + for i := 0; i < len(s.list); i++ { + ret[i] = <-ch2 + } + } else { + for i, v := range s.list { + ret[i] = mappper(v) + } + } + s.list = ret + return s +} diff --git a/stream/reduce.go b/stream/reduce.go new file mode 100644 index 0000000..266deb4 --- /dev/null +++ b/stream/reduce.go @@ -0,0 +1,9 @@ +package stream + +//Reduce 对任意数组操作,对数组进行降维打击 +func (s *Stream) Reduce(identity interface{}, accumulator func(interface{}, interface{}) interface{}) interface{} { + for _, v := range s.list { + identity = accumulator(identity, v) + } + return identity +} diff --git a/stream/stream.go b/stream/stream.go new file mode 100644 index 0000000..50b617f --- /dev/null +++ b/stream/stream.go @@ -0,0 +1,47 @@ +package stream + +// Stream 数据流 +type Stream struct { + isParallel bool + list []interface{} +} + +// Parallel 转为并发数据流 +func (s *Stream) Parallel() *Stream { + s.isParallel = true + return s +} + +// Sequential 转为单线程数据流 +func (s *Stream) Sequential() *Stream { + s.isParallel = false + return s +} + +// NewStream 新建数据流 +func NewStream(list []interface{}) *Stream { + return &Stream{list: list, isParallel: false} +} + +// NewParallelStream 新建并发数据流 +func NewParallelStream(list []interface{}) *Stream { + return &Stream{list: list, isParallel: true} +} + +// Count 统计数量 +func (s *Stream) Count() int { + return len(s.list) +} + +// ToList 转为列表 +func (s *Stream) ToList() []interface{} { + return s.list +} + +// FindFirst 取得第一条 +func (s *Stream) FindFirst() interface{} { + if len(s.list) > 0 { + return s.list[0] + } + return nil +}