gleam流式计算(分布式/单机)
创始人
2024-06-02 22:11:00
0

介绍

Gleam 是一个高效、可伸缩的分布式map/reduce系统,DAG(有向无环图)执行模式,数据可以纯内存或磁盘的方式进行流转。 可单点,也可以分布式部署 https://github.com/chrislusf/gleam

使用Gleam之前,我们需要先了解下map/reduce的概念(看源码之前先找一些相关资料了解其工作原理)

MapReduce讲的就是分而治之的程序处理理念,把一个复杂的任务划分为若干个简单的任务分别来做。map和reduce函数是要执行的任务,由master分配任务给worker执行。map函数读取被分配的输入数据片段,输出中间key/value pair值的集合,reduce函数收集具有相同中间key值的value值,合并这些value值,形成一个较小的value值的集合。

比如要做一个统计词频的工作流:

  1. 输入(input)为文件内容

Hello PHP

Hello Java

Hello C
Hello Java
Hello C++

  1. 拆分(split)把文件进行分片,交给worker,进行map处理,将上述文档中每一行的内容转换为key-value对,

(PHP ,1)

(Java, 1)

(C, 1)

(Java, 1)

(C++, 1)

3.分发(shuffle)将key相同的扔到一起去,即:

(PHP ,1)

(Java, 1, 1)

(C, 1)

(C++, 1)

  1. 归并(reduce),上一步的结果集分发给对应的reduce函数,对相同key的value进行遍历

(PHP ,1)

(Java, 2)

(C, 1)

(C++, 1)

\5. 之后就可以做一些排序、topN等等操作。

单机模式

数据量不是很大(数GB)的情况下,单机模式就可以满足需求,以下是单机模式的流程图

1

在gleam中,从数据源 …-> map …-> reduce …-> 输出,整个链中数据均通过管道io.Piper进行流转

  1. 数据源
    默认支持txt、csv、Tsv、Parquet、Orc、Sokect监听、Channel、Bytes、Strings、Ints、Slices 除此之外,框架对数据源的读取已经做了高度封装,只要实现一个接口就可以无缝融入到Flow流程当中

    type Sourcer interface {Generate(*Flow) *Dataset
    }
    
  2. map函数
    map函数是一个类型:type Mapper func([]interface{}) error , 所以自定义的map函数必须和该类型匹配,如

    var Token = gio.RegisterMapper(token)   //必须是全局变量。 Token为maperId,框架的Map函数是通过此ID进行定位用户自定义map函数的, token为自定义函数名//读取一行数据
    func token(row []interface{}) error {...//对每行数据进行切割,并设置key , value...//gio.Emit(key, value)//Emit 函数对key ,value编码后,写入了os.Stdout中, 刚才提到,Gleam中数据是通过io.Pipe进行流转的,所以此处的os.Stdout 不会输出到终端,//因为此处的os.Stdout 被重定向了return nil
    }
    

    IO重定向,位置:github.com\chrislusf\gleam\util\exec_util.go. 下图中的reader和writer 实际上分别是一个ioPipe()的实例 PipeReader和PipeWriter

    2

  3. reduce
    reduce 也函数是一个类型:type Reducer func(x, y interface{}) (interface{}, error) , 所以自定义的reduce 函数必须和该类型匹配,如

    var Sum = gio.RegisterReducer(sum)   //必须是全局变量。 Sum 为reduceId,框架的Reduce函数是通过此ID进行定位用户自定义reduce函数的, sum为自定义函数名//对相同key的元素进行遍历
    //x代表一个key的累计值, y代表该key的当前一个处理值
    //最终实现了对key的值进行求和
    func sum(x, y interface{}) (interface{}, error) {return gio.ToInt64(x) + gio.ToInt64(y), nil
    }
    

    注:当map后dataSet中没有重复key, 则后续的reduce函数不会执行

  4. 输出
    带缓冲的:Fprintf
    直接输出:Fprintlnf
    自定义输出:OutputRow ,此接口我们可以自定义一个回调函数,灵活实现个性化需求。

    除此之外也可以添加其他的处理节点,Gleam所提供的有:Sort 排序、取TopN、SelectKv、Select选择Key,从1开始、Partition 分区 等等

