自学内容网 自学内容网

大数据-235 离线数仓 - 会员活跃度 数据测试 完整加载 ODS DWD 层

点一下关注吧!!!非常感谢!!持续更新!!!

Java篇开始了!

目前开始更新 MyBatis,一起深入浅出!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(已更完)
  • ClickHouse(已更完)
  • Kudu(已更完)
  • Druid(已更完)
  • Kylin(已更完)
  • Elasticsearch(已更完)
  • DataX(已更完)
  • Tez(已更完)
  • 数据挖掘(已更完)
  • Prometheus(已更完)
  • Grafana(已更完)
  • 离线数仓(正在更新…)

章节内容

上节我们完成了如下的内容:

  • 异构数据源 DataX
  • 数据从 HDFS 到 MySQL

在这里插入图片描述

数据测试

数据采集 => ODS => DWD => DWS => ADS => MySQL
活跃会员、新增会员、会员留存

  • DAU:Daily Active User(日活跃用户)
  • MAU:Monthly Active User(月活跃用户)
  • 假设App的DAU在1000W左右,日启动数据大概1000W条

查看脚本

脚本调用次序:(名字可能有几个不完全能对上,如果名字不对,也不会差太多,修改一下成下面对应的,方便后续的执行)

# 加载ODS / DWD 层采集
ods_load_startlog.sh
dwd_load_startlog.sh
# 活跃会员
dws_load_member_start.sh
ads_load_member_active.sh
# 新增会员
dws_load_member_add_day.sh
ads_load_member_add.sh
# 会员留存
dws_load_member_retention_day.sh
ads_load_member_retention.sh

我们之前编写的好的脚本如下所示:
在这里插入图片描述

清理数据

将之前测试的数据清理掉,主要是HDFS上数据,避免对后续的操作产生错误的影响:

hadoop fs -rm -r /user/data/logs/start/
hadoop fs -rm -r /user/data/logs/enent/

hadoop fs -mkdir /user/data/logs/start
hadoop fs -mkdir /user/data/logs/event

加载数据

我们需要启动Flume,将数据写入,这里准备了几个 log 文件,4MB大小的数据文件。我们将模拟这个文件是程序产生的日志文件,然后Flume采集日志文件后,写入到HDFS中,重新回顾我们的配置文件(flume-log2hdfs2.conf):

a1.sources = r1
a1.sinks = k1
a1.channels = c1
# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/wzk/conf/startlog_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/wzk/logs/start/.*log
a1.sources.r1.headers.f1.logtype = start
a1.sources.r1.filegroups.f2 = /opt/wzk/logs/event/.*log
a1.sources.r1.headers.f2.logtype = event
# 自定义拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = icu.wzk.LogTypeInterceptor$Builder
# memorychannel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000
# hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/%{logtype}/dt=%{logtime}/
a1.sinks.k1.hdfs.filePrefix = startlog.
a1.sinks.k1.hdfs.fileType = DataStream
# 配置文件滚动方式(文件大小32M)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
# 向hdfs上刷新的event的个数
a1.sinks.k1.hdfs.batchSize = 1000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  • 监听的日志目录是:/opt/wzk/logs/start/.*log,一会儿我们将把文件放到这个目录下
  • 写入的HDFS目录是:/user/data/logs/start 下

我们先启动Flume,然后将对应的日志放入到监听的目录下。

flume-ng agent --conf-file /opt/wzk/flume-conf/flume-log2hdfs2.conf -name a1 -Dflume.roog.logger=INFO,console

我的数据如下图所示:

