自学内容网 自学内容网

深入解析 SparkSQL:从基础到实践与优化

目录

一、SparkSQL 详解

一)Spark 的组成部分

二)SparkSQL 的由来

三)SparkSQL 的技术定义与应用场景

四)SparkSQL 与 SparkCore 对比

二、SparkSQL 编程实战

1、第一个综合案例:

2、使用sparkSQL编写wordCount

3、使用SparkSQL中的DSL操作WordCount

三、优化技巧

四、总结


        在大数据处理领域,Spark 作为一款强大的开源分布式计算框架,占据着举足轻重的地位。而 SparkSQL 作为 Spark 生态系统中专门针对结构化数据计算设计的关键模块,更是广泛应用于各类数据处理场景,无论是离线数据分析、实时流计算,还是机器学习任务前的数据预处理等环节,都发挥着不可替代的作用。本文将带大家全面认识 SparkSQL,从其基本概念、技术由来,到编程实战以及优化技巧,一步步揭开它的神秘面纱。

一、SparkSQL 详解

一)Spark 的组成部分

        Spark 整体架构丰富且精妙,主要包含 SparkCore、SparkSQL、Spark Streaming、MLlib(机器学习库)以及 GraphX(图计算)等部分。SparkCore 作为核心基础,提供了分布式任务调度、内存管理、错误恢复等基础能力,是整个 Spark 运行的基石;SparkSQL 专注于结构化数据处理,赋予使用者类似 SQL 操作的便捷性;Spark Streaming 支持实时数据流处理,能对源源不断流入的数据进行高效分析;MLlib 集成众多机器学习算法,助力大数据场景下的模型训练与预测;GraphX 则应对图结构数据,实现图相关的计算与分析操作。

二)SparkSQL 的由来

        早期使用 SparkCore 编写程序应对结构化数据时,过程繁琐复杂,鉴于日常处理数据大多是结构化形式(像常见的 JSON、CSV、TSV、parquet、orc 这类结构化文件,以及 MySQL、Oracle、Hive 中的数据库表数据),而 SQL 语言天然适配结构化数据处理场景,于是 SparkSQL 应运而生。

结构化的文件:JSON、CSV【以逗号分隔】、TSV【以制表符分隔】、parquet、orc
结构化的表:数据库中表的数据:MySQL、Oracle、Hive

对比:同样是进行wordcount统计,使用sparkCore的写法:

使用SparkSQL编写:

        SparkSQL 它的sql语句跟hive sql很像,跟我们的mysql的语句也非常像,它可以将SQL直接转换为SparkCore进行计算。

        将一些结构化的数据进行sql查询,需要将数据变为表,是表就必须有表结构,表结构就是Schema。

问题:SparkCore只能通过普通Python/Scala/Java代码编程的方式 来使用,对于统计分析的需求不是特别的友好怎么办?

解决:一群大数据开发者,将Hive源码直接独立出来,将底层解析成MapReduce的代码替换成解析变成SparkCore,这个是Shark:鲨鱼技术的由来。

Shark源码是依赖于Hive和Spark,只要Hive或者Spark一旦发生更新,Shark就必须更新。

解决:DataBricks团队将Shark的源代码重构,开发了SparkSQL模块,融合到了Spark中。

记住的:PySpark是看不到DataSet,PySpark只能看到DataFrame,底层都是DataSet。

DataFrame = 数据 + Schema

三)SparkSQL 的技术定义与应用场景

  1. 定义:SparkSQL 立足 SparkCore 之上,专为结构化数据计算量身定制,同时对半结构化数据(如 JSON)处理也游刃有余。它对外提供 SQL 和 DSL(领域特定语言)开发接口,能够无缝将 SQL 或 DSL 语句转化为 SparkCore 程序,高效处理数据。其语法与 Hive SQL 兼容性超 90%,学习成本低且上手快。
  2. 应用场景
    • 离线计算:深度集成 Hive,在离线数据仓库数据计算环节大显身手,涵盖数据分析、数据分层转换等任务,像电商项目数据处理中可取代 Presto,高效完成诸如按日、月统计销售数据,分析用户行为路径等复杂分析。
    • 实时计算:依托 StructStreaming(结构化流计算),基于 SparkSQL 编写代码实现实时流数据处理,重合度高达 90%。比如监控电商网站实时订单流,统计每秒订单量、实时热门商品销量排名等。
    • 机器学习:与 SparkMLlib 协同,基于 SparkSQL 编程构建机器学习流程,先利用 SparkSQL 清洗、转换数据(如特征工程里对数值型特征归一化、类别型特征编码等),为后续模型训练筑牢基础。