提示

官方示例中 “Word Count” 存在一个bug

3

正确写法:那么A, 要么B
A. ReduceByKey(“sum”, Sum). //按第一个key降序排序
B. ReduceBy(“sum”, Sum, flow.OrderBy(1, false)). //按第一个key降序排序

示例:

自定义Json数据源,计算学生的平均值,并取前三名

package mainimport ("github.com/chrislusf/gleam/gio""flag""github.com/chrislusf/gleam/distributed""encoding/json""github.com/chrislusf/gleam/flow""github.com/chrislusf/gleam/util""fmt""os""strconv")var (isDistributed   = flag.Bool("distributed", false, "run in distributed or not")verbose         = flag.Bool("verbose", false, "print out actual mapper and reducer function names")
)var (Token1 = gio.RegisterMapper(token1)Avg = gio.RegisterMapper(avg)Combine = gio.RegisterReducer(combine)//学科数目subjectsNum int64 = 2
)type Submit struct {Name string       `json:"name"`Score int             `json:"score"`Subject string     `json:"subject"`
}
type test []Submitfunc main() {if *verbose {gio.ListRegisteredFunctions()}gio.Init()json_str := `[{"name":"stu1","score": 21,"subject": "数学"},{"name":"stu2","score": 100,"subject": "数学"},{"name":"stu3","score": 50,"subject": "数学"},{"name":"stu4","score": 70,"subject": "数学"},{"name":"stu5","score": 80,"subject": "数学"},{"name":"stu5","score": 77,"subject": "语文"}]        
`subs := &test{}json.Unmarshal([]byte(json_str), subs)db := make([][]interface{}, len(*subs))for i, v := range *subs{t, _ := json.Marshal(v)db[i] = append(db[i], t)}f := flow.New("平均分为前三且不是stu2的学生").Slices(db).Map("tokenize", Token1).ReduceByKey("combine", Combine).Map("avg", Avg).Sort("按学生成绩倒序排序(中英文都可以)", flow.OrderBy(2, false)).Top("top3,用此函数可以省略排序如:Sort,SortByKey", 3, flow.OrderBy(2, false)).OutputRow(func(row *util.Row) error{decodedObjects := make([]interface{}, 0)decodedObjects = append(decodedObjects, row.K...)decodedObjects = append(decodedObjects, row.V...)fmt.Printf("output: %v ,%v\n", decodedObjects...)return nil})if *isDistributed {f.Run(distributed.Option())}else {f.Run()}
}//处理数据获取可用字段
func token1(row []interface{}) error{sub := &Submit{}b, _ := row[0].([]interface{})json.Unmarshal(b[0].([]byte), sub)//排除学生stu2if sub.Name == "stu2"{return nil}gio.Emit(sub.Name, []int64{gio.ToInt64(sub.Score)})return nil
}//计算平均值
func avg(row []interface{}) (error){var total int64if r, ok := row[1].([]interface{}); ok{for _, v := range r{if score, ok := v.(int64); ok {total += score}}}avg := Decimal(float64(total)/ float64(subjectsNum))//myPrint("avg: %v\n", avg)gio.Emit(row[0], avg)return nil
}//合并学科分数
func combine(x, y interface{}) (interface{}, error){switch t := x.(type) {case []interface{}:t = append(t, convertToSlice(y)...)//myPrint("t: %v\n", reflect.TypeOf(t))return t, nil}return x, nil
}func convertToSlice(v interface{}) ([]interface{}){switch t := v.(type) {case []interface{}:return tdefault:return nil}
}func Decimal(value float64) float64 {value, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", value), 64)return value
}//由于在map中 os.Stdout 被重定向了,所以用os.Stderr来打印数据
func myPrint(format string, v...interface{}){fmt.Fprintf(os.Stderr, format, v...)
}

总结

像Slices、Bytes、Channel 等,数据获取方式,可以很好的和golang的kafkaWorker结合进行流计算

相关内容

热门资讯

编程安卓系统和鸿蒙主题,跨平台... 你有没有想过,手机的世界里,除了苹果的iOS和安卓的操作系统,还有个神秘的鸿蒙系统?今天,咱们就来聊...
哪个安卓机系统好用,探索安卓系... 你有没有想过,手机里的安卓系统就像是个大厨,不同的系统就像不同的烹饪手法,有的让你吃得津津有味,有的...
安卓如何控制苹果系统,从安卓到... 你知道吗?在这个科技飞速发展的时代,安卓和苹果两大操作系统之间的较量从未停歇。虽然它们各自有着忠实的...
安卓原生系统文件夹,安卓原生系... 你有没有发现,每次打开安卓手机,里面那些文件夹就像是一个个神秘的宝箱,里面藏着各种各样的宝贝?今天,...
基于安卓系统的游戏开发,从入门... 你有没有想过,为什么安卓手机上的游戏总是那么吸引人?是不是因为它们就像是你身边的好朋友,随时随地都能...
安卓系统怎样装驱动精灵,安卓系... 你那安卓设备是不是突然间有点儿不给力了?别急,今天就来手把手教你如何给安卓系统装上驱动精灵,让你的设...
如何本地安装安卓系统包,详细步... 你有没有想过,把安卓系统装在你的电脑上,是不是就像给电脑穿上了时尚的新衣?想象你可以在电脑上直接玩手...
安卓12卡刷系统教程,体验全新... 你有没有发现,你的安卓手机最近有点儿不给力了?运行速度慢得像蜗牛,是不是也想给它来个“换血大法”,让...
安卓系统无法打开swf文件,安... 最近是不是发现你的安卓手机有点儿不给力?打开SWF文件时,是不是总是出现“无法打开”的尴尬局面?别急...
鸿蒙系统依赖于安卓系统吗,独立... 你有没有想过,我们手机里的那个鸿蒙系统,它是不是真的完全独立于安卓系统呢?这个问题,估计不少手机控都...
适合安卓系统的图片软件,精选图... 手机里堆满了各种美美的照片,是不是觉得找起来有点头疼呢?别急,今天就来给你安利几款超级适合安卓系统的...
阴阳师安卓系统典藏,探寻阴阳师... 亲爱的阴阳师们,你是否在安卓系统上玩得如痴如醉,对那些精美的典藏式神们垂涎欲滴?今天,就让我带你深入...
安卓系统有碎片化缺点,系统优化... 你知道吗?在手机江湖里,安卓系统可是个响当当的大侠。它那开放、自由的个性,让无数手机厂商和开发者都为...
安卓4系统手机微信,功能解析与... 你有没有发现,现在市面上还有很多安卓4系统的手机在使用呢?尤其是那些喜欢微信的朋友们,这款手机简直就...
鸿蒙系统是安卓的盗版,从安卓“... 你知道吗?最近在科技圈里,关于鸿蒙系统的讨论可是热闹非凡呢!有人说是安卓的盗版,有人则认为这是华为的...
安卓系统怎么剪辑音乐,轻松打造... 你是不是也和我一样,手机里存了超多好听的歌,但是有时候想给它们来个变身,变成一段专属的旋律呢?别急,...
怎么把安卓手机系统变为pc系统... 你有没有想过,把你的安卓手机变成一台PC呢?听起来是不是有点酷炫?想象你可以在手机上玩电脑游戏,或者...
手机怎么装安卓11系统,手机安... 你有没有想过,让你的手机也来个“青春焕发”,升级一下系统呢?没错,就是安卓11系统!这个新系统不仅带...
安卓系统如何拼网络,构建高效连... 你有没有想过,你的安卓手机是怎么和网络“谈恋爱”的呢?没错,就是拼网络!今天,就让我带你一探究竟,看...
安卓系统怎么看小说,轻松畅享电... 你有没有发现,手机里装了那么多应用,最离不开的竟然是那个小小的小说阅读器?没错,就是安卓系统上的小说...