Spark Streaming
创始人
2024-05-22 16:46:04

1. Kafka

Steps:

  • Start ZooKeeper and Kafka
  • Test the Kafka producer and consumer from the shell to confirm messages can be produced and consumed
  • Start HDFS and YARN
  • Submit a Spark job that consumes the Kafka messages

1.1 Start ZooKeeper and Kafka

[root@bogon bin]# cd /usr/local/src/zookeeper-3.4.14/bin
[root@bogon bin]# ./zkServer.sh start

# Check the status (ps -ef | grep zookeeper also works)
[root@localhost bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/src/zookeeper-3.4.14/bin/../conf/zoo.cfg
Mode: standalone

# Start Kafka
[root@localhost bin]# cd /usr/local/src/kafka_2.11-2.4.1/bin/
[root@localhost bin]# ./kafka-server-start.sh -daemon /usr/local/src/kafka_2.11-2.4.1/config/server.properties

# Check that it started
[root@localhost bin]# jps
20817 QuorumPeerMain
24954 Kafka
26364 Jps

1.2 Test Kafka

1. Create a topic

[root@localhost bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic2
Created topic topic2.

2. List topics

# List the existing topics
[root@localhost bin]# ./kafka-topics.sh --list --zookeeper localhost:2181
topic2

3. Start a producer:

# Produce messages
[root@bogon bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic topic2
>[2021-01-23 13:18:52,684] WARN [Producer clientId=console-producer] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
>hello world
>hello python
>hello spark

Once started, the producer blocks and waits for input; the three messages above have been typed in. Now start a consumer to read them.

4. Start a consumer:

# Consume messages
[root@bogon bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic2 --from-beginning
hello world
hello python
hello spark

1.3 Start HDFS and YARN

1. Start HDFS

[root@bogon sbin]# cd /home/hj/app/hadoop-2.6.0-cdh5.7.0/sbin/
[root@bogon sbin]# ./start-dfs.sh 
[root@bogon sbin]# jps
8881 SecondaryNameNode
7379 DataNode
8664 NameNode
9261 Jps
1054 QuorumPeerMain

2. Start YARN

[root@bogon sbin]# ./start-yarn.sh 
starting yarn daemons
starting resourcemanager, logging to /home/hj/app/hadoop-2.6.0-cdh5.7.0/logs/yarn-root-resourcemanager-bogon.out
localhost: /usr/bin/python: No module named virtualenvwrapper
localhost: virtualenvwrapper.sh: There was a problem running the initialization hooks.
localhost: 
localhost: If Python could not import the module virtualenvwrapper.hook_loader,
localhost: check that virtualenvwrapper has been installed for
localhost: VIRTUALENVWRAPPER_PYTHON=/usr/bin/python and that PATH is
localhost: set properly.
localhost: starting nodemanager, logging to /home/hj/app/hadoop-2.6.0-cdh5.7.0/logs/yarn-root-nodemanager-bogon.out

[root@bogon sbin]# jps
12945 DataNode
13089 SecondaryNameNode
14065 Jps
13924 ResourceManager
12840 NameNode
14031 NodeManager

1.4 Spark Consuming Kafka Messages

1.4.1 Configure dependency JARs

Spark Streaming needs two JARs to connect to Kafka:

  • spark-streaming-kafka-0-8-assembly_2.11-2.4.3.jar (2.11 is the Scala version)
  • spark-streaming-kafka-0-8_2.11-2.4.4.jar

Without them Spark cannot connect to Kafka; see the official docs: http://spark.apache.org/docs/2.4.4/streaming-kafka-integration.html

After downloading, copy them into the spark/jars directory:

[root@bogon jars]# cd /home/hj/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/jars
[root@bogon jars]# cp ../../spark-streaming-kafka-0-8-assembly_2.11-2.4.7.jar .
[root@bogon jars]# cp ../../spark-streaming-kafka-0-8_2.11-2.4.4.jar .
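To double-check that the JARs are on Spark's classpath, you can try creating a direct stream from the pyspark shell before writing the full job. A minimal sketch of my own (assuming the topic2 topic and the localhost:9092 broker used above):

# Run inside the pyspark shell, where `sc` already exists.
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

ssc = StreamingContext(sc, 10)
# If the Kafka integration JARs are missing, this call raises an error
# telling you to add the spark-streaming-kafka-0-8 dependency.
stream = KafkaUtils.createDirectStream(ssc, ["topic2"],
                                       {"metadata.broker.list": "localhost:9092"})
print(stream)  # reaching this line means the Kafka classes were found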

1.4.2 Write the Spark Streaming program

# coding=utf-8
import sys

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == '__main__':
    if len(sys.argv) != 2:
        print("Usage: test_spark_kafka_streaming.py topic")
        sys.exit(-1)

    conf = SparkConf().setAppName("test_kafka_streaming").setMaster("local[2]")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 10)  # batch interval: 10 s

    brokers = "localhost:9092"
    topic = sys.argv[-1]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    lines_rdd = kvs.map(lambda x: x[1])
    counts = lines_rdd.flatMap(lambda x: x.split(" ")) \
        .map(lambda x: (x, 1)) \
        .reduceByKey(lambda x, y: x + y)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

This is a simple word-count job: it takes one argument, the Kafka topic, and processes the Kafka messages in 10-second batches.

Two ways to read from Kafka

Spark Streaming can receive data from Kafka in two ways:

  • the Direct API, a lower-level, receiver-less approach

  • the receiver-based approach, a higher-level API

brokers = "localhost:9092"
topic = sys.argv[-1]

# Receiver mode: createStream expects the ZooKeeper quorum, not the broker list
line = KafkaUtils.createStream(ssc, "localhost:2181", 'test', {topic: 1})

# Direct (no-receiver) mode
line = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})

Each approach has its pros and cons; broadly, the receiver approach uses Kafka's high-level consumer API and tracks offsets in ZooKeeper, while the direct approach queries offsets itself and maps each Kafka partition to an RDD partition. For more detail see:

  • https://blog.csdn.net/pysense/article/details/104179736/
  • https://www.cnblogs.com/heml/p/6796414.html

1.4.3 Submit the Spark job

The Kafka producer was already started earlier; if yours is not running, see 1.2 Test Kafka. Now submit test_spark_kafka_streaming.py (the command below runs it locally with --master local[2]):

[root@bogon bin]# ./spark-submit --master local[2] --name test_kafka_streaming /home/hj/app/test_spark_kafka_streaming.py topic2

1. Kafka producer

[root@bogon bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic topic2
>hello spark hello python hello world hello alina

2. Job log:

(Screenshot of the job log; original image: https://hubery624.oss-cn-shenzhen.aliyuncs.com/ddb6cfe195cc8cd30d52496f8a4eecf.png)

1.5 References

  • Spark2.1.0+入门:Apache Kafka作为DStream数据源(Python版)
  • pyspark streaming简介 和 消费 kafka示例
  • 基于PySpark整合Spark Streaming与Kafka

2. Socket Stream

1. test_spark_steaming_socket.py

# coding=utf-8
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test_socket_streaming").setMaster("local[2]")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 10)

    lines_rdd = ssc.socketTextStream("localhost", 9999)
    counts = lines_rdd.flatMap(lambda x: x.split(" ")) \
        .map(lambda x: (x, 1)) \
        .reduceByKey(lambda x, y: x + y)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

2. Install and start nc

[root@bogon app]# yum install nc
[root@bogon app]# nc -lk 9999
hello world
hello spark
hello python
hello spark hello python hello world

3. Submit the Spark job:

[root@bogon bin]# ./spark-submit --master local[2] --name test_socket_streaming /home/hj/app/test_spark_steaming_socket.py 

Note: start nc first, otherwise the Spark job fails to connect and reports an error.
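If nc is not available, a small Python stand-in can play the same role. This is my own illustrative sketch, not part of the original setup: it listens on port 9999 and sends one test line per second to whichever client connects.

# coding=utf-8
# Minimal replacement for `nc -lk 9999`: accepts one connection and keeps
# sending lines that the socketTextStream job above can word-count.
import socket
import time

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("localhost", 9999))
server.listen(1)
conn, addr = server.accept()
print("client connected: %s:%d" % addr)
while True:
    conn.send(b"hello spark hello python\n")
    time.sleep(1)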

3. File Stream

1. test_spark_streaming_file.py

# coding=utf-8
import sys

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

if __name__ == '__main__':
    if len(sys.argv) != 2:
        print("Usage: test_spark_streaming_file.py file_path")
        sys.exit(-1)

    conf = SparkConf().setAppName("test_socket_streaming").setMaster("local[2]")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 10)

    lines_rdd = ssc.textFileStream(sys.argv[-1])
    counts = lines_rdd.flatMap(lambda x: x.split(" ")) \
        .map(lambda x: (x, 1)) \
        .reduceByKey(lambda x, y: x + y)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

2. Submit the Spark job:

# Monitor the /home/hj/app/logfile/ directory (create it first)
[root@bogon bin]# ./spark-submit --master local[2] --name test_file_streaming /home/hj/app/test_spark_streaming_file.py /home/hj/app/logfile/

3. Create a file with the following lines:

[root@bogon app]# mkdir logfile
[root@bogon app]# cd logfile/
[root@bogon logfile]# vim test
I love Hadoop
I love Spark
Spark is fast

Note: the file stream only detects files created after the Spark job has started; files that already existed in the directory are not picked up.
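Because textFileStream only notices files that newly appear in the monitored directory, a common pattern is to write the data somewhere else first and then move it in, so the job sees a complete file appear at once. A minimal sketch of my own, reusing the same /home/hj/app/logfile directory:

# coding=utf-8
# Write the file outside the monitored directory, then rename it in, so the
# streaming job sees a complete file appear in a single atomic move.
import os
import time

monitored_dir = "/home/hj/app/logfile"
tmp_path = "/home/hj/app/.streaming_input.tmp"  # same filesystem as the target dir

with open(tmp_path, "w") as f:
    f.write("I love Hadoop\n")
    f.write("I love Spark\n")
    f.write("Spark is fast\n")

final_path = os.path.join(monitored_dir, "test-%d" % int(time.time()))
os.rename(tmp_path, final_path)  # atomic move on the same filesystem
print("created %s" % final_path)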

Open http://192.168.131.131:4040/jobs/ to see the job details:

(Screenshot of the Spark UI jobs page; original image: https://hubery624.oss-cn-shenzhen.aliyuncs.com/20210131194416.png)

4. HBase Pseudo-Distributed Installation

1. Download: http://archive.apache.org/dist/hbase/1.1.2/

2. Extract it and configure the environment variables:

tar -zxvf hbase-1.1.2-bin.tar.gz -C /home/hj/app/

[root@bogon app]# vim ~/.bash_profile
export HBASE_HOME=/home/hj/app/hbase-1.1.2
PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$HBASE_HOME/bin

# Apply the changes
[root@bogon app]# source ~/.bash_profile

3. Configure hbase-env.sh

# Set HBASE_CLASSPATH to the conf directory of the local Hadoop installation (here /home/hj/app/hadoop-2.6.0-cdh5.7.0/conf)
export JAVA_HOME=/home/hj/app/jdk1.8.0_261
export HBASE_CLASSPATH=/home/hj/app/hadoop-2.6.0-cdh5.7.0/conf
export HBASE_MANAGES_ZK=true

4. Configure conf/hbase-site.xml

<configuration>
  <property>
    <name>hbase.rootdir</name>
    <!-- hdfs://localhost:9000/hbase did not work here -->
    <value>hdfs://localhost/hbase</value>
  </property>
  <property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
  </property>
</configuration>

  • Set hbase.rootdir to the HDFS path where HBase stores its data, and set hbase.cluster.distributed to true. This assumes the Hadoop cluster runs in pseudo-distributed mode on this machine, with the NameNode on port 9000.
  • hbase.rootdir specifies the HBase storage directory; hbase.cluster.distributed puts the cluster in distributed mode.

5. Start Hadoop

/home/hj/app/hadoop-2.6.0-cdh5.7.0/sbin/start-dfs.sh

6. Start HBase

[root@bogon hbase-1.1.2]# ./bin/start-hbase.sh
localhost: /usr/bin/python: No module named virtualenvwrapper
localhost: virtualenvwrapper.sh: There was a problem running the initialization hooks.
localhost: 
localhost: If Python could not import the module virtualenvwrapper.hook_loader,
localhost: check that virtualenvwrapper has been installed for
localhost: VIRTUALENVWRAPPER_PYTHON=/usr/bin/python and that PATH is
localhost: set properly.
localhost: starting zookeeper, logging to /home/hj/app/hbase-1.1.2/bin/../logs/hbase-root-zookeeper-bogon.out
starting master, logging to /home/hj/app/hbase-1.1.2/logs/hbase-root-master-bogon.out
starting regionserver, logging to /home/hj/app/hbase-1.1.2/logs/hbase-root-1-regionserver-bogon.out

# Check the running processes; the key ones are HRegionServer and HMaster
[root@bogon hbase-1.1.2]# jps
5586 NodeManager
5045 SecondaryNameNode
112135 Kafka
126087 HRegionServer
125977 HMaster
126376 Jps
1082 QuorumPeerMain
4730 NameNode
5468 ResourceManager
4847 DataNode

4.1 Pitfalls

4.1.1 No HMaster process after startup

After startup there is no HMaster process. The log hbase-1.1.2/bin/../logs/hbase-root-zookeeper-bogon.out shows:

java.net.BindException: Address already in use
        at sun.nio.ch.Net.bind0(Native Method)
        at sun.nio.ch.Net.bind(Net.java:444)
        at sun.nio.ch.Net.bind(Net.java:436)
        at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:225)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:67)
        at org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
        at org.apache.zookeeper.server.ZooKeeperServerMain.runFromConfig(ZooKeeperServerMain.java:111)
        at org.apache.hadoop.hbase.zookeeper.HQuorumPeer.runZKServer(HQuorumPeer.java:94)
        at org.apache.hadoop.hbase.zookeeper.HQuorumPeer.main(HQuorumPeer.java:79)

The cause: HBase starts its own ZooKeeper on startup, but a standalone ZooKeeper was already running, so the address was already in use.

Solution

  • Stop the standalone ZooKeeper service and let HBase start ZooKeeper itself, or
  • set HBASE_MANAGES_ZK=false in hbase-env.sh so that HBase does not manage ZooKeeper and you start it manually:
[root@bogon conf]# vim hbase-env.sh 
export JAVA_HOME=/home/hj/app/jdk1.8.0_261
export HBASE_CLASSPATH=/home/hj/app/hadoop-2.6.0-cdh5.7.0/conf
export HBASE_MANAGES_ZK=false

4.1.2 Startup reports: regionserver running as process 16996. Stop it first.

The full error:

[root@bogon bin]# sh start-hbase.sh
starting master, logging to /home/hj/app/hbase-1.1.2/logs/hbase-root-master-bogon.out
regionserver running as process 16996. Stop it first.

Solution

Kill the stale regionserver process, then start HBase again:

kill -9 16996


4.1.3 Startup reports SLF4J: Class path contains multiple SLF4J bindings

Cause: HBase and Hadoop both ship an SLF4J binding JAR with the same name, which conflict. Rename HBase's copy:

mv slf4j-log4j12-1.7.25.jar slf4j-log4j12-1.7.25.jar-copy

Reference: https://blog.csdn.net/qq_45135120/article/details/107048944

4.1.4 Startup reports Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m

Reference: https://blog.csdn.net/tiankong_12345/article/details/93585463

4.1.5 hbase shell reports Can't get master address from ZooKeeper; znode data == null

Problem: the HMaster process has died and is not running.

Solution: http://blog.51yip.com/hadoop/2193.html

HBase shell operations

[root@bogon hbase-1.1.2]# ./bin/hbase shell
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/hj/app/hbase-1.1.2/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/hj/app/hadoop-2.6.0-cdh5.7.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2021-01-17 11:53:03,530 WARN  [main] util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
HBase Shell; enter 'help' for list of supported commands.
Type "exit" to leave the HBase Shell
Version 1.1.2, rcc2b70cf03e3378800661ec5cab11eb43fafe0fc, Wed Aug 26 20:11:27 PDT 2015
[root@bogon bin]# ./hbase shell
2021-02-06 21:54:05,371 WARN  [main] util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
HBase Shell; enter 'help' for list of supported commands.
Type "exit" to leave the HBase Shell
Version 1.1.2, rcc2b70cf03e3378800661ec5cab11eb43fafe0fc, Wed Aug 26 20:11:27 PDT 2015

hbase(main):001:0> list
TABLE
0 row(s) in 0.7260 seconds

=> []
hbase(main):005:0> create 'student', 'info'
0 row(s) in 2.4760 seconds

=> Hbase::Table - student
hbase(main):006:0> describe 'student'
Table student is ENABLED
student
COLUMN FAMILIES DESCRIPTION
{NAME => 'info', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
1 row(s) in 0.3350 seconds

hbase(main):002:0> put 'student','1','info:name','Xueqian'
0 row(s) in 0.5210 seconds

hbase(main):003:0> put 'student','1','info:gender','F'
0 row(s) in 0.0340 seconds

hbase(main):004:0> put 'student','1','info:age','23'
0 row(s) in 0.0490 seconds

hbase(main):005:0> put 'student','2','info:name','Weiliang'
0 row(s) in 0.0480 seconds

hbase(main):006:0> put 'student','2','info:gender','M'
0 row(s) in 0.0570 seconds

hbase(main):007:0> put 'student','2','info:age','24'
0 row(s) in 0.0270 seconds

hbase(main):008:0> get 'student', '1'
COLUMN               CELL
 info:age            timestamp=1612620059368, value=23
 info:gender         timestamp=1612620052093, value=F
 info:name           timestamp=1612620042398, value=Xueqian
3 row(s) in 0.1330 seconds

hbase(main):010:0> scan 'student'
ROW                  COLUMN+CELL
 1                   column=info:age, timestamp=1612620059368, value=23
 1                   column=info:gender, timestamp=1612620052093, value=F
 1                   column=info:name, timestamp=1612620042398, value=Xueqian
 2                   column=info:age, timestamp=1612620095417, value=24
 2                   column=info:gender, timestamp=1612620086286, value=M
 2                   column=info:name, timestamp=1612620076564, value=Weiliang
2 row(s) in 0.0870 seconds
In addition, the HBase classpath must be visible to Spark, which is done by exporting SPARK_DIST_CLASSPATH (for example in conf/spark-env.sh):

export SPARK_DIST_CLASSPATH=$(/home/hj/app/hadoop-2.6.0-cdh5.7.0/bin/hadoop classpath):$(/home/hj/app/hbase-1.1.2/bin/hbase classpath):/home/hj/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/jars/hbase/*

4.2 Reading and Writing HBase with PySpark

4.2.1 Configure Spark

For Spark to read and write HBase, the relevant HBase JARs must be copied into spark/jars. The JARs needed are: every JAR whose name starts with hbase, plus guava-12.0.1.jar, htrace-core-3.1.0-incubating.jar and protobuf-java-2.5.0.jar.

1. Create an hbase directory under the Spark installation's jars directory:

[root@bogon spark-2.2.0-bin-2.6.0-cdh5.7.0]# cd jars
[root@bogon jars]# pwd
/home/hj/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/jars
mkdir hbase
cp /home/hj/app/hbase-1.1.2/lib/hbase*.jar hbase/
cp /home/hj/app/hbase-1.1.2/lib/guava-12.0.1.jar  hbase/
cp /home/hj/app/hbase-1.1.2/lib/htrace-core-3.1.0-incubating.jar  hbase/
cp /home/hj/app/hbase-1.1.2/lib/protobuf-java-2.5.0.jar  hbase/

2. Spark 2.x no longer bundles the examples JAR containing the converters that turn HBase data into something Python can read, so spark-examples-1.6.0.jar must be downloaded separately and placed in hbase/ as well:

# ~/下载 is the Downloads directory on a Chinese-locale system
mv ~/下载/spark-examples* hbase/

4.2.2 Read and write HBase

1. Open a pyspark shell:

# Switch to spark/bin and start the pyspark shell
[root@bogon spark-2.2.0-bin-2.6.0-cdh5.7.0]# cd bin/
[root@bogon bin]# pwd
/home/hj/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/bin
[root@bogon bin]# ls
beeline          load-spark-env.cmd  pyspark2.cmd     spark-class       sparkR2.cmd       spark-shell.cmd    spark-submit.cmd
beeline.cmd      load-spark-env.sh   pyspark.cmd      spark-class2.cmd  sparkR.cmd        spark-sql
derby.log        metastore_db        run-example      spark-class.cmd   spark-shell       spark-submit
find-spark-home  pyspark             run-example.cmd  sparkR            spark-shell2.cmd  spark-submit2.cmd
[root@bogon bin]# ./pyspark

2. The data already in HBase:

hbase(main):002:0> scan 'student'
ROW                  COLUMN+CELL
 1                   column=info:age, timestamp=1612620059368, value=23
 1                   column=info:gender, timestamp=1612620052093, value=F
 1                   column=info:name, timestamp=1612620042398, value=Xueqian
 2                   column=info:age, timestamp=1612620095417, value=24
 2                   column=info:gender, timestamp=1612620086286, value=M
 2                   column=info:name, timestamp=1612620076564, value=Weiliang
2 row(s) in 3.3040 seconds

3. Read:

>>> host = 'localhost'
>>> table = 'student'
>>> conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
>>> keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
>>> valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
>>> hbase_rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat","org.apache.hadoop.hbase.io.ImmutableBytesWritable","org.apache.hadoop.hbase.client.Result",keyConverter=keyConv,valueConverter=valueConv,conf=conf)
>>> count = hbase_rdd.count()
>>> hbase_rdd.cache()
MapPartitionsRDD[2] at mapPartitions at SerDeUtil.scala:208
>>> output = hbase_rdd.collect()
>>> for (k, v) in output:
...     print(k, v)
... 
(u'1', u'{"qualifier" : "age", "timestamp" : "1612620059368", "columnFamily" : "info", "row" : "1", "type" : "Put", "value" : "23"}\n{"qualifier" : "gender", "timestamp" : "1612620052093", "columnFamily" : "info", "row" : "1", "type" : "Put", "value" : "F"}\n{"qualifier" : "name", "timestamp" : "1612620042398", "columnFamily" : "info", "row" : "1", "type" : "Put", "value" : "Xueqian"}')
(u'2', u'{"qualifier" : "age", "timestamp" : "1612620095417", "columnFamily" : "info", "row" : "2", "type" : "Put", "value" : "24"}\n{"qualifier" : "gender", "timestamp" : "1612620086286", "columnFamily" : "info", "row" : "2", "type" : "Put", "value" : "M"}\n{"qualifier" : "name", "timestamp" : "1612620076564", "columnFamily" : "info", "row" : "2", "type" : "Put", "value" : "Weiliang"}')
>>> count
2
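Each value returned by HBaseResultToStringConverter is a newline-separated string of JSON cells, as the output above shows. If plain Python dictionaries are more convenient, a small helper can parse it; this is my own sketch, not part of the original post:

import json

def parse_hbase_value(value_str):
    # One JSON object per cell, separated by newlines
    cells = [json.loads(cell) for cell in value_str.split("\n")]
    # Map each column qualifier (age, gender, name) to its value
    return {cell["qualifier"]: cell["value"] for cell in cells}

student_rdd = hbase_rdd.mapValues(parse_hbase_value)
print(student_rdd.collect())
# e.g. [(u'1', {u'age': u'23', u'gender': u'F', u'name': u'Xueqian'}), ...]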

4. Write data:

>>> k1_conv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
>>> v1_conv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
>>> conf_1 = {"hbase.zookeeper.quorum": host,"hbase.mapred.outputtable": table,"mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat","mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable","mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
>>> rawData = ['3,info,name,Rongcheng','4,info,name,Guanhua']
>>> sc.parallelize(rawData).map(lambda x: (x[0],x.split(','))).saveAsNewAPIHadoopDataset(conf=conf_1,keyConverter=k1_conv,valueConverter=v1_conv)
21/02/09 22:31:50 ERROR io.SparkHadoopMapReduceWriter: Aborting job job_20210209223147_0008.
java.lang.IllegalArgumentException: Can not create a Path from a null string
        at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123)
        at org.apache.hadoop.fs.Path.<init>(Path.java:135)
        at org.apache.hadoop.fs.Path.<init>(Path.java:89)
        at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.absPathStagingDir(HadoopMapReduceCommitProtocol.scala:58)
        at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:132)
        at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:101)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084)
        at org.apache.spark.api.python.PythonRDD$.saveAsHadoopDataset(PythonRDD.scala:861)
        at org.apache.spark.api.python.PythonRDD.saveAsHadoopDataset(PythonRDD.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:280)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:748)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/hj/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/python/pyspark/rdd.py", line 1393, in saveAsNewAPIHadoopDataset
    keyConverter, valueConverter, True)
  File "/home/hj/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/home/hj/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/python/pyspark/sql/utils.py", line 79, in deco
    raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.IllegalArgumentException: u'Can not create a Path from a null string'

Check whether the write succeeded:

# Two new rows appear, so the write succeeded
hbase(main):003:0> scan 'student'
ROW                  COLUMN+CELL
 1                   column=info:age, timestamp=1612620059368, value=23
 1                   column=info:gender, timestamp=1612620052093, value=F
 1                   column=info:name, timestamp=1612620042398, value=Xueqian
 2                   column=info:age, timestamp=1612620095417, value=24
 2                   column=info:gender, timestamp=1612620086286, value=M
 2                   column=info:name, timestamp=1612620076564, value=Weiliang
 3                   column=info:name, timestamp=1612881109880, value=Rongcheng
 4                   column=info:name, timestamp=1612881109879, value=Guanhua
4 row(s) in 0.1310 seconds

Note: the write reports pyspark.sql.utils.IllegalArgumentException: u'Can not create a Path from a null string', but the data is still written successfully.
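For reference, StringListToPutConverter expects each RDD element as (row key, [row, column family, qualifier, value]), which is why rawData above is split on commas. A hedged sketch with my own example data, filling in more columns for rows 3 and 4 in the same way:

# Each element: (row key, [row, column family, qualifier, value])
more_data = ['3,info,gender,M', '3,info,age,26',
             '4,info,gender,M', '4,info,age,27']
sc.parallelize(more_data) \
  .map(lambda x: (x.split(',')[0], x.split(','))) \
  .saveAsNewAPIHadoopDataset(conf=conf_1, keyConverter=k1_conv, valueConverter=v1_conv)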
