【大数据技术基础 | 实验九】Flume实验:文件数据Flume至HDFS
文章目录
一、实验目的
- 掌握Flume的安装部署;
- 掌握一个agent中source、sink、channel组件之间的关系;
- 加深对Flume结构和概念的理解;
- 掌握Flume的编码方法及启动任务方法。
二、实验要求
- 在一台机器上(本例以slave1为例)部署Flume;
- 实时收集本地hadoop的日志的最新信息然后将收集到日志信息以一分钟一个文件的形式写入HDFS目录中。
三、实验原理
Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。
Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力 Flume提供了从console(控制台)、RPC(Thrift-RPC)、text(文件)、tail(UNIX tail)、syslog(syslog日志系统,支持TCP和UDP等2种模式),exec(命令执行)等数据源上收集数据的能力。
当前Flume有两个版本Flume 0.9X版本的统称Flume-og,Flume1.X版本的统称Flume-ng。由于Flume-ng经过重大重构,与Flume-og有很大不同,使用时请注意区分。
Flume-og采用了多Master的方式。为了保证配置数据的一致性,Flume引入了ZooKeeper,用于保存配置数据,ZooKeeper本身可保证配置数据的一致性和高可用,另外,在配置数据发生变化时,ZooKeeper可以通知Flume Master节点。Flume Master间使用gossip协议同步数据。
Flume-ng最明显的改动就是取消了集中管理配置的 Master 和 Zookeeper,变为一个纯粹的传输工具。Flume-ng另一个主要的不同点是读入数据和写出数据现在由不同的工作线程处理(称为 Runner)。 在 Flume-og 中,读入线程同样做写出工作(除了故障重试)。如果写出慢的话(不是完全失败),它将阻塞 Flume 接收数据的能力。这种异步的设计使读入线程可以顺畅的工作而无需关注下游的任何问题。
Flume以agent为最小的独立运行单位。一个agent就是一个JVM。单agent由Source、Sink和Channel三大组件构成,如图所示。
值得注意的是,Flume提供了大量内置的Source、Channel和Sink类型。不同类型的Source,Channel和Sink可以自由组合。组合方式基于用户设置的配置文件,非常灵活。比如:Channel可以把事件暂存在内存里,也可以持久化到本地硬盘上。Sink可以把日志写入HDFS, HBase,甚至是另外一个Source等等。Flume支持用户建立多级流,也就是说,多个agent可以协同工作,并且支持Fan-in、Fan-out、Contextual Routing、Backup Routes,这也正是NB之处。如图所示:
(一)flume的特点
flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力 。
flume的数据流由事件(Event)贯穿始终。事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Source生成,当Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中。你可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source。
(二)flume的可靠性
当节点出现故障时,日志能够被传送到其他节点上而不会丢失。Flume提供了三种级别的可靠性保障,从强到弱依次分别为:end-to-end(收到数据agent首先将event写到磁盘上,当数据传送成功后,再删除;如果数据发送失败,可以重新发送。),Store on failure(这也是scribe采用的策略,当数据接收方crash时,将数据写到本地,待恢复后,继续发送),Besteffort(数据发送到接收方后,不会进行确认)。
四、实验环境
- 云创大数据实验平台:
- Java 版本:jdk1.7.0_79
- Hadoop 版本:hadoop-2.7.1
- Flume 版本:flume-1.5.2
五、实验内容和步骤
本实验主要演示Flume安装,及启动一个Flume收集日志信息的例子,实验主要包含以下三个大步骤。
(一)启动Hadoop集群
首先,启动Hadoop集群。这里可以使用一键搭建,将Hadoop集群搭建完成。
具体详细步骤可参考:【大数据技术基础 | 实验三】HDFS实验:部署HDFS
使用jps
查看Java进程:
(二)安装并配置Flume
其次,(剩下的所有步骤只需要在master上操作就可以了)安装并配置Flume任务,内容如下:
将Flume 安装包解压到/usr/cstor
目录,并将flume目录所属用户改成root:root。
tar -zxvf flume-1.5.2.tar.gz -c /usr/cstor
chown -R root:root /usr/cstor/flume
以上安装步骤大数据实验平台已经帮我们解压好了,所以就不用输这个命令了,我们使用查看命令:
cd /usr/cstor/flume
ls
进入解压目录下,在conf
目录下新建test.conf
文件并添加以下配置内容:
cd /usr/cstor/flume/conf
vim test.conf
#定义agent中各组件名称
agent1.sources=source1
agent1.sinks=sink1
agent1.channels=channel1
# source1组件的配置参数
agent1.sources.source1.type=exec
#此处的文件/home/source.log需要手动生成,见后续说明
agent1.sources.source1.command=tail -n +0 -F /home/source.log
# channel1的配置参数
agent1.channels.channel1.type=memory
agent1.channels.channel1.capacity=1000
agent1.channels.channel1.transactionCapactiy=100
# sink1的配置参数
agent1.sinks.sink1.type=hdfs
agent1.sinks.sink1.hdfs.path=hdfs://master:8020/flume/data
agent1.sinks.sink1.hdfs.fileType=DataStream
#时间类型
agent1.sinks.sink1.hdfs.useLocalTimeStamp=true
agent1.sinks.sink1.hdfs.writeFormat=TEXT
#文件前缀
agent1.sinks.sink1.hdfs.filePrefix=%Y-%m-%d-%H-%M
#60秒滚动生成一个文件
agent1.sinks.sink1.hdfs.rollInterval=60
#HDFS块副本数
agent1.sinks.sink1.hdfs.minBlockReplicas=1
#不根据文件大小滚动文件
agent1.sinks.sink1.hdfs.rollSize=0
#不根据消息条数滚动文件
agent1.sinks.sink1.hdfs.rollCount=0
#不根据多长时间未收到消息滚动文件
agent1.sinks.sink1.hdfs.idleTimeout=0
# 将source和sink绑定到channel
agent1.sources.source1.channels=channel1
agent1.sinks.sink1.channel=channel1
(三)启动Flume并上传文件数据到HDFS
然后,在HDFS上创建/flume/data
目录:
cd /usr/cstor/hadoop/bin
./hdfs dfs -mkdir /flume
./hdfs dfs -mkdir /flume/data
最后,进入Flume安装的bin
目录下:
cd /usr/cstor/flume/bin
启动Flume,开始收集日志信息:
./flume-ng agent --conf conf --conf-file /usr/cstor/flume/conf/test.conf --name agent1 -Dflume.root.logger=DEBUG,console
看到如下结果就表示启动成功:
接下来我们再创建一个master节点,然后我们去手动生成消息源,也就是配置文件中的/home/source.log
,使用如下命令去不断的写入信息到该文件中:
vim /home/source.log
然后不断重复如下命令写入信息:
echo aa >> /home/source.log
echo aa >> /home/source.log
echo aa >> /home/source.log
echo aa >> /home/source.log
echo aa >> /home/source.log
echo aa >> /home/source.log
最后我们查看这个文件的内容:
cat /home/source.log
可以看见我们写入信息成功。
六、实验结果
我们进入到hadoop的bin
目录下面,然后查看我们创建的文件中是否有我们传入的文件:
cd /usr/cstor/hadoop/bin
hadoop fs -ls /flume/data
查看HDFS中的文件内容:
hadoop fs -cat /flume/data/2024-10-31-08-57.1730336220648
与本地文件/home/source.log
内容一致,证明上传成功。
七、实验心得
在本次Flume实验中,我深入了解了Apache Flume的工作原理和应用场景,加深了对分布式数据采集系统的理解。Flume的架构由Source、Channel和Sink组成,这种分层设计使得数据的收集、暂存和传输更加高效和灵活。
实验中,我首先配置了File Source,成功从本地日志文件中收集数据。通过这一过程,我体会到Source的配置和数据流入的效率。在选择Channel时,我决定使用Memory Channel,因为它在内存中的处理速度较快,适合实时数据传输。同时,我也意识到不同类型的Channel在数据安全性和性能之间的权衡。随后,我将数据发送到HDFS,通过HDFS Sink的配置,我观察到数据以分片的形式存储,这对于后续的数据分析尤为重要。这种分片机制不仅提高了数据的可管理性,还便于后续的并行处理。这一过程让我感受到Flume在大数据处理中的强大能力和灵活性。
此外,我还学习了如何通过配置文件进行多组件的组合与调度。Flume允许用户在不修改代码的情况下,通过简单的配置调整数据流的来源和去向,这种灵活性极大地提升了系统的可维护性。在实验的最后,我利用Flume的日志功能进行监控,及时发现并解决了在数据流转过程中出现的问题。这种实时监控机制不仅保证了数据的完整性,也为在生产环境中的应用提供了保障。
总的来说,这次Flume实验让我更加全面地理解了分布式数据采集的流程及其在大数据生态系统中的重要性。我期待在今后的学习和实践中,将这些知识应用于实际项目中,以提升我的数据处理能力。
附:以上文中的数据文件及相关资源下载地址:
链接:https://pan.quark.cn/s/f019956fb520
提取码:phDe
原文地址:https://blog.csdn.net/Morse_Chen/article/details/143379485
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!