Qt中用thrift验证flume
创始人
2024-05-03 10:12:49
0

一.flume简介

flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。
在flume中分为了3个组件,分别为source,channel和sink。
Source是负责接收数据到Flume Agent的组件。Source组件可以处理各种类型、各种格式的日志数据。
Sink不断地轮询Channel中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个Flume Agent。
Channel是位于Source和Sink之间的缓冲区。因此,Channel允许Source和Sink运作在不同的速率上。Channel是线程安全的,可以同时处理几个Source的写入操作和几个Sink的读取操作。
flume支持的Source、Sink和Channel如下所示
Flume sources
☆Avro Source(常用于agent之间传递消息)
☆Exec Source(tail -f 文件,会重复消费)
☆Spooling Directory Source(文件夹变更)
☆Taildir Source(实时更新一组文件,不会重复消费)
☆Kafka Source
☆HTTP Source
☆Thrift Source
☆JMS Source
☆NetCat TCP Source
☆NetCat UDP Source
☆Sequence Generator Source
☆Custom Source 自定义
Flume Sinks
☆Avro Sink
☆HDFS Sink
☆Hive Sink
☆Logger Sink (测试用)
☆File Roll Sink
☆Thrift Sink
☆IRC Sink
☆Null Sink
☆HBaseSinks
☆ElasticSearchSink
☆Kafka Sink
☆HTTP Sink
☆Custom Sink 自定义
Flume Channels
☆Memory Channel
☆JDBC Channel 当前Flume Channel内置支持Derby
☆Kafka Channel
☆File Channel
☆Spillable Memory Channel (当前试验性的,不建议生产环境使用)
☆Custom Channel 自定义
从这可以看出,Source和Sink都是支持Thrift的,可以理解为flume内置了Thrift服务器和客户端。

二.windows部署flume

下载最新的jdk 19  
安装完成后,将jdk添加到环境变量
①新建系统变量JAVA_HOME,值为D:\Program Files\Java\jdk-19

②将%JAVA_HOME%\bin添加到环境变量Path中
测试一下java是否配置好

下载flume-1.9.0,bin和src都要下载


将bin解压到D盘,然后将D:\flume-1.9.0\conf中的三个template文件复制一份并去掉".template",如下图所示

将jdk 19的安装路径添加到D:\flume-1.9.0\conf\flume-env.sh中,如下图所示

将D:\flume-1.9.0\conf\flume-conf.properties的内容修改为

agent.sources = r1  
agent.sinks = k1  
agent.channels = c1  # Describe/configure the source  
agent.sources.r1.type = thrift  
agent.sources.r1.port = 9090  
agent.sources.r1.bind = 0.0.0.0  
agent.sources.r1.threads = 10 # Use a channel which buffers events in file  
agent.channels.c1.type = memory  
agent.channels.c1.capacity = 10000000  
agent.channels.c1.transactionCapacity= 2000  # Describe the sink k1  
agent.sinks.k1.type = logger  # Bind the source and sink to the channel  
agent.sources.r1.channels = c1  
agent.sinks.k1.channel = c1

这里将Source设置为thrift,Sink为logger,这样就能接收thrift客户端发送的数据,并在flume控制台中打印出来。这里端口号是9090,那么thrift客户端的端口号也要设置为9090
将flume添加到环境变量
①新建系统变量FLUME_HOME,值为D:\flume-1.9.0

②将%FLUME_HOME%\bin和%FLUME_HOME%\conf添加到环境变量Path中
启动flume

flume-ng agent --conf D:\flume-1.9.0\conf --conf-file D:\flume-1.9.0\conf\flume-conf.properties --name agent -property flume.root.logger=INFO,console

