自学内容网 自学内容网

SparkSQL函数综合实践

文章目录

  • 1. 实战概述
  • 2. 实战步骤
    • 2.1 创建项目
    • 2.2 添加依赖
    • 2.3 设置源目录
    • 2.4 创建日志属性文件
    • 2.5 创建hive配置文件
    • 2.6 创建数据分析对象
      • 2.6.1 导入相关类
      • 2.6.2 创建获取Spark会话方法
      • 2.6.3 创建表方法
      • 2.6.4 准备数据文件
      • 2.6.5 创建加载数据方法
      • 2.6.6 创建薪水排行榜方法
      • 2.6.7 创建主方法
      • 2.6.8 查看完整代码
    • 2.7 启动metastore服务
    • 2.8 运行程序,查看结果
    • 2.8 在Spark Shell里运行程序
  • 3. 实战小结

1. 实战概述

  • 通过使用 Spark 和 Hive 进行数据分析,展示了从项目创建、依赖配置、数据加载到查询分析的完整流程。通过创建 Hive 表、加载 JSON 数据并使用 Spark SQL 查询每个城市工资最高的前 N 名员工,实现了数据的高效处理与分析。实战涵盖了 SparkSession 初始化、Hive 表操作、数据加载及窗口函数的使用,适用于大数据处理场景。

2. 实战步骤

2.1 创建项目

  • 设置项目基本信息
    在这里插入图片描述
  • 单击【Create】按钮,生成项目基本骨架
    在这里插入图片描述
  • java目录改成scala目录
    在这里插入图片描述

2.2 添加依赖

  • pom.xml文件里添加相关依赖
    在这里插入图片描述
  • 刷新项目依赖
    在这里插入图片描述

2.3 设置源目录

  • pom.xml里设置源目录
    在这里插入图片描述

2.4 创建日志属性文件

  • resources里创建log4j2.properties文件
    在这里插入图片描述
rootLogger.level = ERROR
rootLogger.appenderRef.stdout.ref = console

appender.console.type = Console
appender.console.name = console
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex

2.5 创建hive配置文件

  • resources里创建hive-site.xml文件
    在这里插入图片描述
  • bigdata1云主机上执行命令:$HIVE_HOME/conf/hive-site.xml,拷贝其内容到resources里的hive-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://bigdata1:3306/metastore?useSSL=false</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.jdbc.Driver</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>root</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>123456</value>
    </property>
    <property>
        <name>hive.metastore.warehouse.dir</name>
        <value>/user/hive/warehouse</value>
    </property>
    <property>
        <name>hive.server2.thrift.port</name>
        <value>10000</value>
    </property>
    <property>
        <name>hive.server2.thrift.bind.host</name>
        <value>bigdata1</value>
    </property>
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://bigdata1:9083</value>
    </property>
    <property>
        <name>hive.metastore.event.db.notification.api.auth</name>
        <value>false</value>
    </property>
    <property>
        <name>hive.metastore.schema.verification</name>
        <value>false</value>
    </property>
    <property>
        <name>hive.server2.active.passive.ha.enable</name>
        <value>true</value>
    </property>
</configuration>

2.6 创建数据分析对象

  • 添加scala-sdk到项目
    在这里插入图片描述

  • 单击【Add to Modules…】菜单项
    在这里插入图片描述

  • 单击【OK】按钮即可

  • 创建net.huawei.sql
    在这里插入图片描述

  • net.huawei.sql包里创建DataAnalysis对象
    在这里插入图片描述

2.6.1 导入相关类

  • 导入三个类:SparkConfSparkSessionDataFrame
    在这里插入图片描述

2.6.2 创建获取Spark会话方法

  • 创建getSparkSession()方法
    在这里插入图片描述
// 获取SparkSession对象                                      
def getSparkSession(): SparkSession = {                  
  // 创建SparkConf对象                                       
  val conf = new SparkConf()                             
  conf.setMaster("local[*]")                             
  conf.setAppName("DataAnalysis")                        
  conf.set("dfs.client.use.datanode.hostname", "true")   
                                                         
  // 创建SparkSession对象                                    
  SparkSession.builder()                                 
    .config(conf)                                        
    .enableHiveSupport()                                 
    .getOrCreate()                                       
}                                                                           

2.6.3 创建表方法

  • 创建createTable()方法
    在这里插入图片描述
// 创建表                                                   
def createTable(spark: SparkSession): Unit = {           
  spark.sql(                                             
    s"""                                                 
       |CREATE TABLE IF NOT EXISTS salary_info           
       |  (city string, name string, salary double)      
       |  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','  
       |""".stripMargin                                  
  )                                                      
}                                                        

2.6.4 准备数据文件

  • 在项目根目录创建data目录,在里面创建salary.json文件
    在这里插入图片描述
{"city": "北京", "name": "陈燕文", "salary": 5000.0}
{"city": "上海", "name": "李伟强", "salary": 8000.0}
{"city": "广州", "name": "王丽娜", "salary": 5500.0}
{"city": "北京", "name": "赵建国", "salary": 5200.0}
{"city": "上海", "name": "孙志强", "salary": 5300.0}
{"city": "广州", "name": "方云龙", "salary": 6800.0}
{"city": "北京", "name": "周晓峰", "salary": 6400.0}
{"city": "上海", "name": "吴雅婷", "salary": 5100.0}
{"city": "广州", "name": "郑文杰", "salary": 5600.0}
{"city": "上海", "name": "王海涛", "salary": 7500.0}
{"city": "北京", "name": "李雪梅", "salary": 5800.0}
{"city": "广州", "name": "童玉明", "salary": 7800.0}

