gleam流式计算(分布式/单机)
创始人
2024-06-02 22:11:00
0

介绍

Gleam 是一个高效、可伸缩的分布式map/reduce系统,DAG(有向无环图)执行模式,数据可以纯内存或磁盘的方式进行流转。 可单点,也可以分布式部署 https://github.com/chrislusf/gleam

使用Gleam之前,我们需要先了解下map/reduce的概念(看源码之前先找一些相关资料了解其工作原理)

MapReduce讲的就是分而治之的程序处理理念,把一个复杂的任务划分为若干个简单的任务分别来做。map和reduce函数是要执行的任务,由master分配任务给worker执行。map函数读取被分配的输入数据片段,输出中间key/value pair值的集合,reduce函数收集具有相同中间key值的value值,合并这些value值,形成一个较小的value值的集合。

比如要做一个统计词频的工作流:

  1. 输入(input)为文件内容

Hello PHP

Hello Java

Hello C
Hello Java
Hello C++

  1. 拆分(split)把文件进行分片,交给worker,进行map处理,将上述文档中每一行的内容转换为key-value对,

(PHP ,1)

(Java, 1)

(C, 1)

(Java, 1)

(C++, 1)

3.分发(shuffle)将key相同的扔到一起去,即:

(PHP ,1)

(Java, 1, 1)

(C, 1)

(C++, 1)

  1. 归并(reduce),上一步的结果集分发给对应的reduce函数,对相同key的value进行遍历

(PHP ,1)

(Java, 2)

(C, 1)

(C++, 1)

\5. 之后就可以做一些排序、topN等等操作。

单机模式

数据量不是很大(数GB)的情况下,单机模式就可以满足需求,以下是单机模式的流程图

1

在gleam中,从数据源 …-> map …-> reduce …-> 输出,整个链中数据均通过管道io.Piper进行流转

  1. 数据源
    默认支持txt、csv、Tsv、Parquet、Orc、Sokect监听、Channel、Bytes、Strings、Ints、Slices 除此之外,框架对数据源的读取已经做了高度封装,只要实现一个接口就可以无缝融入到Flow流程当中

    type Sourcer interface {Generate(*Flow) *Dataset
    }
    
  2. map函数
    map函数是一个类型:type Mapper func([]interface{}) error , 所以自定义的map函数必须和该类型匹配,如

    var Token = gio.RegisterMapper(token)   //必须是全局变量。 Token为maperId,框架的Map函数是通过此ID进行定位用户自定义map函数的, token为自定义函数名//读取一行数据
    func token(row []interface{}) error {...//对每行数据进行切割,并设置key , value...//gio.Emit(key, value)//Emit 函数对key ,value编码后,写入了os.Stdout中, 刚才提到,Gleam中数据是通过io.Pipe进行流转的,所以此处的os.Stdout 不会输出到终端,//因为此处的os.Stdout 被重定向了return nil
    }
    

    IO重定向,位置:github.com\chrislusf\gleam\util\exec_util.go. 下图中的reader和writer 实际上分别是一个ioPipe()的实例 PipeReader和PipeWriter

    2

  3. reduce
    reduce 也函数是一个类型:type Reducer func(x, y interface{}) (interface{}, error) , 所以自定义的reduce 函数必须和该类型匹配,如

    var Sum = gio.RegisterReducer(sum)   //必须是全局变量。 Sum 为reduceId,框架的Reduce函数是通过此ID进行定位用户自定义reduce函数的, sum为自定义函数名//对相同key的元素进行遍历
    //x代表一个key的累计值, y代表该key的当前一个处理值
    //最终实现了对key的值进行求和
    func sum(x, y interface{}) (interface{}, error) {return gio.ToInt64(x) + gio.ToInt64(y), nil
    }
    

    注:当map后dataSet中没有重复key, 则后续的reduce函数不会执行

  4. 输出
    带缓冲的:Fprintf
    直接输出:Fprintlnf
    自定义输出:OutputRow ,此接口我们可以自定义一个回调函数,灵活实现个性化需求。

    除此之外也可以添加其他的处理节点,Gleam所提供的有:Sort 排序、取TopN、SelectKv、Select选择Key,从1开始、Partition 分区 等等

