Spark Streaming

1. kafka

Steps:

  • Start zookeeper and kafka
  • Use the shell tools to verify that the kafka producer and consumer work
  • Start hdfs and yarn
  • Submit a spark job that consumes kafka messages

1.1 Start zk and kafka

[root@bogon bin]# cd /usr/local/src/zookeeper-3.4.14/bin
[root@bogon bin]# ./zkServer.sh start

# Check the status (ps -ef | grep zookeeper also works)
[root@localhost bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/src/zookeeper-3.4.14/bin/../conf/zoo.cfg
Mode: standalone

# Start kafka
[root@localhost bin]# cd /usr/local/src/kafka_2.11-2.4.1/bin/
[root@localhost bin]# ./kafka-server-start.sh -daemon /usr/local/src/kafka_2.11-2.4.1/config/server.properties

# Check that kafka started
[root@localhost bin]# jps
20817 QuorumPeerMain
24954 Kafka
26364 Jps

1.2 Test kafka

1. Create a topic

[root@localhost bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic2
Created topic topic2.

2. List topics

# List topics
[root@localhost bin]# ./kafka-topics.sh --list --zookeeper localhost:2181
topic2

3. Start a producer:

# Produce messages
[root@bogon bin]# ./kafka-console-producer.sh --broker-list localhost:9092 -topic topic2
>[2021-01-23 13:18:52,684] WARN [Producer clientId=console-producer] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
>hello world
>hello python
>hello spark

Once started, the producer blocks and waits for input; the three messages above have already been entered. Now start a consumer to read them.
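
As an aside, messages can also be produced from Python instead of the console tool. This is a minimal sketch of my own, assuming the third-party kafka-python package is installed (it is not used anywhere in this article):

# Minimal Python producer sketch (assumption: requires `pip install kafka-python`;
# the article itself only uses the console producer).
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")
for msg in [b"hello world", b"hello python", b"hello spark"]:
    producer.send("topic2", msg)   # same topic2 created above
producer.flush()                   # block until all messages are delivered
producer.close()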

4. Start a consumer:

# Consume messages
[root@bogon bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 -topic topic2 --from-beginning
>hello world
>hello python
>hello spark

1.3 Start hdfs and yarn

1. Start hdfs

[root@bogon sbin]# cd /home/hj/app/hadoop-2.6.0-cdh5.7.0/sbin/
[root@bogon sbin]# ./start-dfs.sh 
[root@bogon sbin]# jps
8881 SecondaryNameNode
7379 DataNode
8664 NameNode
9261 Jps
1054 QuorumPeerMain

2. Start yarn

[root@bogon sbin]# ./start-yarn.sh 
starting yarn daemons
starting resourcemanager, logging to /home/hj/app/hadoop-2.6.0-cdh5.7.0/logs/yarn-root-resourcemanager-bogon.out
localhost: /usr/bin/python: No module named virtualenvwrapper
localhost: virtualenvwrapper.sh: There was a problem running the initialization hooks.
localhost: 
localhost: If Python could not import the module virtualenvwrapper.hook_loader,
localhost: check that virtualenvwrapper has been installed for
localhost: VIRTUALENVWRAPPER_PYTHON=/usr/bin/python and that PATH is
localhost: set properly.
localhost: starting nodemanager, logging to /home/hj/app/hadoop-2.6.0-cdh5.7.0/logs/yarn-root-nodemanager-bogon.out

[root@bogon sbin]# jps
12945 DataNode
13089 SecondaryNameNode
14065 Jps
13924 ResourceManager
12840 NameNode
14031 NodeManager

1.4 Consume kafka messages with spark

1.4.1 Configure dependency jars

Spark Streaming needs two jars to connect to kafka:

  • spark-streaming-kafka-0-8-assembly_2.11-2.4.3.jar: here 2.11 is the Scala version
  • spark-streaming-kafka-0-8_2.11-2.4.4.jar

Without them the connection to kafka fails; see the official docs for details: http://spark.apache.org/docs/2.4.4/streaming-kafka-integration.html

After downloading them, copy them into the spark/jars directory:

[root@bogon jars]# cd /home/hj/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/jars
[root@bogon jars]# cp ../../spark-streaming-kafka-0-8-assembly_2.11-2.4.7.jar .
[root@bogon jars]# cp ../../spark-streaming-kafka-0-8_2.11-2.4.4.jar .

1.4.2 Write the Spark Streaming program

# coding=utf-8
import sys

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == '__main__':
    if len(sys.argv) != 2:
        print("Usage: test_spark_kafka_streaming.py topic")
        sys.exit(-1)

    conf = SparkConf().setAppName("test_kafka_streaming").setMaster("local[2]")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 10)    # batch interval: 10 s

    brokers = "localhost:9092"
    topic = sys.argv[-1]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})

    lines_rdd = kvs.map(lambda x: x[1])
    counts = lines_rdd.flatMap(lambda x: x.split(" ")). \
        map(lambda x: (x, 1)). \
        reduceByKey(lambda x, y: x + y)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

This is a simple wordcount program. It takes one argument, the kafka topic, and processes the kafka messages once every 10 seconds.

Two ways to receive data from kafka

Spark Streaming can receive data from kafka in two ways:

  • the Direct API, the lower-level kafka API

  • receivers, the higher-level API

brokers = "localhost:9092"
topic = sys.argv[-1]# receiver 模式
line = KafkaUtils.createStream(ssc, brokers, 'test', {topic: 1})# no receiver 模式
line = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})

Each approach has its pros and cons; for more detail see:

  • https://blog.csdn.net/pysense/article/details/104179736/
  • https://www.cnblogs.com/heml/p/6796414.html

1.4.3 Submit the spark job

The kafka producer is still running from earlier; if yours is not, start it as in 1.2 Test kafka. Now submit test_spark_kafka_streaming.py for execution:

[root@bogon bin]# ./spark-submit --master local[2] --name test_kafka_streaming /home/hj/app/test_spark_kafka_streaming.py topic2

1. kafka producer

[root@bogon bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic topic2
>hello spark hello python hello world hello alina

2. Job logs:

(screenshot of the job logs: https://hubery624.oss-cn-shenzhen.aliyuncs.com/ddb6cfe195cc8cd30d52496f8a4eecf.png)

1.5 References

  • Spark 2.1.0+ Getting Started: Apache Kafka as a DStream data source (Python)
  • Introduction to pyspark streaming, with a kafka consumer example
  • Integrating Spark Streaming with Kafka using PySpark

2. Socket stream

1. test_spark_steaming_socket.py

# coding=utf-8
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test_socket_streaming").setMaster("local[2]")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 10)

    lines_rdd = ssc.socketTextStream("localhost", 9999)
    counts = lines_rdd.flatMap(lambda x: x.split(" ")). \
        map(lambda x: (x, 1)). \
        reduceByKey(lambda x, y: x + y)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

2. Install and start nc

[root@bogon app]# yum install nc
[root@bogon app]# nc -lk 9999
hello world
hello spark
hello python
hello spark hello python hello world

3. Submit the spark job:

[root@bogon bin]# ./spark-submit --master local[2] --name test_socket_streaming /home/hj/app/test_spark_steaming_socket.py 

Note: start nc first, otherwise spark will throw an error!
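
If nc is not available, a plain TCP server written in Python can stand in for it. This is a sketch of my own (not from the article): it listens on port 9999 and emits one line per second, which ssc.socketTextStream("localhost", 9999) can then consume.

# Minimal text-server sketch as an nc substitute (assumption: not part of the
# original article). Run it before submitting the streaming job.
import socket
import time

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("localhost", 9999))
server.listen(1)

conn, addr = server.accept()          # wait for the spark receiver to connect
try:
    while True:
        conn.sendall(b"hello spark hello python hello world\n")
        time.sleep(1)                 # one line per second
finally:
    conn.close()
    server.close()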

3. File stream

1. test_spark_streaming_file.py

# coding=utf-8
import sys

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

if __name__ == '__main__':
    if len(sys.argv) != 2:
        print("Usage: test_spark_streaming_file.py file_path")
        sys.exit(-1)

    conf = SparkConf().setAppName("test_socket_streaming").setMaster("local[2]")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 10)

    lines_rdd = ssc.textFileStream(sys.argv[-1])
    counts = lines_rdd.flatMap(lambda x: x.split(" ")). \
        map(lambda x: (x, 1)). \
        reduceByKey(lambda x, y: x + y)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

2. Submit the spark job:

# Monitor the /home/hj/app/logfile/ directory (create the logfile directory first)
[root@bogon bin]# ./spark-submit --master local[2] --name test_file_streaming /home/hj/app/test_spark_streaming_file.py /home/hj/app/logfile/

3. Create a file and enter the following data:

[root@bogon app]# mkdir logfile
[root@bogon app]# cd logfile/
[root@bogon logfile]# vim test
I love Hadoop
I love Spark
Spark is fast

Note: the file stream only picks up files created after the spark job has started; files that already existed are not detected!
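
Because of that, new files have to be dropped into the directory while the job is running. A small helper sketch of my own (not part of the article; the file-naming scheme is an assumption):

# Sketch: write a brand-new file into the monitored directory while the
# streaming job is running, so textFileStream picks it up.
import os
import time

log_dir = "/home/hj/app/logfile"
lines = ["I love Hadoop", "I love Spark", "Spark is fast"]

# A unique name per call, so each run creates a genuinely new file.
path = os.path.join(log_dir, "test-%d.txt" % int(time.time()))
with open(path, "w") as f:
    f.write("\n".join(lines) + "\n")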

Visit http://192.168.131.131:4040/jobs/ to see the job details:

(screenshot of the Spark web UI jobs page: https://hubery624.oss-cn-shenzhen.aliyuncs.com/20210131194416.png)

4. hbase pseudo-distributed installation

1. Download: http://archive.apache.org/dist/hbase/1.1.2/

2. Extract and configure environment variables:

[root@bogon app]# tar -zxvf hbase-1.1.2-bin.tar.gz -C /home/hj/app/
[root@bogon app]# vim ~/.bash_profile
export HBASE_HOME=/home/hj/app/hbase-1.1.2
PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$HBASE_HOME/bin

# Apply the changes
[root@bogon app]# source ~/.bash_profile

3. Configure hbase-env.sh

# Set HBASE_CLASSPATH to the conf directory of the local Hadoop installation (i.e. /home/hj/app/hadoop-2.6.0-cdh5.7.0/conf)
export JAVA_HOME=/home/hj/app/jdk1.8.0_261
export HBASE_CLASSPATH=/home/hj/app/hadoop-2.6.0-cdh5.7.0/conf
export HBASE_MANAGES_ZK=true

4. Configure conf/hbase-site.xml

<configuration>
  <property>
    <name>hbase.rootdir</name>
    <!-- hdfs://localhost:9000/hbase -- wrong (did not work here) -->
    <value>hdfs://localhost/hbase</value>
  </property>
  <property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
  </property>
</configuration>

  • Set hbase.rootdir to the HDFS path where HBase stores its data, and set hbase.cluster.distributed to true. This assumes the Hadoop cluster runs in pseudo-distributed mode on the local machine, with the NameNode on port 9000.
  • hbase.rootdir specifies HBase's storage directory; hbase.cluster.distributed puts the cluster into distributed mode.

5. Start hadoop

/home/hj/app/hadoop-2.6.0-cdh5.7.0/sbin/start-dfs.sh

6. Start hbase

[root@bogon hbase-1.1.2]# ./bin/start-hbase.sh
localhost: /usr/bin/python: No module named virtualenvwrapper
localhost: virtualenvwrapper.sh: There was a problem running the initialization hooks.
localhost: 
localhost: If Python could not import the module virtualenvwrapper.hook_loader,
localhost: check that virtualenvwrapper has been installed for
localhost: VIRTUALENVWRAPPER_PYTHON=/usr/bin/python and that PATH is
localhost: set properly.
localhost: starting zookeeper, logging to /home/hj/app/hbase-1.1.2/bin/../logs/hbase-root-zookeeper-bogon.out
starting master, logging to /home/hj/app/hbase-1.1.2/logs/hbase-root-master-bogon.out
starting regionserver, logging to /home/hj/app/hbase-1.1.2/logs/hbase-root-1-regionserver-bogon.out

# Check the running processes; the key ones are HRegionServer and HMaster
[root@bogon hbase-1.1.2]# jps
5586 NodeManager
5045 SecondaryNameNode
112135 Kafka
126087 HRegionServer
125977 HMaster
126376 Jps
1082 QuorumPeerMain
4730 NameNode
5468 ResourceManager
4847 DataNode

4.1 Pitfalls

4.1.1 No HMaster process after startup

HMaster is not running after startup. The hbase-1.1.2/bin/../logs/hbase-root-zookeeper-bogon.out log shows:

java.net.BindException: Address already in use
    at sun.nio.ch.Net.bind0(Native Method)
    at sun.nio.ch.Net.bind(Net.java:444)
    at sun.nio.ch.Net.bind(Net.java:436)
    at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:225)
    at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
    at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:67)
    at org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
    at org.apache.zookeeper.server.ZooKeeperServerMain.runFromConfig(ZooKeeperServerMain.java:111)
    at org.apache.hadoop.hbase.zookeeper.HQuorumPeer.runZKServer(HQuorumPeer.java:94)
    at org.apache.hadoop.hbase.zookeeper.HQuorumPeer.main(HQuorumPeer.java:79)

The cause: hbase also starts its own zookeeper instance on startup, but zookeeper was already running, so the address was already in use.

Solution

  • Stop the standalone zookeeper service and let hbase start zookeeper itself, or
  • Edit hbase-env.sh and set HBASE_MANAGES_ZK to false so hbase no longer starts zookeeper, and manage zookeeper manually instead:
[root@bogon conf]# vim hbase-env.sh 
export JAVA_HOME=/home/hj/app/jdk1.8.0_261
export HBASE_CLASSPATH=/home/hj/app/hadoop-2.6.0-cdh5.7.0/conf
export HBASE_MANAGES_ZK=false

4.1.2 Startup reports: regionserver running as process 16996. Stop it first.

Error details:

[root@bogon bin]# sh start-hbase.sh
starting master, logging to /home/hj/app/hbase-1.1.2/logs/hbase-root-master-bogon.out
regionserver running as process 16996. Stop it first.

Solution

Kill the regionserver process (kill -9 16996), then start hbase again.


4.1.3 Startup reports SLF4J: Class path contains multiple SLF4J bindings

Cause: the hbase and hadoop SLF4J binding jars conflict. Rename hbase's jar:

mv slf4j-log4j12-1.7.25.jar slf4j-log4j12-1.7.25.jar-copy

Reference: https://blog.csdn.net/qq_45135120/article/details/107048944

4.1.4 Startup reports Java HotSpot™ 64-Bit Server VM warning: ignoring option PermSize=128m

Reference: https://blog.csdn.net/tiankong_12345/article/details/93585463

4.1.5 hbase shell reports Can't get master address from ZooKeeper; znode data == null

Problem: the HMaster process has died and is not running.

Solution: http://blog.51yip.com/hadoop/2193.html

hbase operations

[root@bogon hbase-1.1.2]# ./bin/hbase shell
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/hj/app/hbase-1.1.2/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/hj/app/hadoop-2.6.0-cdh5.7.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2021-01-17 11:53:03,530 WARN  [main] util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
HBase Shell; enter 'help' for list of supported commands.
Type "exit" to leave the HBase Shell
Version 1.1.2, rcc2b70cf03e3378800661ec5cab11eb43fafe0fc, Wed Aug 26 20:11:27 PDT 2015
[root@bogon bin]# ./hbase shell
2021-02-06 21:54:05,371 WARN  [main] util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
HBase Shell; enter 'help' for list of supported commands.
Type "exit" to leave the HBase Shell
Version 1.1.2, rcc2b70cf03e3378800661ec5cab11eb43fafe0fc, Wed Aug 26 20:11:27 PDT 2015

hbase(main):001:0> list
TABLE                                                                                                                                
0 row(s) in 0.7260 seconds
=> []

hbase(main):005:0> create 'student', 'info'
0 row(s) in 2.4760 seconds
=> Hbase::Table - student
hbase(main):006:0> describe 'student'
Table student is ENABLED                                                                                            
student                                                                                                             
COLUMN FAMILIES DESCRIPTION                                                                                         
{NAME => 'info', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BL
OCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZ
E => '65536', REPLICATION_SCOPE => '0'}                                                                             
1 row(s) in 0.3350 seconds

hbase(main):002:0> put 'student','1','info:name','Xueqian'
0 row(s) in 0.5210 seconds

hbase(main):003:0> put 'student','1','info:gender','F'
0 row(s) in 0.0340 seconds

hbase(main):004:0> put 'student','1','info:age','23'
0 row(s) in 0.0490 seconds

hbase(main):005:0> put 'student','2','info:name','Weiliang'
0 row(s) in 0.0480 seconds

hbase(main):006:0> put 'student','2','info:gender','M'
0 row(s) in 0.0570 seconds

hbase(main):007:0> put 'student','2','info:age','24'
0 row(s) in 0.0270 seconds

hbase(main):008:0> get 'student', '1'
COLUMN                CELL
 info:age             timestamp=1612620059368, value=23
 info:gender          timestamp=1612620052093, value=F
 info:name            timestamp=1612620042398, value=Xueqian
3 row(s) in 0.1330 seconds

hbase(main):010:0> scan 'student'
ROW                   COLUMN+CELL
 1                    column=info:age, timestamp=1612620059368, value=23
 1                    column=info:gender, timestamp=1612620052093, value=F
 1                    column=info:name, timestamp=1612620042398, value=Xueqian
 2                    column=info:age, timestamp=1612620095417, value=24
 2                    column=info:gender, timestamp=1612620086286, value=M
 2                    column=info:name, timestamp=1612620076564, value=Weiliang
2 row(s) in 0.0870 seconds
The following classpath lets spark find the hadoop and hbase classes; presumably it goes into spark-env.sh (the source does not say where it was set):

export SPARK_DIST_CLASSPATH=$(/home/hj/app/hadoop-2.6.0-cdh5.7.0/bin/hadoop classpath):$(/home/hj/app/hbase-1.1.2/bin/hbase classpath):/home/hj/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/jars/hbase/*

4.2 Read and write hbase from pyspark

4.2.1 Configure spark

For spark to read and write hbase, the relevant hbase jars must be copied into spark/jars: every jar whose name starts with hbase, plus guava-12.0.1.jar, htrace-core-3.1.0-incubating.jar and protobuf-java-2.5.0.jar.

1. Create an hbase directory under the spark installation's jars directory

[root@bogon spark-2.2.0-bin-2.6.0-cdh5.7.0]# cd jars
[root@bogon jars]# pwd
/home/hj/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/jars

mkdir hbase
cp /home/hj/app/hbase-1.1.2/lib/hbase*.jar hbase/
cp /home/hj/app/hbase-1.1.2/lib/guava-12.0.1.jar  hbase/
cp /home/hj/app/hbase-1.1.2/lib/htrace-core-3.1.0-incubating.jar  hbase/
cp /home/hj/app/hbase-1.1.2/lib/protobuf-java-2.5.0.jar  hbase/

2. Spark 2.0+ is missing the jar that converts hbase data into a form python can read, so spark-examples-1.6.0.jar has to be downloaded separately and placed in hbase/ as well:

mv ~/下载/spark-examples* hbase/

4.2.2 Read and write hbase

1. Open a pyspark shell:

# Change to spark/bin and start the pyspark shell
[root@bogon spark-2.2.0-bin-2.6.0-cdh5.7.0]# cd bin/
[root@bogon bin]# pwd
/home/hj/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/bin
[root@bogon bin]# ls
beeline          load-spark-env.cmd  pyspark2.cmd     spark-class       sparkR2.cmd       spark-shell.cmd    spark-submit.cmd
beeline.cmd      load-spark-env.sh   pyspark.cmd      spark-class2.cmd  sparkR.cmd        spark-sql
derby.log        metastore_db        run-example      spark-class.cmd   spark-shell       spark-submit
find-spark-home  pyspark             run-example.cmd  sparkR            spark-shell2.cmd  spark-submit2.cmd
[root@bogon bin]# ./pyspark

2. The data already in hbase:

hbase(main):002:0> scan 'student'
ROW                   COLUMN+CELL
 1                    column=info:age, timestamp=1612620059368, value=23
 1                    column=info:gender, timestamp=1612620052093, value=F
 1                    column=info:name, timestamp=1612620042398, value=Xueqian
 2                    column=info:age, timestamp=1612620095417, value=24
 2                    column=info:gender, timestamp=1612620086286, value=M
 2                    column=info:name, timestamp=1612620076564, value=Weiliang
2 row(s) in 3.3040 seconds

3. Read:

>>> host = 'localhost'
>>> table = 'student'
>>> conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
>>> keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
>>> valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
>>> hbase_rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat","org.apache.hadoop.hbase.io.ImmutableBytesWritable","org.apache.hadoop.hbase.client.Result",keyConverter=keyConv,valueConverter=valueConv,conf=conf)
>>> count = hbase_rdd.count()
>>> hbase_rdd.cache()
MapPartitionsRDD[2] at mapPartitions at SerDeUtil.scala:208
>>> output = hbase_rdd.collect()
>>> for (k, v) in output:
...     print(k, v)
... 
(u'1', u'{"qualifier" : "age", "timestamp" : "1612620059368", "columnFamily" : "info", "row" : "1", "type" : "Put", "value" : "23"}\n{"qualifier" : "gender", "timestamp" : "1612620052093", "columnFamily" : "info", "row" : "1", "type" : "Put", "value" : "F"}\n{"qualifier" : "name", "timestamp" : "1612620042398", "columnFamily" : "info", "row" : "1", "type" : "Put", "value" : "Xueqian"}')
(u'2', u'{"qualifier" : "age", "timestamp" : "1612620095417", "columnFamily" : "info", "row" : "2", "type" : "Put", "value" : "24"}\n{"qualifier" : "gender", "timestamp" : "1612620086286", "columnFamily" : "info", "row" : "2", "type" : "Put", "value" : "M"}\n{"qualifier" : "name", "timestamp" : "1612620076564", "columnFamily" : "info", "row" : "2", "type" : "Put", "value" : "Weiliang"}')
>>> count
2
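
For reference, the same read can be packaged as a standalone script and run with spark-submit. This is a sketch assembled from the calls above; the script name and structure are my own, not from the article:

# coding=utf-8
# Sketch: the interactive read above as a standalone script (the script itself
# is not part of the original article).
from pyspark import SparkContext, SparkConf

if __name__ == "__main__":
    conf = SparkConf().setAppName("test_read_hbase").setMaster("local[2]")
    sc = SparkContext(conf=conf)

    host = "localhost"
    table = "student"
    read_conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
    key_conv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
    value_conv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"

    hbase_rdd = sc.newAPIHadoopRDD(
        "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
        "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
        "org.apache.hadoop.hbase.client.Result",
        keyConverter=key_conv,
        valueConverter=value_conv,
        conf=read_conf)

    # Each element is (row key, JSON description of the cells in that row)
    for k, v in hbase_rdd.collect():
        print(k, v)

    sc.stop()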

4. Write data:

>>> k1_conv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
>>> v1_conv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
>>> conf_1 = {"hbase.zookeeper.quorum": host,"hbase.mapred.outputtable": table,"mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat","mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable","mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
>>> rawData = ['3,info,name,Rongcheng','4,info,name,Guanhua']
>>> sc.parallelize(rawData).map(lambda x: (x[0],x.split(','))).saveAsNewAPIHadoopDataset(conf=conf_1,keyConverter=k1_conv,valueConverter=v1_conv)
21/02/09 22:31:50 ERROR io.SparkHadoopMapReduceWriter: Aborting job job_20210209223147_0008.
java.lang.IllegalArgumentException: Can not create a Path from a null string
    at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123)
    at org.apache.hadoop.fs.Path.<init>(Path.java:135)
    at org.apache.hadoop.fs.Path.<init>(Path.java:89)
    at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.absPathStagingDir(HadoopMapReduceCommitProtocol.scala:58)
    at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:132)
    at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:101)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084)
    at org.apache.spark.api.python.PythonRDD$.saveAsHadoopDataset(PythonRDD.scala:861)
    at org.apache.spark.api.python.PythonRDD.saveAsHadoopDataset(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/hj/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/python/pyspark/rdd.py", line 1393, in saveAsNewAPIHadoopDataset
    keyConverter, valueConverter, True)
  File "/home/hj/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/home/hj/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/python/pyspark/sql/utils.py", line 79, in deco
    raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.IllegalArgumentException: u'Can not create a Path from a null string'

Check whether the write succeeded:

# Two new rows appear, so the write succeeded
hbase(main):003:0> scan 'student'
ROW                   COLUMN+CELL
 1                    column=info:age, timestamp=1612620059368, value=23
 1                    column=info:gender, timestamp=1612620052093, value=F
 1                    column=info:name, timestamp=1612620042398, value=Xueqian
 2                    column=info:age, timestamp=1612620095417, value=24
 2                    column=info:gender, timestamp=1612620086286, value=M
 2                    column=info:name, timestamp=1612620076564, value=Weiliang
 3                    column=info:name, timestamp=1612881109880, value=Rongcheng
 4                    column=info:name, timestamp=1612881109879, value=Guanhua
4 row(s) in 0.1310 seconds

Note: the write raises pyspark.sql.utils.IllegalArgumentException: u'Can not create a Path from a null string', but the data is still written successfully!
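
Likewise, the write can be packaged as a standalone script. This is a sketch built from the same configuration and converters shown above; the sample rows (5 and 6) are made up for illustration and are not from the article:

# coding=utf-8
# Sketch: the interactive write above as a standalone script (the script itself
# is not part of the original article).
from pyspark import SparkContext, SparkConf

if __name__ == "__main__":
    conf = SparkConf().setAppName("test_write_hbase").setMaster("local[2]")
    sc = SparkContext(conf=conf)

    host = "localhost"
    table = "student"
    key_conv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
    value_conv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
    write_conf = {
        "hbase.zookeeper.quorum": host,
        "hbase.mapred.outputtable": table,
        "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
        "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
        "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable",
    }

    # Each element is "rowkey,columnFamily,qualifier,value"; the converter
    # expects (rowkey, [rowkey, family, qualifier, value]).
    raw_data = ['5,info,name,Alice', '6,info,name,Bob']
    sc.parallelize(raw_data) \
      .map(lambda x: (x[0], x.split(','))) \
      .saveAsNewAPIHadoopDataset(conf=write_conf,
                                 keyConverter=key_conv,
                                 valueConverter=value_conv)
    sc.stop()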
