SparkSQL函数综合实践
文章目录
- 1. 实战概述
- 2. 实战步骤
- 2.1 创建项目
- 2.2 添加依赖
- 2.3 设置源目录
- 2.4 创建日志属性文件
- 2.5 创建hive配置文件
- 2.6 创建数据分析对象
- 2.6.1 导入相关类
- 2.6.2 创建获取Spark会话方法
- 2.6.3 创建表方法
- 2.6.4 准备数据文件
- 2.6.5 创建加载数据方法
- 2.6.6 创建薪水排行榜方法
- 2.6.7 创建主方法
- 2.6.8 查看完整代码
- 2.7 启动metastore服务
- 2.8 运行程序,查看结果
- 2.8 在Spark Shell里运行程序
- 3. 实战小结
1. 实战概述
- 通过使用 Spark 和 Hive 进行数据分析,展示了从项目创建、依赖配置、数据加载到查询分析的完整流程。通过创建 Hive 表、加载 JSON 数据并使用 Spark SQL 查询每个城市工资最高的前 N 名员工,实现了数据的高效处理与分析。实战涵盖了 SparkSession 初始化、Hive 表操作、数据加载及窗口函数的使用,适用于大数据处理场景。
2. 实战步骤
2.1 创建项目
- 设置项目基本信息
- 单击【Create】按钮,生成项目基本骨架
- 将
java
目录改成scala
目录
2.2 添加依赖
- 在
pom.xml
文件里添加相关依赖
- 刷新项目依赖
2.3 设置源目录
- 在
pom.xml
里设置源目录
2.4 创建日志属性文件
- 在
resources
里创建log4j2.properties
文件
rootLogger.level = ERROR
rootLogger.appenderRef.stdout.ref = console
appender.console.type = Console
appender.console.name = console
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex
2.5 创建hive配置文件
- 在
resources
里创建hive-site.xml
文件
- 在
bigdata1
云主机上执行命令:$HIVE_HOME/conf/hive-site.xml
,拷贝其内容到resources
里的hive-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://bigdata1:3306/metastore?useSSL=false</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>123456</value>
</property>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
</property>
<property>
<name>hive.server2.thrift.port</name>
<value>10000</value>
</property>
<property>
<name>hive.server2.thrift.bind.host</name>
<value>bigdata1</value>
</property>
<property>
<name>hive.metastore.uris</name>
<value>thrift://bigdata1:9083</value>
</property>
<property>
<name>hive.metastore.event.db.notification.api.auth</name>
<value>false</value>
</property>
<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
</property>
<property>
<name>hive.server2.active.passive.ha.enable</name>
<value>true</value>
</property>
</configuration>
2.6 创建数据分析对象
-
添加
scala-sdk
到项目
-
单击【Add to Modules…】菜单项
-
单击【OK】按钮即可
-
创建
net.huawei.sql
包
-
在
net.huawei.sql
包里创建DataAnalysis
对象
2.6.1 导入相关类
- 导入三个类:
SparkConf
、SparkSession
、DataFrame
2.6.2 创建获取Spark会话方法
- 创建
getSparkSession()
方法
// 获取SparkSession对象
def getSparkSession(): SparkSession = {
// 创建SparkConf对象
val conf = new SparkConf()
conf.setMaster("local[*]")
conf.setAppName("DataAnalysis")
conf.set("dfs.client.use.datanode.hostname", "true")
// 创建SparkSession对象
SparkSession.builder()
.config(conf)
.enableHiveSupport()
.getOrCreate()
}
2.6.3 创建表方法
- 创建
createTable()
方法
// 创建表
def createTable(spark: SparkSession): Unit = {
spark.sql(
s"""
|CREATE TABLE IF NOT EXISTS salary_info
| (city string, name string, salary double)
| ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
|""".stripMargin
)
}
2.6.4 准备数据文件
- 在项目根目录创建
data
目录,在里面创建salary.json
文件
{"city": "北京", "name": "陈燕文", "salary": 5000.0}
{"city": "上海", "name": "李伟强", "salary": 8000.0}
{"city": "广州", "name": "王丽娜", "salary": 5500.0}
{"city": "北京", "name": "赵建国", "salary": 5200.0}
{"city": "上海", "name": "孙志强", "salary": 5300.0}
{"city": "广州", "name": "方云龙", "salary": 6800.0}
{"city": "北京", "name": "周晓峰", "salary": 6400.0}
{"city": "上海", "name": "吴雅婷", "salary": 5100.0}
{"city": "广州", "name": "郑文杰", "salary": 5600.0}
{"city": "上海", "name": "王海涛", "salary": 7500.0}
{"city": "北京", "name": "李雪梅", "salary": 5800.0}
{"city": "广州", "name": "童玉明", "salary": 7800.0}
2.6.5 创建加载数据方法
- 创建
loadData()
方法
// 加载数据
def loadData(spark: SparkSession, inputPath: String, tableName: String): Unit = {
val fileDF: DataFrame = spark.read.format("json").load(inputPath)
fileDF.write.insertInto(tableName)
}
2.6.6 创建薪水排行榜方法
- 创建
salaryTopN()
方法
// 查询工资topN
def salaryTopN(spark: SparkSession, topN: Int): Unit = {
spark.sql(
s"""
|SELECT
| city, name, salary
|FROM
| (
| SELECT
| city, name, salary,
| row_number() OVER (PARTITION BY city ORDER BY salary DESC) AS row_num
| FROM
| salary_info
| ) salary_rank
|WHERE row_num <= $topN
|""".stripMargin
).show()
}
- 代码说明:
salaryTopN
方法用于查询每个城市工资最高的前topN
名员工。通过row_number()
窗口函数按城市分组并按工资降序排序,生成行号row_num
,然后筛选出行号小于等于topN
的记录。最终结果展示每个城市工资最高的前topN
名员工的姓名和工资。
2.6.7 创建主方法
- 通过
getSparkSession()
获取 SparkSession 实例,使用createTable()
在 Hive 中创建表,调用loadData()
加载数据并写入 Hive 表,通过salaryTopN()
查询每个城市工资最高的前 N 名员工信息,最后释放资源。
// 主方法
def main(args: Array[String]): Unit = {
// 获取SparkSession对象
val spark = getSparkSession()
// 创建表
createTable(spark)
// 加载数据
loadData(spark, "data/salary.json", "salary_info")
// 查询工资top3
salaryTopN(spark, 3)
}
2.6.8 查看完整代码
package net.huawei.sql
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* 功能:数据分析对象
* 作者:华卫
* 日期:2025年01月21日
*/
object DataAnalysis {
// 获取SparkSession对象
def getSparkSession(): SparkSession = {
// 创建SparkConf对象
val conf = new SparkConf()
conf.setMaster("local[*]")
conf.setAppName("DataAnalysis")
conf.set("dfs.client.use.datanode.hostname", "true")
// 创建SparkSession对象
SparkSession.builder()
.config(conf)
.enableHiveSupport()
.getOrCreate()
}
// 创建表
def createTable(spark: SparkSession): Unit = {
spark.sql(
s"""
|CREATE TABLE IF NOT EXISTS salary_info
| (city string, name string, salary double)
| ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
|""".stripMargin
)
}
// 加载数据
def loadData(spark: SparkSession, inputPath: String, tableName: String): Unit = {
val fileDF: DataFrame = spark.read.format("json").load(inputPath)
fileDF.write.insertInto(tableName)
}
// 查询工资topN
def salaryTopN(spark: SparkSession, topN: Int): Unit = {
spark.sql(
s"""
|SELECT
| city, name, salary
|FROM
| (
| SELECT
| city, name, salary,
| row_number() OVER (PARTITION BY city ORDER BY salary DESC) AS row_num
| FROM
| salary_info
| ) salary_rank
|WHERE row_num <= $topN
|""".stripMargin
).show()
}
// 主方法
def main(args: Array[String]): Unit = {
// 获取SparkSession对象
val spark = getSparkSession()
// 创建表
createTable(spark)
// 加载数据
loadData(spark, "data/salary.json", "salary_info")
// 查询工资top3
salaryTopN(spark, 3)
}
}
2.7 启动metastore服务
- 执行命令:
hive --service metastore &
2.8 运行程序,查看结果
- 运行
DataAnalysis
对象
- 在
hive
客户端,查看创建的c
- 查看
salary_info
表的内容
- 在HDFS上查看
salary_info
表对应的目录
- 下载文件,查看内容
2.8 在Spark Shell里运行程序
-
将
salary.json
上传到HDFS的/data
目录
-
在spark shell里执行命令:
:paste
,粘贴代码
-
按
Ctrl + D
,查看结果
3. 实战小结
- 本次实战通过使用 Spark 和 Hive 进行数据分析,展示了从项目创建、依赖配置、数据加载到查询分析的完整流程。首先,我们创建了 Hive 表并加载了 JSON 数据,随后通过 Spark SQL 查询每个城市工资最高的前 N 名员工。实战中,我们使用了 SparkSession 初始化、Hive 表操作、数据加载及窗口函数等技术,实现了数据的高效处理与分析。通过本次实战,我们掌握了 Spark 和 Hive 的基本操作,并学会了如何在大数据场景下进行数据分析和处理。
原文地址:https://blog.csdn.net/howard2005/article/details/145286487
免责声明:本站文章内容转载自网络资源,如侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!