四)SparkSQL 与 SparkCore 对比

 

       SparkCore 侧重底层基础能力构建,以 RDD(弹性分布式数据集)为核心抽象,编程需借助复杂代码逻辑操控 RDD 实现数据处理,像手动编写分区、转换、聚合等逻辑;而 SparkSQL 借助 SQL 或 DSL 语法,使用者无需过度关注底层分布式细节,专注数据逻辑表达,开发效率大幅提升。打个比方,统计一组销售数据里各地区销售额,用 SparkCore 要历经复杂 RDD 转换,用 SparkSQL 简单几条 SQL 语句结合分组聚合函数轻松搞定。

二、SparkSQL 编程实战

pyspark --master local[*] 敲回车

pyspark --master spark://bigdata01:7077 standalone模式

pyspark --master yarn yarn模式

开发步骤:

step1:在Python代码中构建驱动对象:SparkSession
step2:读取数据变成DataFrame,使用SQL或者DSL进行处理,保存处理好的结果
step3:关闭驱动对象:stop

如何构建SparkSession:

from pyspark.sql import SparkSession

if __name__ == '__main__':
sparkSession = SparkSession.builder.master("local[2]").appName("第一次构建SparkSession").config("spark.sql.shuffle.partitions",2).getOrCreate()

编写sql:

step1:先将DataFrame注册成一张临时的视图【只读的表】
df.createOrReplaceTempView("people")
step2:使用SQL对临时视图进行处理,得到新的DataFrame
sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()

# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|

编写DSL:

类似于RDD的编程方式:调用算子函数来实现处理
流程:直接调用DataFrame的DSL函数进行处理原生DSL函数【将SQL语法变成了函数】:select、where、
groupBy、orderBy、limit、count、agg
比写SQL更灵活:不用考虑SQL执行顺序部分RDD算子函数:repartition、coalesce、persist、
unpersist、checkpoint

使用:
# 打印表结构
df.printSchema()
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)

df.select("name").show()
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+
df.select(df['name'], df['age'] + 1).show()
# +-------+---------+
# |   name|(age + 1)|
# +-------+---------+
# |Michael|     null|
# |   Andy|       31|
# | Justin|       20|
# +-------+---------+

df.where(df['age'] > 21).show()
# +---+----+
# |age|name|
# +---+----+
# | 30|Andy|
# +---+----+

df.groupBy("age").count().show()
# +----+-----+
# | age|count|
# +----+-----+
# | 19|   1|
# |null|   1|
# | 30|   1|

1、第一个综合案例:

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
import os

from pyspark.sql import SparkSession

if __name__ == '__main__':
    #获取sparkSession对象
# 设置 任务的环境变量
os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_77'
# 配置Hadoop的路径,就是前面解压的那个路径
os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
# 配置base环境Python解析器的路径
os.environ['PYSPARK_PYTHON'] = r'C:\ProgramData\Miniconda3\python.exe'  # 配置base环境Python解析器的路径
os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
# 得到sparkSession对象
spark = SparkSession.builder.master("local[2]").appName("第一个sparksql案例").config("spark.sql.shuffle.partitions",2).getOrCreate()
# 处理逻辑
print(spark)
# 读取外部数据,获取dataframe
df = spark.read.json("../../datas/sql/person.json")
print(df) # DataFrame类型
# 给数据集起个名字,该名字随便起
df.createOrReplaceTempView("person")
# 通过sql 获取想要的数据,返回值是一个DataFrame
df2 = spark.sql("select name,age from person")
# 想要展示dataframe中的数据,可以选择使用show 方法,列名和数据一起展示
df2.show()
"""
+-------+----+
|Michael|null|
|   Andy|  30|
| Justin|  19|
+-------+----+
"""
# 使用DSL 写法操作数据
"""
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
"""
df.printSchema() # 打印一个dataFrame的表结构
df.select("name").show()
df.select("name","age").show()

df.where(df['age'] > 20).show()
# 分组并统计
df.groupby("name").sum("age").show()
# select age,count(1) from 表名 group by age order by age limit 2
df.groupby("age").count().orderBy("age",ascending=False).limit(2).show()

# 以上这些都是dsl写法,能不能使用sql写法呢?
spark.sql("select age,count(1) from person group by age order by age desc limit 2").show()

# 关闭
spark.stop()

2、使用sparkSQL编写wordCount

将经常使用的代码变为一个模板:

from pyspark.sql import SparkSession
import os

"""
------------------------------------------
  Description : TODO:
  SourceFile : ${NAME}
  Author  : 
  Date  : ${DATE}
-------------------------------------------
"""

if __name__ == '__main__':
# 构建环境变量
# 配置环境
os.environ['JAVA_HOME'] = 'D:/Program Files/Java/jdk1.8.0_271'
# 配置Hadoop的路径,就是前面解压的那个路径
os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
# 配置base环境Python解析器的路径
os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

# 获取sparkSession对象
spark = SparkSession.builder.master("local[2]").appName("第一次构建SparkSession").config(
"spark.sql.shuffle.partitions", 2).getOrCreate()


