SparkSQL 读写数据攻略:从基础到实战
目录
在大数据处理领域,SparkSQL 以其强大的数据处理能力和丰富的数据源支持备受青睐。它能够高效地读取和写入多种格式的数据,无论是本地文件、分布式文件系统(如 HDFS)上的数据,还是数据库、Hive 表中的数据,都能轻松驾驭。今天,就让我们深入探究 SparkSQL 读写数据的方式,通过详细的代码示例和原理讲解,助你全面掌握这一关键技能。
一、输入Source
- 类型:text / csv【任意固定分隔符】 / json / orc / parquet / jdbc / table【Hive表】
- 语法:spark.read.format(格式).load(读取的地址)
方式一:给定读取数据源的类型和地址
spark.read.format("json").load(path)
spark.read.format("csv").load(path)
spark.read.format("parquet").load(path)
方式二:直接调用对应数据源类型的方法
spark.read.json(path)
spark.read.csv(path)
spark.read.parquet(path)
特殊参数:option,用于指定读取时的一些配置选项
spark.read.format("csv").option("sep", "\t").load(path)
jdbcDF = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql:dbserver") \
.option("dbtable", "schema.tablename") \
.option("user", "username") \
.option("password", "password") \
.load()
1)代码演示最普通的文件读取方式:
from pyspark.sql import SparkSession
import os
if __name__ == '__main__':
# 构建环境变量
# 配置环境
os.environ['JAVA_HOME'] = 'D:/Program Files/Java/jdk1.8.0_271'
# 配置Hadoop的路径,就是前面解压的那个路径
os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
# 配置base环境Python解析器的路径
os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' # 配置base环境Python解析器的路径
os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
# 获取sparkSession对象
spark = SparkSession.builder.master("local[2]").appName("第一次构建SparkSession").config(
"spark.sql.shuffle.partitions", 2).getOrCreate()
df01 = spark.read.json("../../datas/resources/people.json")
df01.printSchema()
df02 = spark.read.format("json").load("../../datas/resources/people.json")
df02.printSchema()
df03 = spark.read.parquet("../../datas/resources/users.parquet")
df03.printSchema()
#spark.read.orc("")
df04 = spark.read.format("orc").load("../../datas/resources/users.orc")
df04.printSchema()
df05 = spark.read.format("csv").option("sep",";").load("../../datas/resources/people.csv")
df05.printSchema()
df06 = spark.read.load(
path="../../datas/resources/people.csv",
format="csv",
sep=";"
)
df06.printSchema()
spark.stop()
2) 通过jdbc读取数据库数据
先在本地数据库或者linux数据库中插入一张表:
CREATE TABLE `emp` (
`empno` int(11) NULL DEFAULT NULL,
`ename` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`job` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`mgr` int(11) NULL DEFAULT NULL,
`hiredate` date NULL DEFAULT NULL,
`sal` decimal(7, 2) NULL DEFAULT NULL,
`comm` decimal(7, 2) NULL DEFAULT NULL,
`deptno` int(11) NULL DEFAULT NULL
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of emp
-- ----------------------------
INSERT INTO `emp` VALUES (7369, 'SMITH', 'CLERK', 7902, '1980-12-17', 800.00, NULL, 20);
INSERT INTO `emp` VALUES (7499, 'ALLEN', 'SALESMAN', 7698, '1981-02-20', 1600.00, 300.00, 30);
INSERT INTO `emp` VALUES (7521, 'WARD', 'SALESMAN', 7698, '1981-02-22', 1250.00, 500.00, 30);
INSERT INTO `emp` VALUES (7566, 'JONES', 'MANAGER', 7839, '1981-04-02', 2975.00, NULL, 20);
INSERT INTO `emp` VALUES (7654, 'MARTIN', 'SALESMAN', 7698, '1981-09-28', 1250.00, 1400.00, 30);
INSERT INTO `emp` VALUES (7698, 'BLAKE', 'MANAGER', 7839, '1981-05-01', 2850.00, NULL, 30);
INSERT INTO `emp` VALUES (7782, 'CLARK', 'MANAGER', 7839, '1981-06-09', 2450.00, NULL, 10);
INSERT INTO `emp` VALUES (7788, 'SCOTT', 'ANALYST', 7566, '1987-04-19', 3000.00, NULL, 20);
INSERT INTO `emp` VALUES (7839, 'KING', 'PRESIDENT', NULL, '1981-11-17', 5000.00, NULL, 10);
INSERT INTO `emp` VALUES (7844, 'TURNER', 'SALESMAN', 7698, '1981-09-08', 1500.00, 0.00, 30);
INSERT INTO `emp` VALUES (7876, 'ADAMS', 'CLERK', 7788, '1987-05-23', 1100.00, NULL, 20);
INSERT INTO `emp` VALUES (7900, 'JAMES', 'CLERK', 7698, '1981-12-03', 950.00, NULL, 30);
INSERT INTO `emp` VALUES (7902, 'FORD', 'ANALYST', 7566, '1981-12-03', 3000.00, NULL, 20);
INSERT INTO `emp` VALUES (7934, 'MILLER', 'CLERK', 7782, '1982-01-23', 1300.00, NULL, 10);
dept的数据:
CREATE TABLE `dept` (
`deptno` int(11) NULL DEFAULT NULL,
`dname` varchar(14) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`loc` varchar(13) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of dept
-- ----------------------------
INSERT INTO `dept` VALUES (10, 'ACCOUNTING', 'NEW YORK');
INSERT INTO `dept` VALUES (20, 'RESEARCH', 'DALLAS');
INSERT INTO `dept` VALUES (30, 'SALES', 'CHICAGO');
INSERT INTO `dept` VALUES (40, 'OPERATIONS', 'BOSTON');
接着放驱动程序:
py4j.protocol.Py4JJavaError: An error occurred while calling o67.load.
: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:102)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:102)
Python环境放入MySQL连接驱动
- 找到工程中pyspark库包所在的环境,将驱动包放入环境所在的jars目录中
- 如果是Linux上:注意集群模式所有节点都要放。
第一种情况:
假如你是windows环境:
最终的路径是在这里:
第二种情况:linux环境下,按照如下方式进行
# 进入目录
cd /opt/installs/anaconda3/lib/python3.8/site-packages/pyspark/jars
# 上传jar包:mysql-connector-java-5.1.32.jar
代码练习:
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType
if __name__ == '__main__':
# 获取sparkSession对象
# 设置 任务的环境变量
os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_77'
# 配置Hadoop的路径,就是前面解压的那个路径
os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
# 配置base环境Python解析器的路径
os.environ['PYSPARK_PYTHON'] = r'C:\ProgramData\Miniconda3\python.exe' # 配置base环境Python解析器的路径
os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
# 得到sparkSession对象
spark = SparkSession.builder.master("local[2]").appName("").config("spark.sql.shuffle.partitions", 2).getOrCreate()
# 处理逻辑
# 读取json 数据
df1 = spark.read.format("json").load("../../datas/sql/person.json")
df1.show()
# 另一种写法,推荐使用这一种
df2 = spark.read.json("../../datas/sql/person.json")
df2.show()
df3 = spark.read.csv("../../datas/dept.csv")
df4 = spark.read.format("csv").load("../../datas/dept.csv")
# 读取分隔符为别的分隔符的文件
user_schema = StructType([
StructField(name="emp_id", dataType=StringType(), nullable=False),
StructField(name="emp_name", dataType=StringType(), nullable=True),
StructField(name="salary", dataType=DoubleType(), nullable=True),
StructField(name="comm", dataType=DoubleType(), nullable=True),
StructField(name="dept_id", dataType=LongType(), nullable=True)
])
# 使用csv 读取了一个 \t 为分隔符的文件,读取的数据字段名很随意,所以可以自定义
df5 = spark.read.format("csv").option("sep","\t").load("../../datas/emp.tsv",schema=user_schema)
df5.show()
# 昨天的作业是否也可以有另一个写法
movie_schema = StructType([
StructField(name="movie_id", dataType=LongType(), nullable=False),
StructField(name="movie_name", dataType=StringType(), nullable=True),
StructField(name="movie_type", dataType=StringType(), nullable=True)
])
movieDF = spark.read.format("csv").option("sep","::").load("../../datas/zuoye/movies.dat",schema=movie_schema)
movieDF.show()
spark.read.load(
path="../../datas/zuoye/movies.dat",
format="csv",
sep="::",
schema=movie_schema
).show()
dict = {"user":"root","password":"root"}
jdbcDf = spark.read.jdbc(url="jdbc:mysql://localhost:3306/spark",table="emp",properties=dict)
jdbcDf.show()
# jdbc的另一种写法
jdbcDf2 = spark.read.format("jdbc") \
.option("driver", "com.mysql.cj.jdbc.Driver") \
.option("url", "jdbc:mysql://localhost:3306/spark") \
.option("dbtable", "spark.dept") \
.option("user", "root") \
.option("password", "root").load()
jdbcDf2.show()
# 读取hive表中的数据
# 关闭
spark.stop()
3) 读取table中的数据【hive】
海量数据,如何处理,存储在hdfs上
第一种:
使用spark读取hdfs上的数据(可以使用sparkCore读取,也可以使用sparksql读取),将数据变为表【数据+Schema】,然后编写sql或者sparkCore代码。
rdd --> dataFrame
第二种:推荐
将hdfs上的数据映射成hive的表,然后通过sparkSql连接hive, 编写 sql 处理需求。
- 场景:Hive底层默认是MR引擎,计算性能特别差,一般用Hive作为数据仓库,使用SparkSQL对Hive中的数据进行计算
-
- 存储:数据仓库:Hive:将HDFS文件映射成表
- 计算:计算引擎:SparkSQL、Impala、Presto:对Hive中的数据表进行处理
- 问题:SparkSQL怎么能访问到Hive中有哪些表,以及如何知道Hive中表对应的HDFS的地址?
Hive中的表存在哪里?元数据--MySQL , 启动metastore服务即可。
本质上:SparkSQL访问了Metastore服务获取了Hive元数据,基于元数据提供的地址进行计算
先退出base环境:conda deactivate
启动服务:
启动hdfs: start-dfs.sh 因为hive的数据在那里存储着
启动yarn: start-yarn.sh 因为spark是根据yarn部署的,假如你的spark是standalone模式,不需要启动yarn.
日志服务也需要启动一下:
mapred --daemon start historyserver
# 启动Spark的HistoryServer:18080
/opt/installs/spark/sbin/start-history-server.sh
启动metastore服务: 因为sparkSQL需要知道表结构,和表数据的位置
hive-server-manager.sh start metastore
启动spark服务: 啥服务也没有了,已经启动完了。
查看metastore服务:
hive-server-manager.sh status metastore
修改配置:
cd /opt/installs/spark/conf
新增:hive-site.xml
vi hive-site.xml
在这个文件中,编写如下配置:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>thrift://bigdata01:9083</value>
</property>
</configuration>
接着将该文件进行分发:
xsync.sh hive-site.xml
操作sparkSQL:
/opt/installs/spark/bin/pyspark --master local[2] --conf spark.sql.shuffle.partitions=2
此处的pyspark更像是一个客户端,里面可以通过python编写spark代码而已。而我们以前安装的pyspark更像是spark的python运行环境。
进入后,通过内置对象spark:
>>> spark.sql("show databases").show()
+---------+
|namespace|
+---------+
| default|
| yhdb|
+---------+
>>> spark.sql("select * from yhdb.student").show()
+---+------+
|sid| sname|
+---+------+
| 1|laoyan|
| 1|廉德枫|
| 2| 刘浩|
| 3| 王鑫|
| 4| 司翔|
+---+------+
开发环境如何编写代码,操作hive:
Pycharm工具集成Hive开发SparkSQL,必须申明Metastore的地址和启用Hive的支持
spark = SparkSession \
.builder \
.appName("HiveAPP") \
.master("local[2]") \
.config("spark.sql.warehouse.dir", 'hdfs://bigdata01:9820/user/hive/warehouse') \
.config('hive.metastore.uris', 'thrift://bigdata01:9083') \
.config("spark.sql.shuffle.partitions", 2) \
.enableHiveSupport()\
.getOrCreate()
代码实战:
from pyspark.sql import SparkSession
import os
if __name__ == '__main__':
# 构建环境变量
# 配置环境
os.environ['JAVA_HOME'] = 'D:/Program Files/Java/jdk1.8.0_271'
# 配置Hadoop的路径,就是前面解压的那个路径
os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
# 配置base环境Python解析器的路径
os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' # 配置base环境Python解析器的路径
os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
# 防止在本地操作hdfs的时候,出现权限问题
os.environ['HADOOP_USER_NAME'] = 'root'
# 获取sparkSession对象
spark = SparkSession \
.builder \
.appName("HiveAPP") \
.master("local[2]") \
.config("spark.sql.warehouse.dir", 'hdfs://bigdata01:9820/user/hive/warehouse') \
.config('hive.metastore.uris', 'thrift://bigdata01:9083') \
.config("spark.sql.shuffle.partitions", 2) \
.enableHiveSupport() \
.getOrCreate()
spark.sql("select * from yhdb.student").show()
spark.stop()
代码还可以这样写:
方式二:加载Hive表的数据变成DF,可以调用DSL或者SQL的方式来实现计算
# 读取Hive表构建DataFrame
hiveData = spark.read.table("yhdb.student")
hiveData.printSchema()
hiveData.show()
# 读取hive表中的数据
spark2 = SparkSession \
.builder \
.appName("HiveAPP") \
.master("local[2]") \
.config("spark.sql.warehouse.dir", 'hdfs://192.168.233.128:9820/user/hive/warehouse') \
.config('hive.metastore.uris', 'thrift://192.168.233.128:9083') \
.config("spark.sql.shuffle.partitions", 2) \
.enableHiveSupport() \
.getOrCreate()
#spark2.sql("show databases").show()
#spark2.sql("show tables").show()
#spark2.sql("select * from yhdb.t_user").show()
spark2.read.table("t_user2").show()
不要在一个python 文件中,创建两个不同的sparkSession对象,否则对于sparksql获取hive的元数据,有影响。另外,记得添加一个权限校验的语句:
# 防止在本地操作hdfs的时候,出现权限问题
os.environ['HADOOP_USER_NAME'] = 'root'
为什么有些平台不支持,不兼容 sqoop flume datax 这些工具呢?
spark 可以读取日志数据
spark 可以读取数据库数据
spark 可以读取 hdfs 数据
spark 可以读取 hive 数据
------------------------------------
spark 可以读取日志数据,形成一个 A 表,读取 mysql 数据,形成一个 B 表
A 表和 B 表还可以相互关联,此时也就不需要 sqoop、flume、datax 去导入导出了。
spark 还可以将统计出来的结果直接放入 mysql 或者直接放入 hive
--------------------
我们后面学习的内容还是沿用 将日志数据,数据库数据等所有数据抽取到 hive ,然后呢,使用 spark 去统计,统计完之后还是放入 hive ,使用 datax 等工具将结果导出 mysql。
二、输出Sink
sink --> 下沉 --> 落盘 --> 保存起来
如果输出路径或者表已经存在了怎么办
- 类型:text /csv【所有具有固定分隔符的文件】/ json/ orc/ parquet / jdbc / table【Hive表】
- 语法:DataFrame.write.format(保存的类型).save(保存到哪)
-
- 方法:save-保存到文件save(path)或者数据库表save()中,saveAsTable-用于保存到Hive表
方式一:给定输出数据源的类型和地址
df.write.format("json").save(path)
df.write.format("csv").save(path)
df.write.format("parquet").save(path)
方式二:直接调用对应数据源类型的方法
df.write.json(path)
df.write.csv(path)
df.write.parquet(path)
特殊参数:option,用于指定输出时的一些配置选项
df.write \
.format("jdbc") \
.option("url", "jdbc:postgresql:dbserver") \
.option("dbtable", "schema.tablename") \
.option("user", "username") \
.option("password", "password") \
.save()
输出模式:Save Mode
append: 追加模式,当数据存在时,继续追加
overwrite: 覆写模式,当数据存在时,覆写以前数据,存储当前最新数据;
error/errorifexists: 如果目标存在就报错,默认的模式
ignore: 忽略,数据存在时不做任何操作
代码如何编写:
df.write.mode(saveMode="append").format("csv").save(path)
实战一:保存普通格式
import os
from pyspark.sql import SparkSession
if __name__ == '__main__':
# 配置环境
os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'
# 配置Hadoop的路径,就是前面解压的那个路径
os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'
# 配置base环境Python解析器的路径
os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' # 配置base环境Python解析器的路径
os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
spark = SparkSession.builder.master("local[2]").appName("").config(
"spark.sql.shuffle.partitions", 2).getOrCreate()
df = spark.read.json("../../datas/person.json")
# 获取年龄最大的人的名字
df.createOrReplaceTempView("persons")
rsDf = spark.sql("""
select name,age from persons where age = (select max(age) from persons)
""")
# 将结果打印到控制台
#rsDf.write.format("console").save()
#rsDf.write.json("../../datas/result",mode="overwrite")
#rsDf.write.mode(saveMode='overwrite').format("json").save("../../datas/result")
#rsDf.write.mode(saveMode='overwrite').format("csv").save("../../datas/result1")
#rsDf.write.mode(saveMode='overwrite').format("parquet").save("../../datas/result2")
#rsDf.write.mode(saveMode='append').format("csv").save("../../datas/result1")
# text 保存路径为hdfs 直接报错,不支持
#rsDf.write.mode(saveMode='overwrite').text("hdfs://bigdata01:9820/result")
#rsDf.write.orc("hdfs://bigdata01:9820/result",mode="overwrite")
rsDf.write.parquet("hdfs://bigdata01:9820/result", mode="overwrite")
spark.stop()
假如:
spark.sql("select concat(name,' ',age) from person").write.text("hdfs://bigdata01:9820/spark/result")
直接报错:假如你的输出类型是text类型,直接报错
pyspark.sql.utils.AnalysisException: Text data source does not support bigint data type.
假如修改为parquet等类型,是可以直接保存的:
rsDf.write.parquet("hdfs://bigdata01:9820/result")
实战二:保存到数据库中
import os
from pyspark.sql import SparkSession
if __name__ == '__main__':
# 配置环境
os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'
# 配置Hadoop的路径,就是前面解压的那个路径
os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'
# 配置base环境Python解析器的路径
os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' # 配置base环境Python解析器的路径
os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
spark = SparkSession.builder.master("local[2]").appName("").config(
"spark.sql.shuffle.partitions", 2).getOrCreate()
df = spark.read.format("csv").option("sep","\t").load("../../datas/zuoye/emp.tsv").toDF("id","name","sal","comm","deptno")
# 获取年龄最大的人的名字
df.createOrReplaceTempView("emps")
rsDf = spark.sql("""
select * from emps where comm is not null
""")
# 不需要事先将表创建好,它可以帮助我们创建
rsDf.write.format("jdbc") \
.option("driver", "com.mysql.cj.jdbc.Driver") \
.option("url", "jdbc:mysql://localhost:3306/spark?characterEncoding=UTF-8") \
.option("user","root") \
.option("password", "123456") \
.option("dbtable", "emp1") \
.save(mode="overwrite")
spark.stop()
实战三:将结果保存在hive表中
import os
from pyspark.sql import SparkSession
if __name__ == '__main__':
# 配置环境
os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'
# 配置Hadoop的路径,就是前面解压的那个路径
os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'
# 配置base环境Python解析器的路径
os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' # 配置base环境Python解析器的路径
os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
os.environ['HADOOP_USER_NAME'] = 'root'
spark = SparkSession \
.builder \
.appName("测试本地连接hive") \
.master("local[2]") \
.config("spark.sql.warehouse.dir", 'hdfs://bigdata01:9820/user/hive/warehouse') \
.config('hive.metastore.uris', 'thrift://bigdata01:9083') \
.config("spark.sql.shuffle.partitions", 2) \
.enableHiveSupport() \
.getOrCreate()
df = spark.read.format("csv").option("sep", "\t").load("../../datas/zuoye/emp.tsv").toDF("id", "name", "sal",
"comm", "deptno")
# 获取年龄最大的人的名字
df.createOrReplaceTempView("emps")
rsDf = spark.sql("""
select * from emps where comm is not null
""")
rsDf.write.saveAsTable("yhdb03.emp")
spark.stop()
三、总结
SparkSQL 读写数据功能丰富强大,涵盖多种数据源与格式,理解其原理、语法和操作细节,结合不同业务场景(如数据分析、数据迁移、数据存储优化等)灵活运用,能极大提升大数据处理效率,助力在大数据领域深挖数据价值、攻克业务难题,为数据驱动决策筑牢根基。后续实践中,多尝试不同数据、场景组合,深化掌握程度。
原文地址:https://blog.csdn.net/weixin_64726356/article/details/144330272
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!