gleam流式计算(分布式/单机)
创始人
2024-06-02 22:11:00
0

介绍

Gleam 是一个高效、可伸缩的分布式map/reduce系统,DAG(有向无环图)执行模式,数据可以纯内存或磁盘的方式进行流转。 可单点,也可以分布式部署 https://github.com/chrislusf/gleam

使用Gleam之前,我们需要先了解下map/reduce的概念(看源码之前先找一些相关资料了解其工作原理)

MapReduce讲的就是分而治之的程序处理理念,把一个复杂的任务划分为若干个简单的任务分别来做。map和reduce函数是要执行的任务,由master分配任务给worker执行。map函数读取被分配的输入数据片段,输出中间key/value pair值的集合,reduce函数收集具有相同中间key值的value值,合并这些value值,形成一个较小的value值的集合。

比如要做一个统计词频的工作流:

  1. 输入(input)为文件内容

Hello PHP

Hello Java

Hello C
Hello Java
Hello C++

  1. 拆分(split)把文件进行分片,交给worker,进行map处理,将上述文档中每一行的内容转换为key-value对,

(PHP ,1)

(Java, 1)

(C, 1)

(Java, 1)

(C++, 1)

3.分发(shuffle)将key相同的扔到一起去,即:

(PHP ,1)

(Java, 1, 1)

(C, 1)

(C++, 1)

  1. 归并(reduce),上一步的结果集分发给对应的reduce函数,对相同key的value进行遍历

(PHP ,1)

(Java, 2)

(C, 1)

(C++, 1)

\5. 之后就可以做一些排序、topN等等操作。

单机模式

数据量不是很大(数GB)的情况下,单机模式就可以满足需求,以下是单机模式的流程图

1

在gleam中,从数据源 …-> map …-> reduce …-> 输出,整个链中数据均通过管道io.Piper进行流转

  1. 数据源
    默认支持txt、csv、Tsv、Parquet、Orc、Sokect监听、Channel、Bytes、Strings、Ints、Slices 除此之外,框架对数据源的读取已经做了高度封装,只要实现一个接口就可以无缝融入到Flow流程当中

    type Sourcer interface {Generate(*Flow) *Dataset
    }
    
  2. map函数
    map函数是一个类型:type Mapper func([]interface{}) error , 所以自定义的map函数必须和该类型匹配,如

    var Token = gio.RegisterMapper(token)   //必须是全局变量。 Token为maperId,框架的Map函数是通过此ID进行定位用户自定义map函数的, token为自定义函数名//读取一行数据
    func token(row []interface{}) error {...//对每行数据进行切割,并设置key , value...//gio.Emit(key, value)//Emit 函数对key ,value编码后,写入了os.Stdout中, 刚才提到,Gleam中数据是通过io.Pipe进行流转的,所以此处的os.Stdout 不会输出到终端,//因为此处的os.Stdout 被重定向了return nil
    }
    

    IO重定向,位置:github.com\chrislusf\gleam\util\exec_util.go. 下图中的reader和writer 实际上分别是一个ioPipe()的实例 PipeReader和PipeWriter

    2

  3. reduce
    reduce 也函数是一个类型:type Reducer func(x, y interface{}) (interface{}, error) , 所以自定义的reduce 函数必须和该类型匹配,如

    var Sum = gio.RegisterReducer(sum)   //必须是全局变量。 Sum 为reduceId,框架的Reduce函数是通过此ID进行定位用户自定义reduce函数的, sum为自定义函数名//对相同key的元素进行遍历
    //x代表一个key的累计值, y代表该key的当前一个处理值
    //最终实现了对key的值进行求和
    func sum(x, y interface{}) (interface{}, error) {return gio.ToInt64(x) + gio.ToInt64(y), nil
    }
    

    注:当map后dataSet中没有重复key, 则后续的reduce函数不会执行

  4. 输出
    带缓冲的:Fprintf
    直接输出:Fprintlnf
    自定义输出:OutputRow ,此接口我们可以自定义一个回调函数,灵活实现个性化需求。

    除此之外也可以添加其他的处理节点,Gleam所提供的有:Sort 排序、取TopN、SelectKv、Select选择Key,从1开始、Partition 分区 等等

提示

官方示例中 “Word Count” 存在一个bug

3

正确写法:那么A, 要么B
A. ReduceByKey(“sum”, Sum). //按第一个key降序排序
B. ReduceBy(“sum”, Sum, flow.OrderBy(1, false)). //按第一个key降序排序

示例:

自定义Json数据源,计算学生的平均值,并取前三名

