gleam流式计算(分布式/单机)
创始人
2024-06-02 22:11:00
0

介绍

Gleam 是一个高效、可伸缩的分布式map/reduce系统,DAG(有向无环图)执行模式,数据可以纯内存或磁盘的方式进行流转。 可单点,也可以分布式部署 https://github.com/chrislusf/gleam

使用Gleam之前,我们需要先了解下map/reduce的概念(看源码之前先找一些相关资料了解其工作原理)

MapReduce讲的就是分而治之的程序处理理念,把一个复杂的任务划分为若干个简单的任务分别来做。map和reduce函数是要执行的任务,由master分配任务给worker执行。map函数读取被分配的输入数据片段,输出中间key/value pair值的集合,reduce函数收集具有相同中间key值的value值,合并这些value值,形成一个较小的value值的集合。

比如要做一个统计词频的工作流:

  1. 输入(input)为文件内容

Hello PHP

Hello Java

Hello C
Hello Java
Hello C++

  1. 拆分(split)把文件进行分片,交给worker,进行map处理,将上述文档中每一行的内容转换为key-value对,

(PHP ,1)

(Java, 1)

(C, 1)

(Java, 1)

(C++, 1)

3.分发(shuffle)将key相同的扔到一起去,即:

(PHP ,1)

(Java, 1, 1)

(C, 1)

(C++, 1)

  1. 归并(reduce),上一步的结果集分发给对应的reduce函数,对相同key的value进行遍历

(PHP ,1)

(Java, 2)

(C, 1)

(C++, 1)

\5. 之后就可以做一些排序、topN等等操作。

单机模式

数据量不是很大(数GB)的情况下,单机模式就可以满足需求,以下是单机模式的流程图

1

在gleam中,从数据源 …-> map …-> reduce …-> 输出,整个链中数据均通过管道io.Piper进行流转

  1. 数据源
    默认支持txt、csv、Tsv、Parquet、Orc、Sokect监听、Channel、Bytes、Strings、Ints、Slices 除此之外,框架对数据源的读取已经做了高度封装,只要实现一个接口就可以无缝融入到Flow流程当中

    type Sourcer interface {Generate(*Flow) *Dataset
    }
    
  2. map函数
    map函数是一个类型:type Mapper func([]interface{}) error , 所以自定义的map函数必须和该类型匹配,如

    var Token = gio.RegisterMapper(token)   //必须是全局变量。 Token为maperId,框架的Map函数是通过此ID进行定位用户自定义map函数的, token为自定义函数名//读取一行数据
    func token(row []interface{}) error {...//对每行数据进行切割,并设置key , value...//gio.Emit(key, value)//Emit 函数对key ,value编码后,写入了os.Stdout中, 刚才提到,Gleam中数据是通过io.Pipe进行流转的,所以此处的os.Stdout 不会输出到终端,//因为此处的os.Stdout 被重定向了return nil
    }
    

    IO重定向,位置:github.com\chrislusf\gleam\util\exec_util.go. 下图中的reader和writer 实际上分别是一个ioPipe()的实例 PipeReader和PipeWriter

    2

  3. reduce
    reduce 也函数是一个类型:type Reducer func(x, y interface{}) (interface{}, error) , 所以自定义的reduce 函数必须和该类型匹配,如

    var Sum = gio.RegisterReducer(sum)   //必须是全局变量。 Sum 为reduceId,框架的Reduce函数是通过此ID进行定位用户自定义reduce函数的, sum为自定义函数名//对相同key的元素进行遍历
    //x代表一个key的累计值, y代表该key的当前一个处理值
    //最终实现了对key的值进行求和
    func sum(x, y interface{}) (interface{}, error) {return gio.ToInt64(x) + gio.ToInt64(y), nil
    }
    

    注:当map后dataSet中没有重复key, 则后续的reduce函数不会执行

  4. 输出
    带缓冲的:Fprintf
    直接输出:Fprintlnf
    自定义输出:OutputRow ,此接口我们可以自定义一个回调函数,灵活实现个性化需求。

    除此之外也可以添加其他的处理节点,Gleam所提供的有:Sort 排序、取TopN、SelectKv、Select选择Key,从1开始、Partition 分区 等等

