(六)Spark大数据开发实战:豆瓣电影数据处理与分析(scala版)
目录
④、求每位演员所有参演电影中的最早、最晚上映时间及其相隔天数、年数
⑤、求每位演员所有电影中的评分最高值、最低值、电影数量、评分均值、标准差、方差、最高最低评分之差值
⑥、求参演大于等于10部电影的每位演员的平均评分,计算规则:去掉一个最高分和一个最低分,然后再计算电影平均分
⑦、求投票数在所有电影中排前80%、评分在所有电影中排前20%的电影信息
⑧、求美国、中国(含港澳台)、英国、法国、俄罗斯5个国家各个电影类型的上映电影数目
⑩、统计从数据中最早年份到最晚年份的每月上映电影数量,若某个月份无电影上映则数量为0
一、Spark
Apache Spark是一个用于大数据处理的开源分布式计算框架,Spark的很多内部组件都是用Scala编写的,Scala的函数式编程特性与Spark的分布式数据结构(如RDD、DataFrame和Dataset)非常契合。Scala也是Spark的主要编程语言之一,Spark API在Scala中得到了原生的支持,使得Scala成为开发Spark应用程序的首选语言。Scala编译后的字节码运行在JVM上,与Java一样,有着良好的性能。
本文软件环境如下:
操作系统:CentOS Linux 7
Hadoop版本:3.1.3,安装教程可见我另一篇博客内容:Linux CentOS安装Hadoop3.1.3(单机版)详细教程
Spark版本:3.5.2,安装教程可见我另一篇博客内容:Linux CentOS安装PySpark3.5(单机版)详细教程及机器学习实战
scala版本:2.12.18
Spark系列文章:
(四)PySpark3:Mlib机器学习实战-信用卡交易数据异常检测
(五)Spark大数据开发实战:豆瓣电影数据处理与分析(python版)
(六)Spark大数据开发实战:豆瓣电影数据处理与分析(scala版)
二、数据介绍
本文数据来自采集豆瓣网分类排行榜 (“https://movie.douban.com/chart”)中各分类类别所有电影的相关信息并存储为csv文件。
爬虫代码在我另一篇博客:豆瓣电影信息爬取与可视化分析
数据放在了百度云上:https://pan.baidu.com/s/1YWB2iEOsMmXHkEUFpY2_TA?pwd=ej3z
数据如下图所示,包含电影名、上映日期、上映地区、类型、豆瓣链接、参演演员、演员数、评分、打分人数,共有3357部电影:
三、Spark大数据开发实战(Scala)
1、数据文件上传HDFS
首先通过xftp上传linux服务器,然后通过以下命令上传至HDFS:
hdfs dfs -mkdir /data
hdfs dfs -mkdir /output
hdfs dfs -put film_info.csv /data
2、导入模块及数据
import spark.implicits._ 导入了SparkSession实例的隐式转换,允许将Scala的集合转换为Spark的DataFrame,对于处理数据时非常有用。
import org.apache.spark.sql.{SparkSession, functions => F}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.UserDefinedFunction
object FilmAnalysis {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().getOrCreate()
import spark.implicits._
val df = spark.read
.option("header", "true") // 告知Spark CSV文件的第一行是列名
.option("inferSchema", "true") // 让Spark自动推断每列的数据类型
.csv("/data/film_info.csv") // 指定要读取的CSV文件的路径
3、数据统计与分析
①、计算演员参演电影数
以下代码中使用了spark sql进行统计,也可以通过DataFrame API进行统计。
val dfSplit = df
.withColumn("actors", F.split(df("actors"), "\\|"))
.withColumn("types", F.split(df("types"), "\\|"))
.withColumn("regions", F.split(df("regions"), "\\|"))
val dfExploded = dfSplit
.withColumn("actor", F.explode(dfSplit("actors")))
dfExploded
.drop("actors", "regions", "types")
.createOrReplaceTempView("actor_exploded")
val df1 = spark.sql(
"""
|select actor, count(*) as act_film_num
|from actor_exploded
|group by actor
""".stripMargin)
.orderBy(F.col("act_film_num").desc)
df1.repartition(1)
.write
.option("header", "true")
.mode("overwrite")
.csv("/output/result1.csv")
结果如下:
+-------------+------------+
| actor|act_film_num|
+-------------+------------+
| 童自荣| 43|
| 户田惠子| 37|
| 林雪| 33|
| 张国荣| 32|
| 刘德华| 31|
| 周星驰| 31|
| 成龙| 31|
| 任达华| 31|
| 刘洵| 30|
|塞缪尔·杰克逊| 29|
| 汤姆·汉克斯| 29|
| 梁家辉| 28|
| 吴孟达| 28|
| 梁朝伟| 27|
| 斯坦·李| 27|
| 吴君如| 27|
| 威廉·达福| 27|
| 黄秋生| 27|
| 胡立成| 27|
| 布拉德·皮特| 26|
+-------------+------------+
only showing top 20 rows
②、依次罗列电影番位前十的演员
这一题考察了窗口函数、行转列等等。
val windowSpec1 = Window.partitionBy("title").orderBy("actors")
val rankNum = 10
val rankNumList = (1 to rankNum).map(_.toString)
val df2 = dfExploded
.withColumn("rank", F.row_number().over(windowSpec1))
val df2Tmp1 = df2
.groupBy("title")
.pivot("rank", rankNumList)
.agg(F.collect_list("actor"))
// 从list提取值,并更改列名
val processedCols = rankNumList.map(i => F.col(s"$i").getItem(0).as(s"actor$i"))
// 筛选需要的列
val df2Tmp2 = df2Tmp1.select(F.col("title") +: processedCols: _*)
df2Tmp2
.repartition(1)
.write
.option("header", "true")
.mode("overwrite")
.csv("/output/result2.csv")
结果如下:
+------------------------+-------------------+---------------------+------------------+---------------+-----------------+----------------------+-------------------+---------------------+-----------------+-----------------+
| title| actor1| actor2| actor3| actor4| actor5| actor6| actor7| actor8| actor9| actor10|
+------------------------+-------------------+---------------------+------------------+---------------+-----------------+----------------------+-------------------+---------------------+-----------------+-----------------+
| 101忠狗| 罗德·泰勒| 凯特·鲍尔| 本·怀特| 丽莎·戴维斯| 贝蒂·洛乌·格尔森| J·帕特·奥马利| 玛莎·温特沃思| 大卫·弗兰克海姆|弗莱德里克·沃洛克| 汤姆·康威|
| 11:14| 亨利·托马斯| 布莱克·赫伦| 芭芭拉·赫希| 克拉克·格雷格| 希拉里·斯万克| 肖恩·海托西| 斯塔克·桑德斯| 科林·汉克斯| 本·福斯特| 帕特里克·斯威兹|
| 2012| 约翰·库萨克| 阿曼达·皮特| 切瓦特·埃加福| 坦迪·牛顿| 奥利弗·普莱特| 汤姆·麦卡锡| 伍迪·哈里森| 丹尼·格洛弗| 连姆·詹姆斯| 摩根·莉莉|
| 2046| 梁朝伟| 章子怡| 王菲| 木村拓哉| 巩俐| 刘嘉玲| 张震| 张曼玉| 董洁| 通猜·麦金泰|
| 21克| 西恩·潘| 娜奥米·沃茨|本尼西奥·德尔·托罗| 夏洛特·甘斯布| 梅丽莎·里奥| 迈克尔·芬内尔| Jessica Scott| 戴维·查特姆| Chance Romero| Carly Nahon|
| BABY BLUE| 柳乐优弥| 菊地凛子| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL|
| Hello!树先生| 王宝强| 谭卓| 何洁| 白培将| 王大治| 王亚彬| 李京怡| 邱士鉴| NULL| NULL|
| Soho区惊魂夜| 托马辛·麦肯齐| 安雅·泰勒-乔伊| 马特·史密斯| 黛安娜·里格| 丽塔·塔欣厄姆| 迈克尔·阿乔| 特伦斯·斯坦普| 山姆·克拉弗林| 科林·梅斯| 西诺薇·卡尔森|
|一个叫欧维的男人决定去死| 罗夫·拉斯加德| 巴哈·帕斯| 托比亚斯·阿姆博瑞| 菲利普·伯格| 安娜-莱娜·布伦丁| 博瑞·伦贝里| 埃达·英格薇| 弗雷德里克·埃弗斯| 玛德琳·雅各布松| 查特里娜·拉松|
| 一九四四| 卡斯帕·威尔伯格|克里斯蒂安·乌克斯库拉| 麦肯·施密特| 格特·劳塞|亨德里克·图姆佩勒|卡尔·安德烈亚斯·卡梅特| 亨里克·卡尔梅特| 帕努·欧加| 普瑞特·斯川伯格| 普瑞特·派厄斯|
| 一分钟时间机器| 布莱恩·迪岑| 埃瑞恩·海耶斯| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL|
| 一声叹息| 张国立| 刘蓓| 徐帆| 傅彪| 李诚儒| 吴旭| 修宗迪| 高明| 徐秀林| 李文玲|
| 一条狗的使命| 乔什·加德| 布丽特·罗伯森| 丹尼斯·奎德| 佩吉·利普顿| K·J·阿帕| 布莱斯·吉扎尔| 朱丽叶·赖伦斯| 卢克·柯比| 加布里埃尔·罗斯|迈克尔·博夫舍维尔|
| 一袋弹子| 多里安·勒·克利奇| 巴蒂斯特·弗勒里埃尔| 帕特里克·布鲁尔|艾尔莎·泽贝斯坦| 贝尔纳·康庞| 凯文·亚当斯|克里斯蒂昂·克拉维埃| 凯撒·东布瓦| 伊连·贝加拉| 埃米尔·贝宁|
| 丁丁历险记| 杰米·贝尔| 安迪·瑟金斯| 丹尼尔·克雷格| 西蒙·佩吉| 尼克·弗罗斯特| 加利·艾尔维斯| 托比·琼斯| 彼得·杰克逊| 加德·艾尔马莱| 塞巴斯蒂安·罗奇|
| 三个发明家| 米歇尔·欧斯洛| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL|
| 三傻大闹宝莱坞| 阿米尔·汗| 卡琳娜·卡普尔| 马达范| 沙尔曼·乔希| 奥米·瓦依达| 博曼·伊拉尼| 莫娜·辛格|拉杰夫·拉宾德拉纳特安| Atul Tiwari| 阿里·法扎勒|
| 三十九级台阶| 罗伯特·多纳特| 玛德琳·卡洛| 露西·曼海姆| 戈弗雷·特尔|佩吉·阿什克罗福特| 约翰·劳里| 海伦哈耶| 怀利·沃森| 格斯·麦克诺顿| 查尔斯·贝内特|
| 三十二| 韦绍兰| 罗善学| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL|
| 三块广告牌|弗兰西斯·麦克多蒙德| 伍迪·哈里森| 山姆·洛克威尔| 艾比·考尼什| 卢卡斯·赫奇斯| 彼特·丁拉基| 约翰·浩克斯| 卡莱伯·兰德里·琼斯| 凯瑟琳·纽顿| 凯瑞·康顿|
+------------------------+-------------------+---------------------+------------------+---------------+-----------------+----------------------+-------------------+---------------------+-----------------+-----------------+
only showing top 20 rows
原文地址:https://blog.csdn.net/weixin_44458771/article/details/143701015
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!