This commit is contained in:
ZaneYork 2017-07-14 20:42:20 +08:00 committed by GitHub
parent 37f72029d9
commit feddf28725
13 changed files with 643 additions and 0 deletions

52
main/main.go Normal file
View File

@ -0,0 +1,52 @@
package main
import (
. "github.com/go-func/stream"
"github.com/go-func/stream/collectors"
"fmt"
)
func main() {
list := make([]interface{}, 100)
for i := 0; i < 100; i++ {
list[i] = i * 2
}
tmp := NewParallelStream(list).Filter(func(item interface{}) bool {
if item.(int) < 100 {
return false
}
return true
}).Map(func(item interface{}) interface{} {
value := item.(int)
value /= 2
return value
}).Collect(&collectors.GroupingBy{Classifier: func(item interface{}) interface{} {
return item.(int)%2 == 0
}, DownStream: &collectors.PartitioningBy{Predicate: func(item interface{}) bool {
return item.(int)%3 == 0
}, DownStream: &collectors.SummarizingFloat64{Mapper: func(item interface{}) float64 {
return float64(item.(int))
}}}})
/*
.Collect(&collectors.PartitioningBy{Predicate: func(item interface{}) bool {
return item.(int)%3 == 0
}})
.Collect(&collectors.GroupingBy{Classifier: func(item interface{}) interface{} {
return item.(int)%3 == 0
}})
.Collect(&collectors.Joining{Prefix: "[", Suffix: "]", Delimiter: ","})
.Collect(&collectors.Counting{})
.Collect(&collectors.AveragingFloat64{Mapper: func(item interface{}) float64 {
return float64(item.(int))
}})
.Collect(&collectors.ToMap{KeyMapper: func(item interface{}) interface{} {
return item.(int) * 2
}, ValueMapper: func(item interface{}) interface{} {
return item
}})
.Collect(&collectors.SummarizingFloat64{Mapper: func(item interface{}) float64 {
return float64(item.(int))
}})
*/
fmt.Println(tmp)
}

50
stream/collect.go Normal file
View File

@ -0,0 +1,50 @@
package stream
import "runtime"
// Collector 收集器接口,对于所有的并发流,定义打包行为
type Collector interface {
Supplier() func() interface{}
Accumulator() func(interface{}, interface{}) interface{}
Combiner() func(interface{}, interface{}) interface{}
Finisher() func(interface{}) interface{}
}
// Collect 收集器,对于所有的并发流,打包
func (s *Stream) Collect(collector Collector) interface{} {
identity := collector.Supplier()()
if s.isParallel {
count := runtime.NumCPU()
ch1 := make(chan interface{}, count)
ch2 := make(chan interface{})
for i := 0; i < count; i++ {
go func() {
result := collector.Supplier()()
for true {
if value, ok := <-ch1; !ok {
break
} else {
result = collector.Accumulator()(result, value)
}
}
ch2 <- result
}()
}
go func() {
for _, v := range s.list {
ch1 <- v
}
close(ch1)
}()
for i := 0; i < count; i++ {
tmp := <-ch2
identity = collector.Combiner()(identity, tmp)
}
} else {
for _, v := range s.list {
identity = collector.Accumulator()(identity, v)
}
}
identity = collector.Finisher()(identity)
return identity
}

View File

@ -0,0 +1,46 @@
package collectors
// ToMap 按照指定的规则组装为Map
type ToMap struct {
KeyMapper func(interface{}) interface{}
ValueMapper func(interface{}) interface{}
}
// Supplier 提供容器
func (tm *ToMap) Supplier() func() interface{} {
return func() interface{} {
return make(map[interface{}]interface{})
}
}
// Accumulator 处理函数
func (tm *ToMap) Accumulator() func(interface{}, interface{}) interface{} {
return func(identity interface{}, element interface{}) interface{} {
ret := identity.(map[interface{}]interface{})
k := tm.KeyMapper(element)
v := tm.ValueMapper(element)
ret[k] = v
return ret
}
}
// Combiner 组装结果
func (tm *ToMap) Combiner() func(interface{}, interface{}) interface{} {
return func(a interface{}, b interface{}) interface{} {
mapA := a.(map[interface{}]interface{})
mapB := b.(map[interface{}]interface{})
for k, v := range mapB {
if _, ok := mapA[k]; !ok {
mapA[k] = v
}
}
return mapA
}
}
// Finisher 收尾处理
func (tm *ToMap) Finisher() func(interface{}) interface{} {
return func(identity interface{}) interface{} {
return identity
}
}

