SparkSQL介绍及使用
SparkSQL介绍及使用
一、什么是SparkSQL(了解)
spark开发时可以使用rdd进行开发,spark还提供saprksql工具,将数据转为结构化数据进行操作
1-1 介绍
官网:https://spark.apache.org/sql/
Spark SQL是 Apache Spark 用于处理结构化数据(DataFrame和Datasets)的模块。
在Spark1.0版本时引入了SparkSQL
数据的结构形式
- 结构化数据
- 表,DataFrame,Datasets
- 构成
- 元数据 描述数据的数据(描述信息,类型约束)
- 数据本身
身高 int |
---|
179 |
170 |
156 |
132 |
200 |
-
半结构化数据
-
json,xml ,有数据的描述信息,但是对数据内容的类型无法约束
-
{ "name":"asdea" }
-
-
非结构化数据
- 文本文件
- 图片文件
- 视频文件
- 音频文件
sparksql可以将非结构化 ,半结构化数据统一转化为结构化数据处理
Spark中使用的结构化数据有 DataFrame ,映射表(离线数仓开发使用)
1-2 特点
- 易整合
- 使用sql配合spark一起使用,封装了不同语言的dsl方法
- 统一数据访问
- 使用read方法可以读取hdfs数据,mysql数据,不同类型的文件数据(json,csv,orc)
- 使用write方法可以写入hdfs,mysql不同类型的文件
- 兼容hive
- 使用hivesql方法
- 标准的数据连接
- 使用jdbc和odbc连接方式连接sparkSQL
1-3 SparkSQL与HiveSQL关系
- shark
- 运行的模式是hive on spark
- 会将hivesqsl转换为spark的rdd
- shark是基于hive开发的,维护麻烦,2015年停止维护
- sparkSQL
- 是spark团建独立开发的工具,2014年发布1.0版本
- sparkSQL工具对spark的兼容性更好,优化性能得到提升
- sparkSQL本质也是将sql语句转化为rdd执行,catalyst引擎负责将sql转化为rdd
- sparkSQL可以连接使用hive的metastore服务,管理表的元数据
二、DataFrame详解(理解)
DataFrame是基于RDD进行封装的结构化数据类型,增加了scheme元数据
其中DataFrame类型在计算时,还是转为rdd计算
DataFrame的结构化数据有Row(行数据)和scheme元数据构成
- Row 类型 表示一行数据
- datafram就算是多行构成
# 导入行类Row
from pyspark.sql import Row
# 创建行数据
r1 = Row(1, '张三', 20)
# 行数取取值 按照下标取值
data = r1[0]
print(data)
data1 = r1[1]
print(data1)
# 指定字段创建行数据
r2 = Row(id=2, name='李四', age=22)
# 按照字段取值
data3 = r2['id']
print(data3)
data4 = r2['name']
print(data4)
- schema表信息
- 定义dataframe中的表的字段名和字段类型
# 导入数据类型
from pyspark.sql.types import *
# 定义schema信息
# 使用StructType类进行定义
# add()方法是指定字段信息
# 第一参数,字段名
# 第二个参数,字段信息
# 第三个参数是否允许为空值 默认是True,允许为空
schema_type = StructType().\
add('id',IntegerType()).\
add('name',StringType()).\
add('age',IntegerType(),False)
三、DataFrame创建(掌握)
创建datafram数据
需要使用一个sparksession的类创建
SparkSession类是在SparkContext的基础上进行了封装,也就是SparkSession类中包含了SparkContext
3-1 基本创建
# 导入行类Row
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import *
# 创建行数据
r1 = Row(id=1, name='张三', age=20)
r2 = Row(id=2, name='李四', age=22)
# 创建元数据
schema = StructType(). \
add('id', IntegerType()). \
add('name', StringType()). \
add('age', IntegerType())
# 创建dataframe
# 生成sparksession对象 按照固定写法创建
ss = SparkSession.builder.getOrCreate()
# 使用sparksession对象方法创建df
# createDataFrame 第一参数是一个列表数据,将每行数据放入列表
# 第二个参数指定表元数据信息
# df是一个dataframe类型的对象
df = ss.createDataFrame([r1, r2], schema=schema)
# dataframe数据的操作
# 查看df数据
df.show() # 查看所有数据,超过20行时,默认只显示20行
# 查看元信息
df.printSchema()
3-2 RDD和DF之间的转化
- rdd的二维数据转化为dataframe
- rdd.toDF()
# RDD和DF之间的转换
# 导入SparkSession
from pyspark.sql import SparkSession
# 创建对象
ss = SparkSession.builder.getOrCreate()
# 使用sparksession获取sparkcontext
sc = ss.sparkContext # 不要括号,可以直接获取到sparkcontext对象
# 生成rdd数据
# rdd转df时,要求数据是二维嵌套列表
data = [[1,'张三',20,'男'],[2,'小红',19,'女']]
rdd = sc.parallelize(data)
# rdd转df
df = rdd.toDF(schema='id int,name string,age int,gender string')
# 查看df数据
df.show()
# 查看表结构
df.printSchema()
- df转为rdd
# 将df转为rdd
rdd2 = df.rdd
# 查看rdd中数据
res = rdd2.collect() # [Row(),Row()]
# 转化后的rdd中每个元素是有个Row类对象
print(res)
print(res[0])
print(res[0]['name'])
3-3 pandas和spark之间转化
pandas和spark之间的df相互转化
- pandas的df转为spark的df
# Pandas和spark之间的转化
import pandas as pd
from pyspark.sql import SparkSession
# 创建pd的df
pd_df = pd.DataFrame(
{
'id':[1,2,3,4],
'name':['a','b','c','d'],
'age':[20,21,22,24],
'gender':['男','女','男','男']
}
)
# 查看数据
print(pd_df)
# 将pd_df 转为spark的df
ss = SparkSession.builder.getOrCreate()
spark_df = ss.createDataFrame(pd_df)
# 查看数据
spark_df.show()
- spark的df转为pandas的df
- toPandas
# 将spark_df转为pd_df
pd_df2 = spark_df.toPandas()
print(pd_df2)
3-4 读取文件数据转为df
通过read方法读取数据转为df
- ss.read
# 读取数据转化为df
from pyspark.sql import SparkSession
# 创建sparksession
ss = SparkSession.builder.getOrCreate()
# 读取不同数据源
# header=True 是否需要获取表头
# sep 指定数据字段按照什么字符分割
df = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',')
# schema当没有表头时,可以自己指定字段
df2 = ss.read.csv('hdfs://node1:8020/data/students.csv',header=False,sep=',',schema='user_id string,username string,sex string,age string,cls string')
df3 = ss.read.json('hdfs://node1:8020/data/employees.json')
df4 = ss.read.orc('hdfs://node1:8020/data/users.orc')
df5 = ss.read.parquet('hdfs://node1:8020/data/users.parquet')
# 查看
# show中可以指定显示多少行,默认是20行
df.show(100)
df2.show()
四、DataFrame基本使用(掌握)
4-1 SQL语句
使用sparksession提供的sql方法,编写sql语句执行
# 使用sql方式开发
from pyspark.sql import SparkSession
ss = SparkSession.builder.getOrCreate()
# 读取数据得到df数据
df = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',',schema='id string,name string,gender string,age int,cls string')
# 对df数据进行sql操作
# 需要给df指定一个表名
df.createTempView('tb_user')
# 编写sql语句执行
# sql执行后的结果被保存新的df中
new_df = ss.sql('select gender,avg(age) as avg_data from tb_user group by gender')
new_df.show()
4-2 DSL方法
DSL方法是df提供的数据操作函数
使用方式
df.方法()
可以进行链式调用
df.方法().方法().方法()
方法执行后返回一个新的df保存计算结果
new_df = df.方法()
spark提供DSL方法和sql的关键词一样,使用方式和sql基本类似,在进行数据处理时,要按照sql的执行顺序去思考如何处理数据
from join 知道数据在哪 df本身就是要处理的数据 df.join(df2) from 表
where 过滤需要处理的数据 df.join(df2).where()
group by 聚合 数据的计算 df.join(df2).where().groupby().sum()
having 计算后的数据进行过滤 df.join(df2).where().groupby().sum().where()
select 展示数据的字段 df.join(df2).where().groupby().sum().where().select()
order by 展示数据的排序 df.join(df2).where().groupby().sum().where().select().orderBy()
limit 展示数据的数量 df.join(df2).where().groupby().sum().where().select().orderBy().limit()
DSL方法执行完成后会得到一个处理后的新的df
# 使用DSL方式开发
from pyspark.sql import SparkSession
ss = SparkSession.builder.getOrCreate()
# 生成df
df = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',',schema='id string,name string,gender string,age int,cls string')
# 查看df数据
df.show()
print('---------------select方法----------------------')
# 使用select方法指定输出展示的数据字段
# 方式一指定字段
df_select = df.select('id','name')
# 方式二
df_select2 = df.select(df.age,df.gender)
# 方式三
df_select3 = df.select(df['id'],df['cls'])
df_select.show()
df_select2.show()
df_select3.show()
print('---------------alias方法----------------------')
# 字段名称修改,需要配合select中使用
df_alias = df.select(df.id.alias('user_id'),df.name.alias('username'))
df_alias.show()
print('---------------cast方法----------------------')
# 修改字段的数据类型
df.printSchema()
df_cast = df.select(df.id.cast('int'),df.name,df.age)
df_cast.printSchema()
print('---------------where方法----------------------')
# 数据过滤,where方法内部是调用了filter方法
# 方式1
df_where = df.where('age > 20')
df_where.show()
#方式2
df_where2 = df.where(df.age > 20)
df_where2.show()
# 与或非多条件 只能使用方式1 条件的书写和在sql中的where书写内容一样
df_where3 = df.where('age > 20 and gender = "男" ')
df_where3.show()
print('---------------groupby方法----------------------')
# 分组计算,可以配和聚合方法一起使用 使用该方式聚合一次只能计算一个聚合数据 ,可以使用内置函数配合agg方法
# groupby指定分组字段,可以指定多个
# avg 聚合方法 指定聚合字段 sum count avg max min
df_groupby = df.groupby('gender').avg('age')
df_groupby.show()
# groupby指定分组字段,可以指定多个
df_groupby2 = df.groupby('gender','cls').avg('age')
df_groupby2.show()
# 分组后的数据过滤
df_groupby3 = df.groupby('gender','cls').avg('age').where(' avg(age) > 19')
df_groupby3.show()
print('---------------orderby方法----------------------')
# 数据排序 内部调用sort方法
df_orderby = df.orderBy('age')
df_orderby.show()
# ascending=False 降序
df_orderby2 = df.orderBy('age',ascending=False)
df_orderby2.show()
print('---------------limit方法----------------------')
# 指定获取多条数据
df_limit = df.orderBy('age',ascending=False).limit(5)
df_limit.show()
五、数据的关联与和并[掌握]
5-1 join关联
- 内关联
- 左关联
- 右关联
from pyspark.sql import SparkSession
ss =SparkSession.builder.getOrCreate()
# 读取文件数据
df1 = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',',schema='user_id string,username string,sex string,age string,cls string')
df2 = ss.read.csv('hdfs://node1:8020/data/students2.csv',header=False,sep=',',schema='user_id string,username string,sex string,age string,cls string')
# 两表进行关联
# 内关联 第一个参数,关联的df 第二参数 关联字段 第三个参数 关联方式 默认inner
df_join = df1.join(df2,df1.user_id == df2.user_id)
df_join2 = df1.join(df2,'user_id')
# 左关联
df_left= df1.join(df2,'user_id','left')
# 右关联
df_right = df1.join(df2,'user_id','right')
# show查看数据
df1.show()
df2.show()
df_join.show()
df_join2.show()
print('---------------------------------')
df_left.show()
df_right.show()
5-2 Union合并
from pyspark.sql import SparkSession
ss =SparkSession.builder.getOrCreate()
# 读取文件数据
df1 = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',',schema='user_id string,username string,sex string,age string,cls string')
df2 = ss.read.csv('hdfs://node1:8020/data/students2.csv',header=False,sep=',',schema='user_id string,username string,sex string,age string,cls string')
# 两表进行关联 合并后不会去重
df_union = df1.union(df2)
df_unionAll = df1.unionAll(df2)
# 合并后的数据去重
df_distinct = df_union.distinct().orderBy('user_id')
# 查看数据
df_union.show(100)
print('****************************************************************')
df_unionAll.show(100)
print('****************************************************************')
df_distinct.show(100)
六、缓存和checkpoint[了解]
# df的数据持久化
from pyspark.sql import SparkSession
ss = SparkSession.builder.getOrCreate()
# 指定checkpoint的位置
sc = ss.sparkContext # 或sparkcontext对象
sc.setCheckpointDir('hdfs://node1:8020/df_checkpoint')
# 读取文件数据
df1 = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',',schema='user_id string,username string,sex string,age int,cls string')
# 缓存 通过缓存级别指定缓存位置 默认是内存和磁盘上
# df1.persist()
# 使用checkpoint
df1.checkpoint()
df1_sum = df1.groupby('sex').sum('age')
#查看计算结果
df1_sum.show()
原文地址:https://blog.csdn.net/weixin_58305115/article/details/142894229
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!