spark.stop()

import os

from pyspark.sql import SparkSession


if __name__ == '__main__':
# 获取sparkSession对象
# 设置 任务的环境变量
os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_77'
# 配置Hadoop的路径,就是前面解压的那个路径
os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
# 配置base环境Python解析器的路径
os.environ['PYSPARK_PYTHON'] = r'C:\ProgramData\Miniconda3\python.exe'  # 配置base环境Python解析器的路径
os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
# 得到sparkSession对象
spark = SparkSession.builder.master("local[2]").appName("使用sparksql编写wordcount").config("spark.sql.shuffle.partitions", 2).getOrCreate()
# 处理逻辑
df = spark.read.text("../../datas/data.txt")
df.createOrReplaceTempView("wordcount")
# 假如一个数据没有列名,默认列名为value
# show(truncate=False) 默认显示20个 字符,超过的变为...,假如想全部显示,可以写成truncate=False
spark.sql("""
    with t as (
       select word from wordcount lateral view explode(split(value," ")) a as word
        where trim(word) !=''
    )
    select word,count(1) from t group by word order by count(1) desc
""").show(truncate=False)

spark.sql("""
    with t as (
       select explode(split(value," ")) as word from wordcount
    )
    select word,count(1) from t where trim(word)!='' group by word order by count(1) desc
""").show(truncate=False)

# 关闭
spark.stop()

3、使用SparkSQL中的DSL操作WordCount

import os

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

if __name__ == '__main__':
# 获取sparkSession对象
# 设置 任务的环境变量
os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_77'
# 配置Hadoop的路径,就是前面解压的那个路径
os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
# 配置base环境Python解析器的路径
os.environ['PYSPARK_PYTHON'] = r'C:\ProgramData\Miniconda3\python.exe'  # 配置base环境Python解析器的路径
os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
# 得到sparkSession对象
spark = SparkSession.builder.master("local[2]").appName("DSL版本的wordcount").config("spark.sql.shuffle.partitions", 2).getOrCreate()
# 处理逻辑
df = spark.read.text("../../datas/data.txt")
# DSL的第一种写法
# 第一种写法
df.select(F.explode(F.split("value"," ")).alias("word")).where(F.trim(F.col("word")) != '')\
 .groupby(F.col("word")).count().alias("count").orderBy(F.col("count"),ascending=False)\
 .show()
# 第二种写法
df.select(F.explode(F.split("value", " ")).alias("word")).where(F.trim(F.col("word")) != '') \
.groupby(F.col("word")).agg(F.count(F.col("word")).alias("count")).orderBy(F.col("count"), ascending=False) \
.show()

# 关闭
spark.stop()

三、优化技巧

        在 SparkSQL 编程中,合理优化能显著提升性能与资源利用率。利用with语句结合SparkSession(或SparkContext)的魔法方法,能确保资源自动管理与释放,示例如下:

import os

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

if __name__ == '__main__':
# 获取sparkSession对象
# 设置 任务的环境变量
os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_77'
# 配置Hadoop的路径,就是前面解压的那个路径
os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
# 配置base环境Python解析器的路径
os.environ['PYSPARK_PYTHON'] = r'C:\ProgramData\Miniconda3\python.exe'  # 配置base环境Python解析器的路径
os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
# 得到sparkSession对象
with SparkSession.builder.master("local[2]").appName("DSL版本的wordcount").config("spark.sql.shuffle.partitions", 2).getOrCreate() as spark:
# 处理逻辑
df = spark.read.text("../../datas/data.txt")
# DSL的第一种写法
# 第一种写法
df.select(F.explode(F.split("value"," ")).alias("word")).where(F.trim(F.col("word")) != '')\
 .groupby(F.col("word")).count().alias("count").orderBy(F.col("count"),ascending=False)\
 .show()
# 第二种写法
df.select(F.explode(F.split("value", " ")).alias("word")).where(F.trim(F.col("word")) != '') \
.groupby(F.col("word")).agg(F.count(F.col("word")).alias("count")).orderBy(F.col("count"), ascending=False) \
.show()

        这种写法避免资源泄露,让代码更简洁、健壮,尤其在复杂数据处理流程里,多次创建销毁SparkSession场景下优势尽显,提升代码整体质量与执行效率。

四、总结

        SparkSQL 凭借简洁语法、强大功能、广泛应用场景,已然成为大数据处理利器。从基础概念理解、编程实战演练到优化技巧掌握,是用好它的必经之路。希望本文能助力大家在大数据分析征程中,灵活运用 SparkSQL 攻克各类结构化数据处理难题,挖掘数据深层价值,提升业务洞察与决策能力。后续大家可深入探索 SparkSQL 高级特性,如数据源扩展、查询优化器调优等,持续精进大数据处理技术栈。


原文地址:https://blog.csdn.net/weixin_64726356/article/details/144290979

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!