2.6.5 创建加载数据方法

  • 创建loadData()方法
    在这里插入图片描述
// 加载数据                                                                          
def loadData(spark: SparkSession, inputPath: String, tableName: String): Unit = {
  val fileDF: DataFrame = spark.read.format("json").load(inputPath)              
  fileDF.write.insertInto(tableName)                                             
}                                                                                

2.6.6 创建薪水排行榜方法

  • 创建salaryTopN()方法
    在这里插入图片描述
// 查询工资topN                                                                           
def salaryTopN(spark: SparkSession, topN: Int): Unit = {                              
  spark.sql(                                                                          
    s"""                                                                              
       |SELECT                                                                        
       |  city, name, salary                                                          
       |FROM                                                                          
       |  (                                                                           
       |    SELECT                                                                    
       |      city, name, salary,                                                     
       |      row_number() OVER (PARTITION BY city ORDER BY salary DESC) AS row_num   
       |    FROM                                                                      
       |      salary_info                                                             
       |  ) salary_rank                                                               
       |WHERE row_num <= $topN                                                        
       |""".stripMargin                                                               
  ).show()                                                                            
}                                                                                     
  • 代码说明salaryTopN 方法用于查询每个城市工资最高的前 topN 名员工。通过 row_number() 窗口函数按城市分组并按工资降序排序,生成行号 row_num,然后筛选出行号小于等于 topN 的记录。最终结果展示每个城市工资最高的前 topN 名员工的姓名和工资。

2.6.7 创建主方法

  • 通过 getSparkSession() 获取 SparkSession 实例,使用 createTable() 在 Hive 中创建表,调用 loadData() 加载数据并写入 Hive 表,通过 salaryTopN() 查询每个城市工资最高的前 N 名员工信息,最后释放资源。
    在这里插入图片描述
// 主方法                                                   
def main(args: Array[String]): Unit = {                  
  // 获取SparkSession对象                                    
  val spark = getSparkSession()                          
  // 创建表                                                 
  createTable(spark)                                     
  // 加载数据                                                
  loadData(spark, "data/salary.json", "salary_info")     
  // 查询工资top3                                            
  salaryTopN(spark, 3)                                   
}                                                        

2.6.8 查看完整代码

package net.huawei.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * 功能:数据分析对象
 * 作者:华卫
 * 日期:2025年01月21日
 */
object DataAnalysis {
  // 获取SparkSession对象
  def getSparkSession(): SparkSession = {
    // 创建SparkConf对象
    val conf = new SparkConf()
    conf.setMaster("local[*]")
    conf.setAppName("DataAnalysis")
    conf.set("dfs.client.use.datanode.hostname", "true")

    // 创建SparkSession对象
    SparkSession.builder()
      .config(conf)
      .enableHiveSupport()
      .getOrCreate()
  }

  // 创建表
  def createTable(spark: SparkSession): Unit = {
    spark.sql(
      s"""
         |CREATE TABLE IF NOT EXISTS salary_info
         |  (city string, name string, salary double)
         |  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
         |""".stripMargin
    )
  }

  // 加载数据
  def loadData(spark: SparkSession, inputPath: String, tableName: String): Unit = {
    val fileDF: DataFrame = spark.read.format("json").load(inputPath)
    fileDF.write.insertInto(tableName)
  }

  // 查询工资topN
  def salaryTopN(spark: SparkSession, topN: Int): Unit = {
    spark.sql(
      s"""
         |SELECT
         |  city, name, salary
         |FROM
         |  (
         |    SELECT
         |      city, name, salary,
         |      row_number() OVER (PARTITION BY city ORDER BY salary DESC) AS row_num
         |    FROM
         |      salary_info
         |  ) salary_rank
         |WHERE row_num <= $topN
         |""".stripMargin
    ).show()
  }

  // 主方法
  def main(args: Array[String]): Unit = {
    // 获取SparkSession对象
    val spark = getSparkSession()
    // 创建表
    createTable(spark)
    // 加载数据
    loadData(spark, "data/salary.json", "salary_info")
    // 查询工资top3
    salaryTopN(spark, 3)
  }
}

2.7 启动metastore服务

  • 执行命令:hive --service metastore &
    在这里插入图片描述

2.8 运行程序,查看结果

  • 运行DataAnalysis对象
    在这里插入图片描述
  • hive客户端,查看创建的c
    在这里插入图片描述
  • 查看salary_info表的内容
    在这里插入图片描述
  • 在HDFS上查看salary_info表对应的目录
    在这里插入图片描述
  • 下载文件,查看内容
    在这里插入图片描述

2.8 在Spark Shell里运行程序

  • salary.json上传到HDFS的/data目录
    在这里插入图片描述

  • 在spark shell里执行命令::paste,粘贴代码
    在这里插入图片描述

  • Ctrl + D,查看结果
    在这里插入图片描述

3. 实战小结

  • 本次实战通过使用 Spark 和 Hive 进行数据分析,展示了从项目创建、依赖配置、数据加载到查询分析的完整流程。首先,我们创建了 Hive 表并加载了 JSON 数据,随后通过 Spark SQL 查询每个城市工资最高的前 N 名员工。实战中,我们使用了 SparkSession 初始化、Hive 表操作、数据加载及窗口函数等技术,实现了数据的高效处理与分析。通过本次实战,我们掌握了 Spark 和 Hive 的基本操作,并学会了如何在大数据场景下进行数据分析和处理。

原文地址:https://blog.csdn.net/howard2005/article/details/145286487

免责声明:本站文章内容转载自网络资源,如侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!