View File

@ -0,0 +1,47 @@
package collectors
// AveragingFloat64 统计结果集平均值
type AveragingFloat64 struct {
Mapper func(interface{}) float64
}
type aveMidResult struct {
Sum float64
Count int
}
// Supplier 提供容器
func (ave *AveragingFloat64) Supplier() func() interface{} {
return func() interface{} {
return aveMidResult{Sum: 0, Count: 0}
}
}
// Accumulator 处理函数
func (ave *AveragingFloat64) Accumulator() func(interface{}, interface{}) interface{} {
return func(identity interface{}, element interface{}) interface{} {
mid := identity.(aveMidResult)
mid.Count++
mid.Sum += ave.Mapper(element)
return mid
}
}
// Combiner 组装结果
func (ave *AveragingFloat64) Combiner() func(interface{}, interface{}) interface{} {
return func(a interface{}, b interface{}) interface{} {
midA := a.(aveMidResult)
midB := b.(aveMidResult)
midA.Sum += midB.Sum
midA.Count += midB.Count
return midA
}
}
// Finisher 收尾处理
func (ave *AveragingFloat64) Finisher() func(interface{}) interface{} {
return func(identity interface{}) interface{} {
mid := identity.(aveMidResult)
return mid.Sum / float64(mid.Count)
}
}

View File

@ -0,0 +1,33 @@
package collectors
// Counting 统计结果集数量
type Counting struct {
}
// Supplier 提供容器
func (c *Counting) Supplier() func() interface{} {
return func() interface{} {
return int(0)
}
}
// Accumulator 处理函数
func (c *Counting) Accumulator() func(interface{}, interface{}) interface{} {
return func(identity interface{}, element interface{}) interface{} {
return identity.(int) + 1
}
}
// Combiner 组装结果
func (c *Counting) Combiner() func(interface{}, interface{}) interface{} {
return func(a interface{}, b interface{}) interface{} {
return a.(int) + b.(int)
}
}
// Finisher 收尾处理
func (c *Counting) Finisher() func(interface{}) interface{} {
return func(identity interface{}) interface{} {
return identity
}
}

View File

@ -0,0 +1,63 @@
package collectors
import . "github.com/go-func/stream"
// GroupingBy 按照指定的规则分组
type GroupingBy struct {
Classifier func(interface{}) interface{}
DownStream Collector
}
// Supplier 提供容器
func (g *GroupingBy) Supplier() func() interface{} {
return func() interface{} {
return make(map[interface{}][]interface{})
}
}
// Accumulator 处理函数
func (g *GroupingBy) Accumulator() func(interface{}, interface{}) interface{} {
return func(identity interface{}, element interface{}) interface{} {
ret := identity.(map[interface{}][]interface{})
k := g.Classifier(element)
if value, ok := ret[k]; ok {
ret[k] = append(value, element)
} else {
list := make([]interface{}, 1)
list[0] = element
ret[k] = list
}
return ret
}
}
// Combiner 组装结果
func (g *GroupingBy) Combiner() func(interface{}, interface{}) interface{} {
return func(a interface{}, b interface{}) interface{} {
mapA := a.(map[interface{}][]interface{})
mapB := b.(map[interface{}][]interface{})
for k, v := range mapB {
if value, ok := mapA[k]; ok {
mapA[k] = append(value, v...)
} else {
mapA[k] = v
}
}
return mapA
}
}
// Finisher 收尾处理
func (g *GroupingBy) Finisher() func(interface{}) interface{} {
return func(identity interface{}) interface{} {
if g.DownStream != nil {
result := identity.(map[interface{}][]interface{})
downStreamResult := make(map[interface{}]interface{})
for k, v := range result {
downStreamResult[k] = NewStream(v).Collect(g.DownStream)
}
return downStreamResult
}
return identity
}
}

View File

