RDD 编程
RDD 创建
创建办法
- 读取外部数据
- 本地文件
- HDFS
- HBase
- Cassandra 等
- 调用
SparkContext
的parallelize
方法,在 Driver 中一个已经存在的集合(数组)上创建
创建 RDD 之前的准备工作
启动 HDFS 组件
1 | /usr/local/hadoop/sbin/start-dfs.sh |
创建 rdd 子目录存放代码和相关文件
1 | !mkdir rdd && cd rdd |
在 rdd 目录下新建一个 word.txt 文件,随便输入什么内容。这里我直接放的就是 man 手册
从文件系统中加载数据创建 RDD
SparkContext
创建上下文对象
1 | from pyspark import SparkContext |
三条命令都等价
textFile()
从文件系统中加载数据创建rdd
,参数是 URI - 本地文件系统地址 - 分布式文件系统 HDFS 地址
这里我们不用上面创建的 word.txt,而是直接用 README.md,保证读者和文章输出一致。
textFile()
的第二个参数用来指定分区数目,默认是最小值 128MB。
1 | lines = sc.textFile("hdfs://localhost:9000/user/hadoop/README.md") |
通过并行集合(数组)创建 RDD
可以调用 SparkContext 的 parallelize 方法,在 Driver 中一个已经存在的集合(数组)上创建。
1 | nums = [1, 2, 3, 4, 5] |
RDD 操作
转换
转换过程只是记录了转换的轨迹,并不会发生真正的计算
这里要注意,他们返回的是什么东西
filter(func)
:筛选出满足函数func
的元素,并返回一个新的数据集map(func)
:将每个元素传递到函数func
中,并将结果返回为一个新的数据集flatMap(func)
:与map()
相似,但每个输入元素都可以映射到 0 或多个输出结果groupByKey()
:应用于(K,V)
键值对的数据集时,返回一个新的(K, Iterable)
形式的数据集reduceByKey(func)
:应用于(K,V)
键值对的数据集时,返回一个新的(K, V)
形式的数据集,其中的每个值是将每个key
传递到函数func
中进行聚合fiter()
返回的是类似于list
,list
里面是满足条件的数据,这个list
相对于原始的list
变短了map()
返回的是list_transform
,它是list
的变形,和list
一一对应flatMap()
返回list_transform
,但是不是一一对应,可以 一对多 或者 一对零groupByKey()
收集所有K
相同的V
,返回的数据类似于defaultdict(list)
reduceByKey()
将K
分组再进行 reduce,\(v_1 \text{ op} v_2 \to v_1\)
行动
行动操作是真正触发计算的地方
take
比
count()
返回数据集中的元素个数collect()
以数组的形式返回数据集中的所有元素,返回一个 数组first()
返回数据集中的第一个元素take(n)
以数组的形式返回数据集中的前 n 个元素reduce(func)
通过函数 func(输入两个参数并返回一个值)聚合数据集中的元素foreach(func)
将数据集中的每个元素传递到函数 func 中运行
注意
map
是转换,不会实际操作
reduce
是动作,它是来真的
惰性机制
lines
读取
textFile()
只是一个转换操作,并不会真的去读文件
1 | lineLength = lines.map(lambda s: len(s)) # 计算每行的长度(即每行包含多少个单词) |
1 | totalLength = lineLength.reduce(lambda a, b: a + b) |
1 | totalLength |
3847
Spark 会把计算分解成多个任务在不同的机器上执行,每台机器运行位于属于它自己的 map 和 reduce,最后把结果返回给 Driver Program。
实例
计算结果集中满足条件的元素个数
将当前遍历到的行赋值给参数 line
,然后对每行文本执行 lamda 表达式,满足条件的line
被放入结果集中。最后执行 count
。
1 | lines.filter(lambda line: "Spark" in line).count() |
20
找出文本文件中 单行文本所包含的单词数量 的最大值
lambda line: len(line.split(" "))
将每行文本传递给 lambda,计算出每行文本的单词数,得到是一个 rdd,每个元素都是整数lambda a, b: (a > b and a or b)
每次接收两个参数,留下较大者。这里是个 trick
1 | lines.map(lambda line: len(line.split(""))).reduce(lambda a, b: (a > b and a |
22
自己写的,看单词的最大长度
1 | lines.flatMap(lambda line: line.split("")).map(lambda word: len(word)).reduce( |
111
持久化
两次操作触发了两次从头到尾的计算
1 | list = ["Hadoop", "Spark", "Hive"] |
3
1 | print(','.join(rdd.collect())) |
Hadoop,Spark,Hive
persist()
方法对一个 RDD 标记为持久化,之所以说“标记为持久化”,是因为出现 persist()
语句的地方,并不会马上计算生成 RDD 并把它持久化,而是要等到遇到第一个行动操作触发真正计算以后,才会把计算结果进行持久化,持久化后的 RDD 将会被保留在计算节点的内存中被后面的行动操作重复使用。
1 | list = ["Hadoop", "Spark", "Hive"] |
1 | rdd.cache() # 会调用 persist(MEMORY_ONLY),但是,语句执行到这里,并不会缓存 rdd,这时 rdd 还没有被计算生成 |
ParallelCollectionRDD[9] at parallelize at PythonRDD.scala:195
一般而言,使用 cache()
方法时,会调用persist(MEMORY_ONLY)
。
1 | print(rdd.count()) # 这时才会执行上面的 rdd.cache(),把这个 rdd 放到缓存中 |
3
1 | print(','.join(rdd.collect())) # 不需要触发从头到尾的计算,只需要重复使用上面缓存中的 rdd |
Hadoop, Spark, Hive
可以使用 unpersist()
方法手动地把持久化的 RDD 从缓存中移除。
分区
RDD 是弹性分布式数据集,通常 RDD 很大,会被分成很多个分区,分别保存在不同的节点上。
RDD 分区的一个分区原则是使得分区的个数尽量等于集群中的 CPU 核心(core)数目。
spark.default.parallelism
配置默认的分区数
Spark 的四种部署模式
- 本地模式 log[N]
- Standalone 模式,集群中所有 CPU 核心数目总和,但不小于 2
- YARN 模式,集群中所有 CPU 核心数目总和,但不小于 2
- Mesos 模型,默认为 8
1 | array = [1, 2, 3, 4, 5] |
1 | rdd = sc.parallelize(array, 2) # 设置两个分区 |
打印元素
本地 rdd.foreach(print)
或者rdd.map(print)
多机 rdd.collect().foreach(print)
或rdd.take(100).foreach(print)
存在问题
1 | rdd.foreach(print) # 无法使用 |
键值对 RDD
键值对 RDD 的创建 pairrdd
第一种创建方式:从文件中加载
1 | # lines = sc.textFile(path) |
/user/hadoop/README.md MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0
1 | pairRDD = lines.flatMap(lambda line: line.split(""))\ |
1 | pairRDD.take(10) |
[('#', 1),
('Apache', 1),
('Spark', 1),
('', 1),
('Spark', 1),
('is', 1),
('a', 1),
('fast', 1),
('and', 1),
('general', 1)]
第二种创建方式:通过并行集合(列表)创建 RDD
1 | list = ["Hadoop", "Spark", "Hive", "Spark"] |
1 | pairRDD = rdd.map(lambda word: (word, 1)) |
1 | pairRDD.take(10) |
[('Hadoop', 1), ('Spark', 1), ('Hive', 1), ('Spark', 1)]
常用的键值对转换操作
reduceByKey(func)
使用 func
函数合并具有相同键的值
(a,b) => a+b
这个 Lamda 表达式中,a 和 b 都是指 value
1 | pairRDD.reduceByKey(lambda a, b: a + b).take(10) |
[('Hadoop', 1), ('Spark', 2), ('Hive', 1)]
groupByKey()的功能是,对具有相同键的值进行 group
1 | pairRDD.groupByKey().take(10) |
[('Hadoop', <pyspark.resultiterable.ResultIterable at 0x7faaea1ba470>),
('Spark', <pyspark.resultiterable.ResultIterable at 0x7faaea1ba400>),
('Hive', <pyspark.resultiterable.ResultIterable at 0x7faaea1ba5c0>)]
keys()
只会把键值对 RDD 中的 key 返回形成一个新的 RDD
采用 keys()
后得到的结果是一个RDD[Int]
,内容是{"spark","spark","hadoop","hadoop"}
1 | pairRDD.keys().take(10) |
['Hadoop', 'Spark', 'Hive', 'Spark']
values()
values()只会把键值对 RDD 中的 value 返回形成一个新的 RDD
采用 values()
后得到的结果是一个RDD[Int]
,内容是{1,2,3,5}
1 | pairRDD.values().take(10) |
[1, 1, 1, 1]
sortByKey()
的功能是返回一个根据键排序的 RDD
1 | pairRDD.sortByKey().take(10) |
[('Hadoop', 1), ('Hive', 1), ('Spark', 1), ('Spark', 1)]
mapValues(func)
我们只想对键值对 RDD 的 value 部分进行处理,而不是同时对 key 和 value 进行处理
对键值对 RDD 中的每个 value 都应用一个函数
1 | pairRDD.mapValues(lambda x: x + 1).take(10) |
[('Hadoop', 2), ('Spark', 2), ('Hive', 2), ('Spark', 2)]
join
表示内连接
对于内连接,对于给定的两个输入数据集 (K,V1)
和(K,V2)
,只有在两个数据集中都存在的 key 才会被输出,最终得到一个 (K,(V1,V2))
类型的数据集
1 | pairRDD1 = sc.parallelize([('spark', 1), ('spark', 2), ('hadoop', 3), |
1 | pairRDD1.join(pairRDD2).take(10) |
[('spark', (1, 'fast')), ('spark', (2, 'fast'))]
1 | pairRDD2.join(pairRDD1).take(10) |
[('spark', ('fast', 1)), ('spark', ('fast', 2))]
一个综合实例
题目:给定一组键值对("spark",2),("hadoop",6),("hadoop",4),("spark",6)
键值对的 key 表示图书名称,value 表示某天图书销量
请计算每个键对应的平均值,也就是计算每种图书的每天平均销量。
1 | rdd = sc.parallelize([("spark", 2), ("hadoop", 6), ("hadoop", 4), |
1 | rdd.mapValues(lambda x: (x, 1)) \ |
[('spark', 4.0), ('hadoop', 5.0)]
mapVlaues(x, 1)
打散数据(金额,数量)reduceByKeys
分桶计算总金额和总数量,这里没有 key 的事情了mapValues
除法得到平均金额
键值对 ("spark",2) 经过 mapValues()函数处理后,就变成了("spark",(2,1)),其中,数值 1 表示 "spark" 这个键的 1 次出现
x 和 y 都是 value,而且是具有相同 key 的两个键值对所对应的 value,比如,在这个例子中, ("hadoop",(6,1))
和 ("hadoop",(4,1))
这两个键值对具有相同的 key,所以,对于函数中的输入参数 (x,y) 而言,x 就是 (6,1),序列从 0 开始计算,x[0]
表示这个键值对中的第 1 个元素 6,x[1]
表示这个键值对中的第二个元素 1,y 就是 (4,1),y[0]
表示这个键值对中的第 1 个元素 4,y[1]
表示这个键值对中的第二个元素 1,所以,函数体 (x[0]+y[0],x[1] + y[2])
,相当于生成一个新的键值对(key,value)
,其中,key 是x[0]+y[0]
,也就是 6+4=10,value 是x[1] + y[1]
,也就是 1+1=2,因此,函数体(x[0]+y[0],x[1] + y[1])
执行后得到的 value 是(10,2)
共享变量
需要在多个任务之间共享变量,或者在任务(Task)和任务控制节点(Driver Program)之间共享变量
广播变量用来把变量在所有节点的内存之间进行共享。累加器则支持在所有不同节点之间进行累加计算(比如计数或者求和)。
广播变量
Spark 的 Action 操作会跨越多个阶段(stage),对于每个阶段内的所有任务所需要的公共数据,Spark 都会自动进行广播。通过广播方式进行传播的变量,会经过序列化,然后在被任务使用时再进行反序列化。这就意味着,显式地创建广播变量只有在下面的情形中是有用的:当跨越多个阶段的那些任务需要相同的数据,或者当以反序列化方式对数据进行缓存是非常重要的。
通过调用 SparkContext.broadcast(v)
来从一个普通变量 v
中创建一个广播变量。这个广播变量就是对普通变量 v
的一个包装器,通过调用 value
方法就可以获得这个广播变量的值
1 | broadcastVar = sc.broadcast([1, 2, 3]) |
1 | broadcastVar.value |
[1, 2, 3]
一旦广播变量创建后,普通变量 v 的值就不能再发生修改,从而确保所有节点都获得这个广播变量的相同的值。
累加器
用来实现计数器(counter)和求和(sum)
通过调用 SparkContext.accumulator()
来创建
运行在集群中的任务,就可以使用 add
方法来把数值累加到累加器上,但是,这些任务只能做累加操作,不能读取累加器的值,只有任务控制节点(Driver Program)可以使用 value 方法来读取累加器的值。
1 | accum = sc.accumulator(0) |
1 | sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x)) |
10
数据读写
文件系统的数据读写
介绍文件系统的读写和 HDFS 的读写
本地文件系统的数据读写
1 | textFile = sc.textFile("file:///usr/local/spark/README.md") |
1 | textFile.first() |
'# Apache Spark'
1 | textFile.saveAsTextFile("file:///home/hadoop/spark/data/wordcount/writeback.txt") |
1 | !ls /home/hadoop/spark/data/wordcount/writeback.txt/ |
part-00000 _SUCCESS
分布式文件系统 HDFS 的数据读写
查看 HDFS 文件系统根目录下的内容
这里的几条命令很重要
1 | !/usr/local/hadoop/bin/hdfs dfs -ls / |
Found 1 items
drwxr-xr-x - hadoop supergroup 0 2019-06-18 15:48 /user
1 | ./bin/hdfs dfs -ls /user/hadoop |
1 | textFile = sc.textFile("hdfs://localhost:9000/user/hadoop/README.md") |
1 | textFile.first() |
'# Apache Spark'
1 | >>> val textFile = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt") |
1 | >>> val textFile = sc.textFile("word.txt") |
不同文件格式的读写
文本文件
sc.textFile
rdd.saveAsTextFile
JSON
1 | jsonStr = sc.textFile( |
1 | jsonStr.take(10) |
['{"name":"Michael"}',
'{"name":"Andy", "age":30}',
'{"name":"Justin", "age":19}']
1 | import json |
1 | result = jsonStr.map(lambda s: json.loads(s)) |
1 | result.take(10) |
[{'name': 'Michael'},
{'name': 'Andy', 'age': 30},
{'name': 'Justin', 'age': 19}]
文件数据读写
读写 HBase 数据
HBase 是针对谷歌 BigTable 的开源实现,是一个高可靠、高性能、面向列、可伸缩的分布式数据库,主要用来存储非结构化和半结构化的松散数据。HBase 可以支持超大规模数据存储,它可以通过水平扩展的方式,利用廉价计算机集群处理由超过 10 亿行数据和数百万列元素组成的数据表。