提示

官方示例中 “Word Count” 存在一个bug

3

正确写法:那么A, 要么B
A. ReduceByKey(“sum”, Sum). //按第一个key降序排序
B. ReduceBy(“sum”, Sum, flow.OrderBy(1, false)). //按第一个key降序排序

示例:

自定义Json数据源,计算学生的平均值,并取前三名

package mainimport ("github.com/chrislusf/gleam/gio""flag""github.com/chrislusf/gleam/distributed""encoding/json""github.com/chrislusf/gleam/flow""github.com/chrislusf/gleam/util""fmt""os""strconv")var (isDistributed   = flag.Bool("distributed", false, "run in distributed or not")verbose         = flag.Bool("verbose", false, "print out actual mapper and reducer function names")
)var (Token1 = gio.RegisterMapper(token1)Avg = gio.RegisterMapper(avg)Combine = gio.RegisterReducer(combine)//学科数目subjectsNum int64 = 2
)type Submit struct {Name string       `json:"name"`Score int             `json:"score"`Subject string     `json:"subject"`
}
type test []Submitfunc main() {if *verbose {gio.ListRegisteredFunctions()}gio.Init()json_str := `[{"name":"stu1","score": 21,"subject": "数学"},{"name":"stu2","score": 100,"subject": "数学"},{"name":"stu3","score": 50,"subject": "数学"},{"name":"stu4","score": 70,"subject": "数学"},{"name":"stu5","score": 80,"subject": "数学"},{"name":"stu5","score": 77,"subject": "语文"}]        
`subs := &test{}json.Unmarshal([]byte(json_str), subs)db := make([][]interface{}, len(*subs))for i, v := range *subs{t, _ := json.Marshal(v)db[i] = append(db[i], t)}f := flow.New("平均分为前三且不是stu2的学生").Slices(db).Map("tokenize", Token1).ReduceByKey("combine", Combine).Map("avg", Avg).Sort("按学生成绩倒序排序(中英文都可以)", flow.OrderBy(2, false)).Top("top3,用此函数可以省略排序如:Sort,SortByKey", 3, flow.OrderBy(2, false)).OutputRow(func(row *util.Row) error{decodedObjects := make([]interface{}, 0)decodedObjects = append(decodedObjects, row.K...)decodedObjects = append(decodedObjects, row.V...)fmt.Printf("output: %v ,%v\n", decodedObjects...)return nil})if *isDistributed {f.Run(distributed.Option())}else {f.Run()}
}//处理数据获取可用字段
func token1(row []interface{}) error{sub := &Submit{}b, _ := row[0].([]interface{})json.Unmarshal(b[0].([]byte), sub)//排除学生stu2if sub.Name == "stu2"{return nil}gio.Emit(sub.Name, []int64{gio.ToInt64(sub.Score)})return nil
}//计算平均值
func avg(row []interface{}) (error){var total int64if r, ok := row[1].([]interface{}); ok{for _, v := range r{if score, ok := v.(int64); ok {total += score}}}avg := Decimal(float64(total)/ float64(subjectsNum))//myPrint("avg: %v\n", avg)gio.Emit(row[0], avg)return nil
}//合并学科分数
func combine(x, y interface{}) (interface{}, error){switch t := x.(type) {case []interface{}:t = append(t, convertToSlice(y)...)//myPrint("t: %v\n", reflect.TypeOf(t))return t, nil}return x, nil
}func convertToSlice(v interface{}) ([]interface{}){switch t := v.(type) {case []interface{}:return tdefault:return nil}
}func Decimal(value float64) float64 {value, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", value), 64)return value
}//由于在map中 os.Stdout 被重定向了,所以用os.Stderr来打印数据
func myPrint(format string, v...interface{}){fmt.Fprintf(os.Stderr, format, v...)
}

总结

像Slices、Bytes、Channel 等,数据获取方式,可以很好的和golang的kafkaWorker结合进行流计算

相关内容

热门资讯

安卓系统用的华为应用,探索智能... 你知道吗?在安卓系统里,华为的应用可是个宝库呢!它们不仅功能强大,而且使用起来超级方便。今天,就让我...
安卓变ios系统魅蓝 你知道吗?最近有个朋友突然告诉我,他要把自己的安卓手机换成iOS系统,而且还是魅蓝品牌的!这可真是让...
幻书启世录安卓系统,安卓世界中... 亲爱的读者们,你是否曾在某个夜晚,被一本神奇的书所吸引,仿佛它拥有着穿越时空的力量?今天,我要带你走...
电脑安装安卓系统进不去,安卓系... 电脑安装安卓系统后竟然进不去,这可真是让人头疼的问题啊!你是不是也遇到了这种情况,心里直呼“怎么办怎...
用键盘切换控制安卓系统,畅享安... 你有没有想过,用键盘来控制你的安卓手机?是的,你没听错,就是那个我们每天敲敲打打的小玩意儿——键盘。...
小米安卓镜像系统在哪,小米安卓... 你有没有想过,你的小米手机里有一个隐藏的宝藏——安卓镜像系统?没错,就是那个可以让你的手机瞬间变身成...
安卓手机下载排班系统,高效排班... 你有没有想过,每天忙碌的工作中,有没有什么好帮手能帮你轻松管理时间呢?今天,就让我来给你介绍一个超级...
桌面组件如何弄安卓系统,桌面组... 亲爱的桌面爱好者们,你是否曾梦想过将安卓系统搬到你的电脑桌面上?想象那些流畅的动画、丰富的应用,还有...
安卓13系统介绍视频,新功能与... 亲爱的读者们,你是否对安卓13系统充满好奇?想要一探究竟,却又苦于没有足够的时间去研究?别担心,今天...
车机安卓7.1系统,功能升级与... 你有没有发现,现在的车机系统越来越智能了?尤其是那些搭载了安卓7.1系统的车机,简直就像是个贴心的智...
安卓系统下如何读pdf,And... 你有没有遇到过这种情况:手机里存了一大堆PDF文件,可是怎么也找不到一个能顺畅阅读的工具?别急,今天...
安卓系统全国通用的吗,畅享智能... 你有没有想过,为什么你的手机里装的是安卓系统呢?安卓系统,这个名字听起来是不是有点神秘?今天,就让我...
假苹果手机8安卓系统,颠覆传统... 你有没有想过,如果苹果手机突然变成了安卓系统,会是怎样的景象呢?想象那熟悉的苹果外观,却运行着安卓的...
安卓12.0系统vivo有吗,... 你有没有听说最近安卓系统又升级啦?没错,就是那个让手机焕然一新的安卓12.0系统!那么,咱们国内的手...
核心芯片和安卓系统,探索核心芯... 你知道吗?在科技的世界里,有一对“黄金搭档”正悄悄改变着我们的生活。他们就是——核心芯片和安卓系统。...
如何调安卓系统屏幕颜色,安卓系... 亲爱的手机控们,你是否曾觉得安卓系统的屏幕颜色不够个性,或者是因为长时间盯着屏幕而感到眼睛疲劳?别担...
旧台式电脑安装安卓系统,轻松安... 你那台旧台式电脑是不是已经服役多年,性能逐渐力不从心,却又不忍心让它退役呢?别急,今天就来教你怎么给...
美国要求关闭安卓系统,科技霸权... 美国要求关闭安卓系统:一场技术革新还是政治博弈?在数字化时代,智能手机已经成为我们生活中不可或缺的一...
安卓系统日记本 你有没有发现,手机里的安卓系统日记本,简直就是记录生活点滴的宝藏库呢?想象每天忙碌的生活中,有没有那...
安卓手机广告最少的系统,探索安... 你有没有发现,用安卓手机的时候,广告总是无处不在,让人烦得要命?不过别急,今天我要给你揭秘一个秘密—...