会报如下错误
Test-Path : 路径中具有非法字符。
所在位置 D:\flume-1.9.0\bin\flume-ng.ps1:106 字符: 56
+ ...                               ? { "$_" -ne "" -and (Test-Path $_ )} |
+                                                         ~~~~~~~~~~~~
    + CategoryInfo          : InvalidArgument: (D:\Program File...7.0_80\jre\bin":String) [Test-Path],ArgumentExceptio
    n
    + FullyQualifiedErrorId : ItemExistsArgumentError,Microsoft.PowerShell.Commands.TestPathCommand
具体如下图所示


解决方法是修改D:\flume-1.9.0\bin\flume-ng.ps1文件
将GetHadoopHome、GetHbaseHome和GetHiveHome这三个函数的定义和调用全部注释掉或者删掉,因为系统中未安装这三个软件,肯定找不到
再次运行flume,成功。提示Started Thrift source

三.用thrift验证flume

解压flume src,在src\flume-ng-sdk\src\main\thrift中有一个flume.thrift文件,这个是thrift和flume通信的接口文件,将这个文件稍做修改,去掉枚举中的ERROR,它与windows的宏定义冲突。修改后的flume.thrift如下所示

namespace java org.apache.flume.thriftstruct ThriftFlumeEvent {1: required map  headers,2: required binary body,
}enum Status {OK,FAILED,UNKNOWN
}service ThriftSourceProtocol {Status append(1: ThriftFlumeEvent event),Status appendBatch(1: list events),
}

参考上篇博客Qt中调用thrift,执行

thrift -r --gen cpp flume.thrift

将上篇博客中的Thrift_Client稍作修改,如下所示

#include 
#include 
#include #include "ThriftSourceProtocol.h"
#include using namespace std;
using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;class ThriftClient
{
public:ThriftClient(): socket(new TSocket("127.0.0.1", 9090)), transport(new TFramedTransport(socket)), protocol(new TCompactProtocol(transport)){pClient = new ThriftSourceProtocolClient(protocol);}~ThriftClient(){}void sendEvent(){std::string body("The first event");ThriftFlumeEvent event;event.__set_headers(headers);event.__set_body(body);if (!transport->isOpen()){transport->open();}Status::type res = pClient->append(event);if (res == Status::OK){std::cout<<"Send event: "<close();}void sendBatchEvent(){std::string body2("The second event");ThriftFlumeEvent event;event.__set_headers(headers);event.__set_body(body2);events.push_back(event);std::string body3("The third event");event.__set_headers(headers);event.__set_body(body3);events.push_back(event);if (!transport->isOpen()){transport->open();}Status::type res = pClient->appendBatch(events);if (res == Status::OK){for(auto event:events){std::cout<<"Send event: "<close();}private:// Thrift protocol needings...std::shared_ptr socket;std::shared_ptr transport;std::shared_ptr protocol;ThriftSourceProtocolClient *pClient;std::map headers;std::vector events;
};int main(int argc, char **argv)
{ThriftClient client;client.sendEvent();client.sendBatchEvent();return 0;
}

这里的端口设置为9090。发送event有两种方式,单条发送和批量发送。代码中发送了三条event,flume的控制台也打印三条,如下图所示

现在把flume的Sink也指定为thrift,端口号为9091

agent.sources = r1  
agent.sinks = k1  
agent.channels = c1  # Describe/configure the source  
agent.sources.r1.type = thrift  
agent.sources.r1.port = 9090  
agent.sources.r1.bind = 0.0.0.0  
agent.sources.r1.threads = 10 # Use a channel which buffers events in file  
agent.channels.c1.type = memory  
agent.channels.c1.capacity = 10000000  
agent.channels.c1.transactionCapacity= 2000  # Describe the sink k1  
agent.sinks.k1.type = thrift
agent.sinks.k1.hostname = 127.0.0.1
agent.sinks.r1.port = 9091  # Bind the source and sink to the channel  
agent.sources.r1.channels = c1  
agent.sinks.k1.channel = c1
将上篇博客中的Thrift_Server稍作修改,如下所示
#include "ThriftSourceProtocol.h"
#include 
#include 
#include 
#include 
#include using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;class ThriftSourceProtocolHandler : virtual public ThriftSourceProtocolIf {
public:ThriftSourceProtocolHandler() {// Your initialization goes here}Status::type append(const ThriftFlumeEvent& event) {// Your implementation goes herestd::cout< & events) {// Your implementation goes herefor(auto event:events){std::cout< handler(new ThriftSourceProtocolHandler());::std::shared_ptr processor(new ThriftSourceProtocolProcessor(handler));::std::shared_ptr serverTransport(new TServerSocket(port));::std::shared_ptr transportFactory(new TFramedTransportFactory());::std::shared_ptr protocolFactory(new TCompactProtocolFactory());TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);server.serve();return 0;
}

重新启动flume,然后启动Thrift_Server,最后启动Thrfit_Client,结果如下图所示

至此,验证通过!

原文链接:https://blog.csdn.net/caoshangpa/article/details/128502780

相关内容

热门资讯

安卓系统自带的网页,功能与特色... 你有没有发现,每次打开安卓手机,那熟悉的系统界面里总有一个默默无闻的小家伙——安卓系统自带的网页浏览...
美咖云系统安卓版,开启智能生活... 你有没有发现,最近手机上多了一个叫“美咖云系统安卓版”的小家伙?它就像一个魔法师,轻轻一点,就能让你...
安卓系统推荐最好的手机,盘点性... 你有没有想过,拥有一部性能卓越的手机,就像是拥有了移动的宝藏库?在这个信息爆炸的时代,一部好手机不仅...
安卓11系统能精简吗,释放潜能 你有没有发现,随着手机越来越智能,系统也越来越庞大?安卓11系统,这个最新的操作系统,是不是也让你觉...
安卓自动重启系统软件,揭秘原因... 手机突然自动重启,是不是感觉整个人都不好了?别急,今天就来和你聊聊这个让人头疼的安卓自动重启系统软件...
苹果手机x刷安卓系统,探索安卓... 你有没有想过,你的苹果手机X竟然也能刷上安卓系统?是的,你没听错,就是那个一直以来都和我们苹果手机X...
安卓系统智商低吗,智商低下的真... 你有没有想过,为什么安卓系统的智商总被调侃得好像有点低呢?是不是觉得它总是慢吞吞的,有时候还犯点小错...
安卓系统手机联系人,揭秘你的社... 你有没有发现,手机里的联系人列表就像是一个小小的社交圈呢?里面藏着我们的亲朋好友、工作伙伴,甚至还有...
安卓系统免费铃声下载,打造个性... 手机里那首老掉牙的铃声是不是让你觉得有点out了呢?别急,今天就来给你支个招,让你轻松给安卓手机换上...
安卓系统用哪个桌面好,打造个性... 你有没有发现,手机桌面可是我们每天都要面对的“脸面”呢?换一个好看的桌面,心情都能跟着好起来。那么,...
虚拟大师是安卓10系统,功能与... 你知道吗?最近在手机圈里,有个新玩意儿引起了不小的轰动,那就是虚拟大师!而且,更让人惊喜的是,这个虚...
安卓系统与苹果优缺点,系统优缺... 说到手机操作系统,安卓和苹果绝对是两大巨头,它们各有各的特色,就像两道不同的美味佳肴,让人难以抉择。...
安卓win双系统主板,融合与创... 你有没有想过,一台电脑如果既能流畅运行安卓系统,又能轻松驾驭Windows系统,那该有多爽啊?没错,...
安卓系统可精简软件,轻松提升手... 你有没有发现,手机里的安卓系统越来越庞大,软件也越装越多,有时候感觉手机就像个“大肚子”,不仅运行速...
安卓系统基于linux的代码,... 你有没有想过,那个陪伴你每天刷抖音、玩游戏、办公的安卓系统,其实背后有着一套复杂的基于Linux的代...
苹果和安卓的拍照系统,谁更胜一... 你有没有发现,现在手机拍照已经成为我们生活中不可或缺的一部分呢?无论是记录生活的点滴,还是捕捉美丽的...
苹果和安卓系统不同吗,系统差异... 你有没有想过,为什么你的手机里装的是苹果的iOS系统,而朋友的手机却是安卓系统呢?这两种系统,看似都...
安卓系统有多少级,揭秘其多级架... 你有没有想过,那个陪伴我们日常生活的安卓系统,它其实有着丰富的层级结构呢?没错,就是那个让我们的手机...
华为鸿蒙系统与安卓的,技术融合... 你知道吗?最近科技圈可是炸开了锅,华为鸿蒙系统与安卓的较量成为了大家热议的话题。这不,今天我就来给你...
什么安卓手机是苹果系统,搭载苹... 你有没有想过,为什么有些人宁愿花大价钱买苹果手机,而有些人却对安卓手机情有独钟呢?其实,这个问题背后...