2021-09-16 16:55:01.744 [main] INFO  icu.wzk.ecommerce.AppStart - {"app_active":{"name":"app_active","json":{"entry":"2","action":"0","error_code":"0"},"time":1595260800000},"attr":{"area":"洛阳","uid":"2F1009

对应的就截图如下所示:
在这里插入图片描述
把这个数据移动到监控目录下:

cp /opt/wzk/hive/data/start0721.log /opt/wzk/logs/start/start0721.log

我们观察到flume的程序日志如下:

24/08/31 01:59:19 INFO taildir.ReliableTaildirEventReader: headerTable: {}
24/08/31 01:59:19 INFO taildir.ReliableTaildirEventReader: Updating position from position file: /opt/wzk/conf...

对应的截图如下所示:
在这里插入图片描述
HDFS状态如下:
在这里插入图片描述

ODS

定义

ODS 是操作型数据存储层,主要用于存放从业务系统中抽取的原始数据。
数据通常以 业务系统的原始格式 或经过少量标准化处理的形式存储。
是数仓的“数据输入口”,负责承接来自业务系统的数据。

功能

数据完整存储:保持数据原始性,便于追溯问题。
解耦:实现业务系统与数据仓库之间的隔离。
实时性支持:如果数据需要实时分析,ODS 可以支持数据的近实时同步。

数据处理

轻度清洗:

  • 去除重复数据。
  • 标准化时间字段、日期字段格式。
  • 增加一些必要的标识字段(如抽取时间、源系统等)。

轻量变换:

  • 数据结构与源系统保持一致,尽量避免复杂的转换。

存储特点

  • 全量存储:ODS 通常存储的是全量数据(包括历史快照)。
  • 分区设计:按时间分区(如按天、小时),以便数据管理和性能优化。
  • 高吞吐:支持高并发写入和抽取。

加载ODS

sh ods_load_startlog.sh 2020-07-21

执行结果如小图所示:
在这里插入图片描述
我们查看Hive中的数据:

hive
-- 执行SQL
use ods;
select * from ods_start_log limit 3;

这里可以看到,外部表已经把数据挂载上了:
在这里插入图片描述

加载ODS其他数据

由于数据很多,这里重复上述的日期步骤,把其他数据加载进来,总共的数据如下图所示:
在这里插入图片描述
对应的Flume的结果:
在这里插入图片描述
对应的HDFS的结果:
在这里插入图片描述

DWD

定义

DWD 是明细数据层,存储的是经过清洗和轻度处理的宽表数据。
DWD 数据是细粒度的、面向分析的明细数据,数据通常已经具备一定的 业务逻辑处理 和 质量保证。

功能

数据细化:清洗、标准化后的数据,供后续主题建模使用。
性能优化:减少后续查询时的数据清洗和转换开销。
维度宽表化:对多表进行宽表关联,便于业务分析。

数据处理

数据清洗:

  • 去除脏数据,如无效值、空值处理。
  • 格式统一,如日期格式、金额精度。

数据整合:

  • 按照业务逻辑关联多张表(如订单表和用户表)。
  • 增加计算字段或衍生字段(如消费总金额)。

去重与规范化:

  • 去重是 DWD 层的常见操作,保证数据唯一性。
  • 统一编码、统一时间维度等。

存储特点

  • 宽表存储:尽可能将相关的明细数据组织成宽表,减少关联操作。
  • 分区设计:根据业务需求设计分区,通常也以时间为主要分区键。

加载DWD

sh dwd_load_startlog.sh 2020-07-21
# 后续加载了 21-31的数据 重复性高就略过了

执行之后的结果如下图所示:
在这里插入图片描述
数据将会写入到:dwd.dwd_start_log
我们查看数据的情况:

hive 
-- 查看数据
use dwd;
select * from dwd.dwd_start_log limit 3;

可以看到数据已经加载出来了:

1FB872-9A1001   连云港  2F10092A1       1.1.8   common  0.43    PN      chinese iphone

对应的截图如下所示:
在这里插入图片描述

加载DWD其他数据

由于数据很多,这里就都加载了,都是重复步骤,就省略了。
在这里插入图片描述


原文地址:https://blog.csdn.net/w776341482/article/details/144074253

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!