提示

官方示例中 “Word Count” 存在一个bug

3

正确写法:那么A, 要么B
A. ReduceByKey(“sum”, Sum). //按第一个key降序排序
B. ReduceBy(“sum”, Sum, flow.OrderBy(1, false)). //按第一个key降序排序

示例:

自定义Json数据源,计算学生的平均值,并取前三名

package mainimport ("github.com/chrislusf/gleam/gio""flag""github.com/chrislusf/gleam/distributed""encoding/json""github.com/chrislusf/gleam/flow""github.com/chrislusf/gleam/util""fmt""os""strconv")var (isDistributed   = flag.Bool("distributed", false, "run in distributed or not")verbose         = flag.Bool("verbose", false, "print out actual mapper and reducer function names")
)var (Token1 = gio.RegisterMapper(token1)Avg = gio.RegisterMapper(avg)Combine = gio.RegisterReducer(combine)//学科数目subjectsNum int64 = 2
)type Submit struct {Name string       `json:"name"`Score int             `json:"score"`Subject string     `json:"subject"`
}
type test []Submitfunc main() {if *verbose {gio.ListRegisteredFunctions()}gio.Init()json_str := `[{"name":"stu1","score": 21,"subject": "数学"},{"name":"stu2","score": 100,"subject": "数学"},{"name":"stu3","score": 50,"subject": "数学"},{"name":"stu4","score": 70,"subject": "数学"},{"name":"stu5","score": 80,"subject": "数学"},{"name":"stu5","score": 77,"subject": "语文"}]        
`subs := &test{}json.Unmarshal([]byte(json_str), subs)db := make([][]interface{}, len(*subs))for i, v := range *subs{t, _ := json.Marshal(v)db[i] = append(db[i], t)}f := flow.New("平均分为前三且不是stu2的学生").Slices(db).Map("tokenize", Token1).ReduceByKey("combine", Combine).Map("avg", Avg).Sort("按学生成绩倒序排序(中英文都可以)", flow.OrderBy(2, false)).Top("top3,用此函数可以省略排序如:Sort,SortByKey", 3, flow.OrderBy(2, false)).OutputRow(func(row *util.Row) error{decodedObjects := make([]interface{}, 0)decodedObjects = append(decodedObjects, row.K...)decodedObjects = append(decodedObjects, row.V...)fmt.Printf("output: %v ,%v\n", decodedObjects...)return nil})if *isDistributed {f.Run(distributed.Option())}else {f.Run()}
}//处理数据获取可用字段
func token1(row []interface{}) error{sub := &Submit{}b, _ := row[0].([]interface{})json.Unmarshal(b[0].([]byte), sub)//排除学生stu2if sub.Name == "stu2"{return nil}gio.Emit(sub.Name, []int64{gio.ToInt64(sub.Score)})return nil
}//计算平均值
func avg(row []interface{}) (error){var total int64if r, ok := row[1].([]interface{}); ok{for _, v := range r{if score, ok := v.(int64); ok {total += score}}}avg := Decimal(float64(total)/ float64(subjectsNum))//myPrint("avg: %v\n", avg)gio.Emit(row[0], avg)return nil
}//合并学科分数
func combine(x, y interface{}) (interface{}, error){switch t := x.(type) {case []interface{}:t = append(t, convertToSlice(y)...)//myPrint("t: %v\n", reflect.TypeOf(t))return t, nil}return x, nil
}func convertToSlice(v interface{}) ([]interface{}){switch t := v.(type) {case []interface{}:return tdefault:return nil}
}func Decimal(value float64) float64 {value, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", value), 64)return value
}//由于在map中 os.Stdout 被重定向了,所以用os.Stderr来打印数据
func myPrint(format string, v...interface{}){fmt.Fprintf(os.Stderr, format, v...)
}

总结

像Slices、Bytes、Channel 等,数据获取方式,可以很好的和golang的kafkaWorker结合进行流计算

相关内容

热门资讯

系统如何与安卓互通,技术融合与... 你有没有想过,你的手机系统竟然能和安卓系统这么默契地互通有无?这就像是一场跨越科技界的友谊赛,让我们...
安卓系统 扫码枪,安卓系统下扫... 你有没有想过,在繁忙的超市收银台,那些快速流畅的扫码操作,背后其实隐藏着一个小小的英雄——安卓系统扫...
平板插卡推荐安卓系统,安卓系统... 你有没有想过,你的平板电脑是不是也能像智能手机一样,随时随地扩充存储空间呢?没错,这就是今天我要跟你...
安卓系统固件安装失败,原因排查... 最近是不是你也遇到了安卓系统固件安装失败的问题?别急,让我来给你详细说说这个让人头疼的小麻烦,让你一...
ios系统和安卓区别,系统差异... 你有没有发现,现在手机市场上,iOS系统和安卓系统就像是一对双胞胎,长得差不多,但性格却截然不同。今...
安卓系统2.3优酷,优酷的崛起... 你有没有发现,安卓系统2.3时代的那股怀旧风?那时候,优酷可是视频界的巨头,多少人都是看着优酷长大的...
安卓导航系统密封,安卓导航系统... 你有没有发现,现在手机导航系统越来越智能了?尤其是安卓系统的导航,简直就像一个贴心的导航小助手,带你...
a版安卓11系统,a版深度解析... 你知道吗?最近手机界可是炸开了锅,各大品牌纷纷发布了搭载a版安卓11系统的手机。这可不是什么小打小闹...
安卓系统的模拟吉他,随时随地弹... 你有没有想过,在手机上也能弹奏吉他呢?没错,就是那种模拟吉他的安卓系统应用,让你随时随地都能享受音乐...
王者适配的安卓系统,深度解析适... 你有没有发现,最近玩《王者荣耀》的小伙伴们都在议论纷纷,说新出的安卓系统简直是为王者量身定做的!没错...
安卓系统自动定位关闭,隐私保护... 你有没有发现,手机里的安卓系统有时候会自动定位,这可真是让人又爱又恨啊!有时候,我们并不想让别人知道...
安卓系统电量耗尽测试,全面解析... 手机电量耗尽,这可是每个手机用户都头疼的问题。你有没有想过,你的安卓手机在电量耗尽前,到底经历了哪些...
如何升级车载安卓系统,车载安卓... 亲爱的车主朋友们,你是不是也和我一样,对车载安卓系统升级这件事充满了好奇和期待呢?想象当你驾驶着爱车...
安卓办公哪个系统好,深度解析哪... 你有没有想过,在安卓办公的世界里,哪个系统才是你的最佳拍档呢?在这个信息爆炸的时代,选择一个既强大又...
安卓系统差劲怎么解决,重拾流畅... 你有没有发现,安卓系统有时候真的让人头疼得要命?手机卡顿、应用崩溃、电池续航短,这些问题是不是让你抓...
喜欢安卓系统的原因,探索用户偏... 你有没有发现,身边的朋友、同事,甚至家人,越来越多的人开始使用安卓手机了呢?这可不是简单的潮流,而是...
安卓系统金立手机,品质生活新选... 你有没有发现,最近安卓系统下的金立手机突然火了起来?没错,就是那个曾经陪伴我们走过无数时光的金立手机...
无安卓系统的电视,新型无系统电... 亲爱的读者们,你是否厌倦了那些充斥着安卓系统的电视?想要尝试一些新鲜玩意儿?那就跟我一起探索一下无安...
麒麟系统能刷安卓系统吗,轻松刷... 你有没有想过,你的麒麟手机能不能装上安卓系统呢?这可是个让人好奇不已的问题。现在,就让我来带你一探究...
手机公司安卓系统吗,手机公司引... 你有没有想过,为什么你的手机里装的是安卓系统而不是苹果的iOS呢?这背后可是有着不少故事和门道的哦!...