package mainimport ("github.com/chrislusf/gleam/gio""flag""github.com/chrislusf/gleam/distributed""encoding/json""github.com/chrislusf/gleam/flow""github.com/chrislusf/gleam/util""fmt""os""strconv")var (isDistributed   = flag.Bool("distributed", false, "run in distributed or not")verbose         = flag.Bool("verbose", false, "print out actual mapper and reducer function names")
)var (Token1 = gio.RegisterMapper(token1)Avg = gio.RegisterMapper(avg)Combine = gio.RegisterReducer(combine)//学科数目subjectsNum int64 = 2
)type Submit struct {Name string       `json:"name"`Score int             `json:"score"`Subject string     `json:"subject"`
}
type test []Submitfunc main() {if *verbose {gio.ListRegisteredFunctions()}gio.Init()json_str := `[{"name":"stu1","score": 21,"subject": "数学"},{"name":"stu2","score": 100,"subject": "数学"},{"name":"stu3","score": 50,"subject": "数学"},{"name":"stu4","score": 70,"subject": "数学"},{"name":"stu5","score": 80,"subject": "数学"},{"name":"stu5","score": 77,"subject": "语文"}]        
`subs := &test{}json.Unmarshal([]byte(json_str), subs)db := make([][]interface{}, len(*subs))for i, v := range *subs{t, _ := json.Marshal(v)db[i] = append(db[i], t)}f := flow.New("平均分为前三且不是stu2的学生").Slices(db).Map("tokenize", Token1).ReduceByKey("combine", Combine).Map("avg", Avg).Sort("按学生成绩倒序排序(中英文都可以)", flow.OrderBy(2, false)).Top("top3,用此函数可以省略排序如:Sort,SortByKey", 3, flow.OrderBy(2, false)).OutputRow(func(row *util.Row) error{decodedObjects := make([]interface{}, 0)decodedObjects = append(decodedObjects, row.K...)decodedObjects = append(decodedObjects, row.V...)fmt.Printf("output: %v ,%v\n", decodedObjects...)return nil})if *isDistributed {f.Run(distributed.Option())}else {f.Run()}
}//处理数据获取可用字段
func token1(row []interface{}) error{sub := &Submit{}b, _ := row[0].([]interface{})json.Unmarshal(b[0].([]byte), sub)//排除学生stu2if sub.Name == "stu2"{return nil}gio.Emit(sub.Name, []int64{gio.ToInt64(sub.Score)})return nil
}//计算平均值
func avg(row []interface{}) (error){var total int64if r, ok := row[1].([]interface{}); ok{for _, v := range r{if score, ok := v.(int64); ok {total += score}}}avg := Decimal(float64(total)/ float64(subjectsNum))//myPrint("avg: %v\n", avg)gio.Emit(row[0], avg)return nil
}//合并学科分数
func combine(x, y interface{}) (interface{}, error){switch t := x.(type) {case []interface{}:t = append(t, convertToSlice(y)...)//myPrint("t: %v\n", reflect.TypeOf(t))return t, nil}return x, nil
}func convertToSlice(v interface{}) ([]interface{}){switch t := v.(type) {case []interface{}:return tdefault:return nil}
}func Decimal(value float64) float64 {value, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", value), 64)return value
}//由于在map中 os.Stdout 被重定向了,所以用os.Stderr来打印数据
func myPrint(format string, v...interface{}){fmt.Fprintf(os.Stderr, format, v...)
}

总结

像Slices、Bytes、Channel 等,数据获取方式,可以很好的和golang的kafkaWorker结合进行流计算

相关内容

热门资讯

122.(leaflet篇)l... 听老人家说:多看美女会长寿 地图之家总目录(订阅之前建议先查看该博客) 文章末尾处提供保证可运行...
育碧GDC2018程序化大世界... 1.传统手动绘制森林的问题 采用手动绘制的方法的话,每次迭代地形都要手动再绘制森林。这...
育碧GDC2018程序化大世界... 1.传统手动绘制森林的问题 采用手动绘制的方法的话,每次迭代地形都要手动再绘制森林。这...
Vue使用pdf-lib为文件... 之前也写过两篇预览pdf的,但是没有加水印,这是链接:Vu...
PyQt5数据库开发1 4.1... 文章目录 前言 步骤/方法 1 使用windows身份登录 2 启用混合登录模式 3 允许远程连接服...
Android studio ... 解决 Android studio 出现“The emulator process for AVD ...
Linux基础命令大全(上) ♥️作者:小刘在C站 ♥️个人主页:小刘主页 ♥️每天分享云计算网络运维...
再谈解决“因为文件包含病毒或潜... 前面出了一篇博文专门来解决“因为文件包含病毒或潜在的垃圾软件”的问题,其中第二种方法有...
南京邮电大学通达学院2023c... 题目展示 一.问题描述 实验题目1 定义一个学生类,其中包括如下内容: (1)私有数据成员 ①年龄 ...
PageObject 六大原则 PageObject六大原则: 1.封装服务的方法 2.不要暴露页面的细节 3.通过r...
【Linux网络编程】01:S... Socket多进程 OVERVIEWSocket多进程1.Server2.Client3.bug&...
数据结构刷题(二十五):122... 1.122. 买卖股票的最佳时机 II思路:贪心。把利润分解为每天为单位的维度,然后收...
浏览器事件循环 事件循环 浏览器的进程模型 何为进程? 程序运行需要有它自己专属的内存空间࿰...
8个免费图片/照片压缩工具帮您... 继续查看一些最好的图像压缩工具,以提升用户体验和存储空间以及网站使用支持。 无数图像压...
计算机二级Python备考(2... 目录  一、选择题 1.在Python语言中: 2.知识点 二、基本操作题 1. j...
端电压 相电压 线电压 记得刚接触矢量控制的时候,拿到板子,就赶紧去测各种波形,结...
如何使用Python检测和识别... 车牌检测与识别技术用途广泛,可以用于道路系统、无票停车场、车辆门禁等。这项技术结合了计...
带环链表详解 目录 一、什么是环形链表 二、判断是否为环形链表 2.1 具体题目 2.2 具体思路 2.3 思路的...
【C语言进阶:刨根究底字符串函... 本节重点内容: 深入理解strcpy函数的使用学会strcpy函数的模拟实现⚡strc...
Django web开发(一)... 文章目录前端开发1.快速开发网站2.标签2.1 编码2.2 title2.3 标题2.4 div和s...