SparkSQL数据源与数据存储
1. 大数据分析流程
- 在互联网产业中大数据生态体系的主要作用就是存储、处理海量数据为企业创造价值、推动社会进步,数据分析流程存在三个主要流程:
- 计算系统可以加载外部数据源
- 资源系统可以为计算系统分配运行资源
- 计算系统数据分析最终结果可以持久化到外部系统
- 通过图片可以得知存储系统才是大数据计算体系中的基石,学习一个计算框架应该先从如何使用当前计算框架加载外部数据源开始。
2. Spark SQL数据源
- Spark SQL 是 Apache Spark 的模块之一,提供对结构化数据的查询能力。它支持多种数据源,包括 HDFS、S3、Hive、Parquet、JSON 等,允许用户通过 SQL 语句或 DataFrame API 访问和处理数据。Spark SQL 的优化器可以自动优化查询计划,提高执行效率。此外,它还支持外部数据源的集成,使得在不同存储系统间进行数据交换和分析变得简单快捷。
2.1 SparkSQL常见数据源
- Hive 数据仓库
- MySQL 关系型数据库
- FileSystem 文件系统:本地文件系统、分布式文件系统
- 由 RDD 生成 SparkSQL 数据源
2.2 SparkSQL支持的文本格式
数据格式 | 描述 |
---|---|
csv | CSV(字段与字段之间的分隔符为逗号) |
json | JSON(是一种轻量级的数据交换格式,采用完全独立于编程语言的文本格式来存储和表示数据。 简洁和清晰的层次结构、易于人阅读和编写,同时也易于机器解析和生成) |
text | Text(文本数据,字段与字段之间的分隔符没有限制) |
parquet | Parquet(Parquet是一种面向列存储的文件格式,主要用于Hadoop生态系统。 对数据处理框架、数据模型和编程语言无关) |
2.3 加载外部数据源步骤
- 创建 SparkSession 实例对象
- 通过 SparkSession 实例对象提供的方法加载外部数据
3. 本地文件系统加载数据
3.1 本地文件系统加载JSON格式数据
3.1.1 概述
- JSON(JavaScript Object Notation)是一种轻量级的数据交换格式,易于人阅读和编写,也易于机器解析和生成。在本地文件系统中加载JSON格式数据时,可以使用
DataFrameReader
的json()
方法或通过format("json")
指定格式。
3.1.2 案例演示
- 在项目根目录创建
data
目录
- 在
data
里创建users.json
文件
{"name": "李小玲", "gender": "女", "age": 45}
{"name": "童安格", "gender": "男", "age": 26}
{"name": "陈燕文", "gender": "女", "age": 18}
{"name": "王晓明", "gender": "男", "age": 32}
{"name": "张丽华", "gender": "女", "age": 29}
{"name": "刘伟强", "gender": "男", "age": 40}
{"name": "赵静怡", "gender": "女", "age": 22}
{"name": "孙强东", "gender": "男", "age": 35}
- 在
net.huawei.sql
包里创建LoadJSON
对象
package net.huawei.sql
import org.apache.spark.sql.SparkSession
/**
* 功能:加载JSON数据
* 作者:华卫
* 日期:2025年01月17日
*/
object LoadJSON {
def main(args: Array[String]): Unit = {
// 获取或创建Spark会话对象
val spark = SparkSession.builder() // 创建Builder对象
.appName("LoadJSON") // 设置应用程序名称
.master("local[*]") // 运行模式:本地运行
.getOrCreate() // 获取或创建Spark会话对象
// 使用json()方法加载本地JSON文件
val df_json = spark.read.json("data/users.json")
// 显示数据
df_json.show()
// 关闭会话对象
spark.stop()
}
}
- 运行程序,查看结果
3.2 本地文件系统加载CSV格式数据
3.2.1 概述
- CSV(Comma-Separated Values)是一种常用的表格数据存储格式,数据以纯文本形式存储,字段间用逗号分隔。加载CSV格式数据时,可以使用
DataFrameReader
的csv()
方法或通过format("csv")
指定格式。
3.2.2 案例演示
- 在
data
里创建users.csv
文件
name,gender,age
李小玲,女,45
童安格,男,26
陈燕文,女,18
王晓明,男,32
张丽华,女,29
刘伟强,男,40
赵静怡,女,22
孙强东,男,35
- 在
net.huawei.sql
包里创建loadCSV
对象
package net.huawei.sql
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.csv.MultiLineCSVDataSource.inferSchema
import org.json4s.scalap.scalasig.ClassFileParser.header
/**
* 功能:加载CSV数据
* 作者:华卫
* 日期:2025年01月17日
*/
object LoadCSV {
def main(args: Array[String]): Unit = {
// 获取或创建Spark会话对象
val spark = SparkSession.builder() // 创建Builder对象
.appName("LoadCSV") // 设置应用程序名称
.master("local[*]") // 运行模式:本地运行
.getOrCreate() // 获取或创建Spark会话对象
// 使用csv()方法加载本地CSV文件
val df_csv = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("data/users.csv")
// 显示数据
df_csv.show()
// 关闭会话对象
spark.stop()
}
}
- 运行程序,查看结果
3.3 本地文件系统加载TEXT格式数据
3.3.1 概述
- TEXT格式数据通常指纯文本文件,每行数据作为一个字符串处理。加载TEXT格式数据时,可以使用
DataFrameReader
的text()
方法或通过format("text")
指定格式。
3.3.2 案例演示
- 在
data
里创建users.txt
文件
李小玲 女 45
童安格 男 26
陈燕文 女 18
王晓明 男 32
张丽华 女 29
刘伟强 男 40
赵静怡 女 22
孙强东 男 35
- 在
net.huawei.sql
包里创建LoadText
对象
package net.huawei.sql
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* 功能:加载TEXT数据
* 作者:华卫
* 日期:2025年01月17日
*/
object LoadText {
def main(args: Array[String]): Unit = {
// 获取或创建Spark会话对象
val spark = SparkSession.builder() // 创建Builder对象
.appName("LoadTEXT") // 设置应用程序名称
.master("local[*]") // 运行模式:本地运行
.getOrCreate() // 获取或创建Spark会话对象
// 使用text()方法加载本地TEXT文件
val df_text = spark.read.text("data/users.txt")
// 显示数据
df_text.show()
// 关闭会话对象
spark.stop()
}
}
- 运行程序,查看结果
3.4 本地文件系统加载Parquet格式数据
3.4.1 概述
- Parquet 是一种列式存储格式,广泛用于大数据处理。相比行式存储(如 CSV),Parquet 具有高效压缩、高性能查询和广泛兼容性(支持 Spark、Hive 等)。在 Spark 中,可通过
parquet()
或format("parquet")
加载 Parquet 文件,适合大规模数据存储与处理。
3.4.2 案例演示
- 将CSV格式数据转换成Parquet格式数据
- 在
net.huawei.sql
包里创建CSVToParquet
对象
package net.huawei.sql
/**
* 功能:将CSV转成Parquet
* 作者:华卫
* 日期:2025年01月17日
*/
import org.apache.spark.sql.SparkSession
object CSVToParquet {
def main(args: Array[String]): Unit = {
// 获取或创建Spark会话对象
val spark = SparkSession.builder() // 创建Builder对象
.appName("CSV To Parquet") // 设置应用程序名称
.master("local[*]") // 运行模式:本地运行
.getOrCreate() // 获取或创建Spark会话对象
// 读取CSV文件
val df = spark.read
.option("header", true) // 第一行作为列名
.option("inferSchema", true) // 自动推断数据类型
.csv("data/users.csv") // CSV文件路径
// 打印Schema和数据
println("===模式===")
df.printSchema()
println("===数据===")
df.show()
// 将DataFrame保存为Parquet文件
df.write.parquet("data/users.parquet")
println("成功生成users.parquet文件~")
// 关闭Spark会话对象
spark.stop()
}
}
- 运行程序,查看结果
- 查看
users.parquet
,Parquet
文件是二进制格式,无法直接查看,但可以通过Spark或其他工具读取。
- 在
net.huawei.sql
包里创建LoadParequet
对象
package net.huawei.sql
import org.apache.spark.sql.SparkSession
/**
* 功能:加载Parquet数据
* 作者:华卫
* 日期:2025年01月17日
*/
object LoadParquet {
def main(args: Array[String]): Unit = {
// 获取或创建Spark会话对象
val spark = SparkSession.builder() // 创建Builder对象
.appName("LoadParquet") // 设置应用程序名称
.master("local[*]") // 运行模式:本地运行
.getOrCreate() // 获取或创建Spark会话对象
// 使用parquet()方法加载本地Parquet文件
val df_parquet = spark.read.parquet("data/users.parquet")
// 显示数据
df_parquet.show()
// 显示数据结构
df_parquet.printSchema()
// 关闭会话对象
spark.stop()
}
}
- 运行程序,查看结果
3.5 通用加载文件方式加载各种格式数据
3.5.1 概述
- 通过
DataFrameReader
的format()
和load()
方法,可以灵活地加载不同格式的数据文件。这种方式不仅适用于JSON格式,还可以用于CSV、TEXT、Parquet等其他格式。
3.5.2 案例演示
- 在
net.huawei.sql
包里创建LoadData
对象
package net.huawei.sql
import org.apache.spark.sql.SparkSession
/**
* 功能:加载各种格式数据
* 作者:华卫
* 日期:2025年01月17日
*/
object LoadData {
def main(args: Array[String]): Unit = {
// 获取或创建Spark会话对象
val spark = SparkSession.builder() // 创建Builder对象
.appName("LoadData") // 设置应用程序名称
.master("local[*]") // 运行模式:本地运行
.getOrCreate() // 获取或创建Spark会话对象
//使用format()和load()方法加载本地JSON文件
val df_json = spark.read.format("json")
.load("data/users.json")
//使用format()和load()方法加载本地CSV文件
val df_csv = spark.read.format("csv")
.option("header", true)
.option("inferSchema",true)
.load("data/users.csv")
//使用format()和load()方法加载本地TEXT文件
val df_text = spark.read.format("text")
.load("data/users.txt")
//使用format()和load()方法加载本地Parquet文件
val df_parquet = spark.read.format("parquet")
.load("data/users.parquet")
// 显示数据
println("===显示加载的JSON数据===")
df_json.show()
println("===显示加载的CSV数据===")
df_csv.show()
println("===显示加载的TEXT数据===")
df_text.show()
println("===显示加载的Parquet数据===")
df_parquet.show()
// 关闭会话对象
spark.stop()
}
}
- 运行程序,查看结果
4. 大数据存储概述
4.1 数据存储的重要性
- 在大数据生态系统中,存储系统是核心组成部分。无论是数据采集、数据处理,还是数据分析,都离不开高效、可靠的存储系统。存储系统不仅需要保存原始数据,还需要存储经过分析后的有价值的结果,以供各部门使用。
4.2 常见的数据持久化外部系统
- 文件系统:包括本地文件系统和分布式文件系统(如HDFS)。文件系统适合存储大规模的非结构化或半结构化数据。
- 关系型数据库:适用于结构化数据的存储和高效查询,常用于事务处理和复杂查询。
- Hive数据仓库:基于Hadoop的数据仓库工具,适合大规模数据的批处理和分析。
- 其他存储系统:如果以上系统不能满足业务需求,我们可以将DataFrame或DataSet转换为RDD,利用RDD支持的多种外部存储系统。
4.3 大数据计算框架的基石
- 存储系统是大数据计算框架的基石。一个计算框架首先需要从存储系统中加载数据,形成可处理的数据模型(如
DataFrame
、DataSet
或RDD
)。基于这些数据模型,我们可以进行各种数据分析操作。最终,分析结果需要持久化到外部存储系统,以便各部门使用。
5. 数据存储核心API使用
5.1 持久化数据到外部文件系统步骤
- 创建
SparkSession
实例对象 - 通过
SparkSession
实例对象提供的方法加载外部数据 - 数据分析
- 对数据分析结果进行持久化
5.2 将数据帧保存到本地文件
- 在
net.huawei.sql
包里创建SaveData
对象
package net.huawei.sql
import org.apache.spark.sql.SparkSession
/**
* 功能:保存数据到本地文件
* 作者:华卫
* 日期:2025年01月17日
*/
// 声明用户样例类
case class User(name: String, gender: String, age: Long)
object SaveData {
def main(args: Array[String]): Unit = {
// 获取或创建Spark会话对象
val spark = SparkSession.builder() // 创建Builder对象
.appName("SaveData") // 设置应用程序名称
.master("local[*]") // 运行模式:本地运行
.getOrCreate() // 获取或创建Spark会话对象
// 导入隐式转换
import spark.implicits._
// 基于序列创建数据帧
val userDF = Seq(
User("陈燕文", "女", 20),
User("张小文", "男", 27),
User("王丽霞", "女", 18)
).toDF()
// 显示数据
userDF.show()
// 保存数据到本地文件
userDF.write.mode("overwrite").save("log/users.parquet")
println("users.parquet保存成功~")
userDF.write.mode("overwrite").csv("log/users.csv")
println("users.csv保存成功~")
userDF.write.mode("overwrite").json("log/users.json")
println("users.json保存成功~")
// 关闭会话对象
spark.stop()
}
}
- 运行程序,查看结果
- 查看保存在本地的各种格式的数据文件
5.3 将数据帧保存到HDFS文件
- 在
net.huawei.sql
包里创建SaveDataToHDFS
对象
package net.huawei.sql
import org.apache.spark.sql.SparkSession
/**
* 功能:保存数据到HDFS
* 作者:华卫
* 日期:2025年01月17日
*/
object SaveDataToHDFS {
def main(args: Array[String]): Unit = {
// 获取或创建Spark会话对象
val spark = SparkSession.builder() // 创建Builder对象
.appName("SaveDataToHDFS") // 设置应用程序名称
.master("local[*]") // 运行模式:本地运行
.config("dfs.client.use.datanode.hostname", "true") // 设置HDFS节点名称
.getOrCreate() // 获取或创建Spark会话对象
// 导入隐式转换
import spark.implicits._
// 基于序列创建数据帧
val userDF = Seq(
User("陈燕文", "女", 20),
User("张小文", "男", 27),
User("王丽霞", "女", 18)
).toDF()
// 显示数据
userDF.show()
// 保存数据到HDFS文件
userDF.write.mode("overwrite").json("hdfs://bigdata1:9000/log/users.json")
println("hdfs://bigdata1:9000/log/users.json保存成功~")
// 关闭会话对象
spark.stop()
}
}
-
运行程序,查看结果
-
执行命令:
hdfs dfs -ls /log/users.json
-
执行命令:
hdfs dfs -cat /log/users.json/*
6. 数据源与数据存储小结
- 在大数据生态系统中,数据源和数据存储是核心组成部分。Spark SQL 支持多种数据源,包括 HDFS、S3、Hive、Parquet、JSON 等,能够灵活加载和处理结构化数据。通过
DataFrameReader
和DataFrameWriter
,用户可以轻松地从本地文件系统或分布式文件系统加载数据,并将分析结果持久化到外部存储系统。常见的存储格式如 CSV、JSON、Parquet 各有优势:CSV 适合人类阅读,JSON 灵活易用,而 Parquet 则以高效的列式存储和压缩性能著称。数据存储不仅是数据处理的起点,也是分析结果的归宿,选择合适的存储格式和系统对提升数据处理效率至关重要。
原文地址:https://blog.csdn.net/howard2005/article/details/145184298
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!