@ -0,0 +1,63 @@
package collectors
import (
"bytes"
"fmt"
"strconv"
)
// Joining 按照指定的连接成字符串
type Joining struct {
Delimiter string
Suffix string
Prefix string
}
// Supplier 提供容器
func (j *Joining) Supplier() func() interface{} {
return func() interface{} {
var buffer bytes.Buffer
return buffer
}
}
// Accumulator 处理函数
func (j *Joining) Accumulator() func(interface{}, interface{}) interface{} {
return func(identity interface{}, element interface{}) interface{} {
buffer := identity.(bytes.Buffer)
switch element.(type) {
case string:
buffer.WriteString(element.(string))
case int:
buffer.WriteString(strconv.Itoa(element.(int)))
default:
str2 := fmt.Sprintf("%v", element)
buffer.WriteString(str2)
}
buffer.WriteString(j.Delimiter)
return buffer
}
}
// Combiner 组装结果
func (j *Joining) Combiner() func(interface{}, interface{}) interface{} {
return func(a interface{}, b interface{}) interface{} {
bufferA := a.(bytes.Buffer)
bufferB := b.(bytes.Buffer)
bufferA.WriteString(bufferB.String())
return bufferA
}
}
// Finisher 收尾处理
func (j *Joining) Finisher() func(interface{}) interface{} {
return func(identity interface{}) interface{} {
buffer := identity.(bytes.Buffer)
str := buffer.String()
if len(str) > 0 {
str = str[:len(str)-len(j.Delimiter)]
}
ret := j.Prefix + str + j.Suffix
return ret
}
}

View File

@ -0,0 +1,63 @@
package collectors
import . "github.com/go-func/stream"
// PartitioningBy 按照指定的分区
type PartitioningBy struct {
Predicate func(interface{}) bool
DownStream Collector
}
// Supplier 提供容器
func (p *PartitioningBy) Supplier() func() interface{} {
return func() interface{} {
return make(map[bool][]interface{})
}
}
// Accumulator 处理函数
func (p *PartitioningBy) Accumulator() func(interface{}, interface{}) interface{} {
return func(identity interface{}, element interface{}) interface{} {
ret := identity.(map[bool][]interface{})
k := p.Predicate(element)
if value, ok := ret[k]; ok {
ret[k] = append(value, element)
} else {
list := make([]interface{}, 1)
list[0] = element
ret[k] = list
}
return ret
}
}
// Combiner 组装结果
func (p *PartitioningBy) Combiner() func(interface{}, interface{}) interface{} {
return func(a interface{}, b interface{}) interface{} {
mapA := a.(map[bool][]interface{})
mapB := b.(map[bool][]interface{})
for k, v := range mapB {
if value, ok := mapA[k]; ok {
mapA[k] = append(value, v...)
} else {
mapA[k] = v
}
}
return mapA
}
}
// Finisher 收尾处理
func (p *PartitioningBy) Finisher() func(interface{}) interface{} {
return func(identity interface{}) interface{} {
if p.DownStream != nil {
result := identity.(map[bool][]interface{})
downStreamResult := make(map[bool]interface{})
for k, v := range result {
downStreamResult[k] = NewStream(v).Collect(p.DownStream)
}
return downStreamResult
}
return identity
}
}

View File

@ -0,0 +1,80 @@
package collectors
// SummarizingFloat64 对数据集进行统计
type SummarizingFloat64 struct {
Mapper func(interface{}) float64
}
// Float64SummaryStatistics 数据集统计结果_Float64
type Float64SummaryStatistics struct {
Min float64
Max float64
Average float64
Sum float64
Count int
}
// Combine 拼装两个结果集
func (summary *Float64SummaryStatistics) Combine(other *Float64SummaryStatistics) {
if summary.Count == 0 {
summary.Max = other.Max
summary.Min = other.Max
} else if other.Count != 0 {
if summary.Max < other.Max {
summary.Max = other.Max
}
if summary.Min > other.Min {
summary.Min = other.Min
}
}
summary.Sum += other.Sum
summary.Count += other.Count
}
// Supplier 提供容器
func (summary *SummarizingFloat64) Supplier() func() interface{} {
return func() interface{} {
return Float64SummaryStatistics{Min: 0, Max: 0, Average: 0, Sum: 0, Count: 0}
}
}
// Accumulator 处理函数
func (summary *SummarizingFloat64) Accumulator() func(interface{}, interface{}) interface{} {
return func(identity interface{}, element interface{}) interface{} {
mid := identity.(Float64SummaryStatistics)
value := summary.Mapper(element)
if mid.Count == 0 {
mid.Max = value
mid.Min = value
} else {
if mid.Max < value {
mid.Max = value
}
if mid.Min > value {
mid.Min = value
}
}
mid.Sum += value
mid.Count++
return mid
}
}
// Combiner 组装结果
func (summary *SummarizingFloat64) Combiner() func(interface{}, interface{}) interface{} {
return func(a interface{}, b interface{}) interface{} {
midA := a.(Float64SummaryStatistics)
midB := b.(Float64SummaryStatistics)
midA.Combine(&midB)
return midA
}
}
// Finisher 收尾处理
func (summary *SummarizingFloat64) Finisher() func(interface{}) interface{} {
return func(identity interface{}) interface{} {
mid := identity.(Float64SummaryStatistics)
mid.Average = mid.Sum / float64(mid.Count)
return mid
}
}

