明夷待访录(四):Spark SQL

Spark SQL 简介

从 Shark 说起

Shark 即 Hive on Spark

Spark SQL 设计

从 HQL 被解析成抽象语法树(AST)起,就全部由 Spark SQL 接管了

Spark-SQL 架构
Spark-SQL 架构

Spark SQL 增加了 SchemaRDD(即带有 Schema 信息的 RDD),使用户可以在 Spark SQL 中执行 SQL 语句,数据既可以来自 RDD,也可以来自 Hive、HDFS、Cassandra 等外部数据源,还可以是 JSON 格式的数据。

Spark-SQL 支持的数据格式和编程语言
Spark-SQL 支持的数据格式和编程语言

DataFrame 与 RDD 的区别

DataFrame 与 RDD 的区别
DataFrame 与 RDD 的区别

DataFrame 的推出,让 Spark 具备了处理大规模结构化数据的能力

RDD 是分布式的 Java 对象的集合,比如,RDD[Person]是以 Person 为类型参数,但是,Person 类的内部结构对于 RDD 而言却是不可知的。

DataFrame 是一种以 RDD 为基础的分布式数据集,也就是分布式的 Row 对象的集合(每个 Row 对象代表一行记录),提供了详细的结构信息,也就是我们经常说的模式(schema),Spark SQL 可以清楚地知道该数据集中包含哪些列、每列的名称和类型。

DataFrame 的创建

Spark 使用全新的 SparkSession 接口替代 Spark1.6 中的 SQLContextHiveContext接口来实现其对数据加载、转换、处理等功能。

SparkSession支持从不同的数据源加载数据,并把数据转换成 DataFrame,并且支持把DataFrame 转换成SQLContext自身中的表,然后使用 SQL 语句来操作数据。SparkSession 亦提供了 HiveQL 以及其他依赖于 Hive 的功能的支持。

如何从 people.json 文件中读取数据并生成 DataFrame 并显示数据?

1
from pyspark.sql import SparkSession
1
spark = SparkSession.builder.getOrCreate()
1
df = spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")

看一下 Json 里面有什么

1
df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

打印模式信息

1
df.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

选择多列

1
df.select(df.name, df.age + 1).show()
+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+

条件过滤

1
df.filter(df.age > 20).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

分组聚合

1
df.groupBy("age").count().show()
+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+

排序

1
df.sort(df.age.desc()).show()
+----+-------+
| age|   name|
+----+-------+
|  30|   Andy|
|  19| Justin|
|null|Michael|
+----+-------+

多列排序

1
df.sort(df.age.desc(), df.name.asc()).show()
+----+-------+
| age|   name|
+----+-------+
|  30|   Andy|
|  19| Justin|
|null|Michael|
+----+-------+

对列进行重命名

1
df.select(df.name.alias("username"), df.age).show()
+--------+----+
|username| age|
+--------+----+
| Michael|null|
|    Andy|  30|
|  Justin|  19|
+--------+----+

从 RDD 转换得到 DataFrame

第一种方法是,利用反射来推断包含特定类型对象的 RDD 的 schema,适用对已知数据结构的 RDD 转换;第二种方法是,使用编程接口,构造一个 schema 并将其应用在已知的 RDD 上。

利用反射机制推断 RDD 模式

在利用反射机制推断 RDD 模式时,我们会用到 toDF() 方法

反射机制,在我这里的理解很简单暴力:

  • 通过字符串创建相应的对象
  • 利用字符串的动态性,动态地创建需要的对象
1
2
from pyspark.sql.types import Row
from pyspark import SparkContext
1
sc = SparkContext("local", "test")
1
2
3
4
5
6
def f(x):
rel = {}
rel['name'] = x[0]
rel['age'] = x[1]

return rel
1
2
3
peopleDF = sc.textFile(
"file:///usr/local/spark/examples/src/main/resources/people.txt").map(
lambda line: line.split(',')).map(lambda x: Row(**f(x)).toDF())
1
peopleDF.createOrReplaceTempView("people")
---------------------------------------------------------------------------

AttributeError                            Traceback (most recent call last)

<ipython-input-9-725c89f5f957> in <module>
----> 1 peopleDF.createOrReplaceTempView("people")


AttributeError: 'PipelinedRDD' object has no attribute 'createOrReplaceTempView'
1
peopleDF = spark.sql("select * from people")
1
peopleDF.rdd.map(lambda t: "Name:" + t[0] + "," + "Age:" + t[1]).take(10)
['Name:  29, Age: Michael', 'Name:  30, Age: Andy', 'Name:  19, Age: Justin']

使用编程方式定义 RDD 模式

1
from pyspark.sql.types import StructType, StructField, StructType, StringType, Row

生成 RDD

1
2
peopleRDD = sc.textFile(
"file:///usr/local/spark/examples/src/main/resources/people.txt")

定义一个模式字符串

1
schemaString = "name age"

根据模式字符串生成模式

1
2
3
fields = list(
map(lambda fieldName: StructField(fieldName, StringType(), nullable=True),
schemaString.split(" ")))
1
schema = StructType(fields)
1
schema
StructType(List(StructField(name,StringType,true),StructField(age,StringType,true)))
1
2
rowRDD = peopleRDD.map(lambda line: line.split(',')).map(
lambda attributes: Row(attributes[0], attributes[1]))
1
peopleDF = spark.createDataFrame(rowRDD, schema)
1
peopleDF.createOrReplaceTempView("people")
1
2
results.rdd.map(lambda attributes: "name:" + attributes[0] + "," + "age:" +
attributes[1]).foreach(print)
1
results.show()
+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+

把 RDD 保存成文件

1
2
peopleDF = spark.read.format("json").load(
"file:///usr/local/spark/examples/src/main/resources/people.json")
1
2
peopleDF.select("name", "age").write.format("csv").save(
"file:///home/hadoop/spark/data/newpeople.csv")

另一种保存办法

1
peopleDF.rdd.saveAsTextFile("file:///home/hadoop/spark/data/newpeople.txt")

读取和保存数据

读写 Parquet(DataFrame)

如何从 parquet 文件中加载数据生成 DataFrame

这里需要把 java 版本降到 1.8,然并卵

如何将 DataFrame 保存成 parquet 文件?

1
2
peopleDF = spark.read.json(
"file:///usr/local/spark/examples/src/main/resources/people.json")
1
peopleDF.write.parquet("file:///home/hadoop/spark/data/newpeople.parquet")

通过 JDBC 连接数据库(DateFrame)

连接 Hive 读写数据