51
stream/filter.go Normal file
View File

@ -0,0 +1,51 @@
package stream
import "runtime"
// Filter 对任意数组操作,将不合格的元素踢出去
func (s *Stream) Filter(predicate func(interface{}) bool) *Stream {
ret := make([]interface{}, len(s.list))
size := 0
if s.isParallel {
count := runtime.NumCPU()
ch1 := make(chan interface{}, count)
ch2 := make(chan interface{})
for i := 0; i < count; i++ {
go func() {
for true {
if value, ok := <-ch1; !ok {
break
} else {
if predicate(value) {
ch2 <- value
} else {
ch2 <- nil
}
}
}
}()
}
go func() {
for _, v := range s.list {
ch1 <- v
}
close(ch1)
}()
for i := 0; i < len(s.list); i++ {
value := <-ch2
if value != nil {
ret[size] = value
size++
}
}
} else {
for _, v := range s.list {
if predicate(v) {
ret[size] = v
size++
}
}
}
s.list = ret[:size]
return s
}

39
stream/map.go Normal file
View File

@ -0,0 +1,39 @@
package stream
import "runtime"
//Map 对任意数组操作将A包装成B
func (s *Stream) Map(mappper func(interface{}) interface{}) *Stream {
ret := make([]interface{}, len(s.list))
if s.isParallel {
count := runtime.NumCPU()
ch1 := make(chan interface{}, count)
ch2 := make(chan interface{})
for i := 0; i < count; i++ {
go func() {
for true {
if value, ok := <-ch1; !ok {
break
} else {
ch2 <- mappper(value)
}
}
}()
}
go func() {
for _, v := range s.list {
ch1 <- v
}
close(ch1)
}()
for i := 0; i < len(s.list); i++ {
ret[i] = <-ch2
}
} else {
for i, v := range s.list {
ret[i] = mappper(v)
}
}
s.list = ret
return s
}

9
stream/reduce.go Normal file
View File

@ -0,0 +1,9 @@
package stream
//Reduce 对任意数组操作,对数组进行降维打击
func (s *Stream) Reduce(identity interface{}, accumulator func(interface{}, interface{}) interface{}) interface{} {
for _, v := range s.list {
identity = accumulator(identity, v)
}
return identity
}

47
stream/stream.go Normal file
View File

@ -0,0 +1,47 @@
package stream
// Stream 数据流
type Stream struct {
isParallel bool
list []interface{}
}
// Parallel 转为并发数据流
func (s *Stream) Parallel() *Stream {
s.isParallel = true
return s
}
// Sequential 转为单线程数据流
func (s *Stream) Sequential() *Stream {
s.isParallel = false
return s
}
// NewStream 新建数据流
func NewStream(list []interface{}) *Stream {
return &Stream{list: list, isParallel: false}
}
// NewParallelStream 新建并发数据流
func NewParallelStream(list []interface{}) *Stream {
return &Stream{list: list, isParallel: true}
}
// Count 统计数量
func (s *Stream) Count() int {
return len(s.list)
}
// ToList 转为列表
func (s *Stream) ToList() []interface{} {
return s.list
}
// FindFirst 取得第一条
func (s *Stream) FindFirst() interface{} {
if len(s.list) > 0 {
return s.list[0]
}
return nil
}