kafka-clients之生产者发送流程
背景:kafka-clients-3.0.1.jar
重要配置
- org.apache.kafka.clients.producer.ProducerConfig#MAX_BLOCK_MS_CONFIG
- org.apache.kafka.clients.producer.ProducerConfig#BUFFER_MEMORY_CONFIG
- org.apache.kafka.clients.producer.ProducerConfig#DELIVERY_TIMEOUT_MS_CONFIG
- org.apache.kafka.clients.CommonClientConfigs#RETRIES_CONFIG
- org.apache.kafka.clients.CommonClientConfigs#RETRY_BACKOFF_MS_CONFIG
- org.apache.kafka.clients.producer.ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
- org.apache.kafka.clients.producer.ProducerConfig#ACKS_CONFIG
- org.apache.kafka.clients.producer.ProducerConfig#BATCH_SIZE_CONFIG
配置介绍
这些配置是Kafka官方客户端用于生产者配置的重要参数,下面详细介绍它们的作用和意义:
**MAX_BLOCK_MS_CONFIG**
(org.apache.kafka.clients.producer.ProducerConfig#MAX_BLOCK_MS_CONFIG
)- 功能:设置生产者在发送请求被阻塞的最长时间(毫秒)。当缓冲区满时,若调用
send()
方法的请求超出该时间仍未释放缓冲区空间,生产者会抛出TimeoutException
。 - 默认值:60,000 ms(1分钟)
- 用途:用于避免发送超时的问题,确保资源在合理的时间内被释放。
- 功能:设置生产者在发送请求被阻塞的最长时间(毫秒)。当缓冲区满时,若调用
**BUFFER_MEMORY_CONFIG**
(org.apache.kafka.clients.producer.ProducerConfig#BUFFER_MEMORY_CONFIG
)- 功能:设置生产者可用的内存缓冲区大小(字节)。用于临时存放要发送的消息,等待被分批发送到服务器。
- 默认值:32 MB
- 用途:可以调节生产者的内存使用量,确保缓冲区不会因占用过多内存而影响系统性能。
**DELIVERY_TIMEOUT_MS_CONFIG**
(org.apache.kafka.clients.producer.ProducerConfig#DELIVERY_TIMEOUT_MS_CONFIG
)- 功能:设定消息从发送到接收到
ack
的最大允许延迟时间(毫秒),超过此时间的消息将被认为发送失败。 - 默认值:120,000 ms(2分钟)
- 用途:提供对消息最终交付延迟的控制,特别是对于可能因重试等导致的发送延迟提供一个超时限制。
- 功能:设定消息从发送到接收到
**RETRIES_CONFIG**
(org.apache.kafka.clients.CommonClientConfigs#RETRIES_CONFIG
)- 功能:设置生产者重试发送失败请求的次数。
- 默认值:2147483647(等效于无限制的重试)
- 用途:在网络或服务器不稳定的情况下增加消息的成功率,但要注意如果和
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
配置不当,可能导致消息乱序。
**RETRY_BACKOFF_MS_CONFIG**
(org.apache.kafka.clients.CommonClientConfigs#RETRY_BACKOFF_MS_CONFIG
)- 功能:设置重试间隔(毫秒),用于控制每次重试之间的等待时间。
- 默认值:100 ms
- 用途:适当的等待时间可以避免持续频繁地重试,避免加剧服务器负载或网络阻塞。
**MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION**
(org.apache.kafka.clients.producer.ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
)- 功能:设置每个连接中未完成请求的最大数量。
- 默认值:5
- 用途:可以限制客户端同时发送请求的数量,有助于在重试时避免消息乱序的问题。
**ACKS_CONFIG**
(org.apache.kafka.clients.producer.ProducerConfig#ACKS_CONFIG
)- 功能:控制生产者发送数据后是否需要等待broker的确认,以及确认级别(如
acks=0
、acks=1
或acks=all
)。 - 默认值:1
- 用途:设置此参数可以控制可靠性,
acks=all
可确保所有副本成功写入数据,而acks=0
则提供最低的延迟。 **acks=0**
- 含义:生产者在发送消息后不等待任何确认(ack),就会认为消息已发送成功。
- 优点:具有最低的延迟,因为生产者不等待任何确认。
- 缺点:消息可能会丢失,因为生产者不关心消息是否到达服务器,适用于对数据可靠性要求极低的场景。
**acks=1**
- 含义:生产者在收到主副本的确认后认为消息发送成功,并继续发送下一条消息。
- 优点:在延迟和可靠性之间取得平衡;主副本成功写入后返回确认,减少了等待时间。
- 缺点:如果主副本成功写入但发生故障,其他副本尚未同步完成,消息可能会丢失。
**acks=all**
(或acks=-1
)- 含义:生产者等待所有同步副本都写入成功后,才认为消息发送成功。
- 优点:提供最高的可靠性,确保消息不会丢失(只要至少有一个同步副本可用)。
- 缺点:会增加延迟,因为需要等待所有同步副本都写入成功后才返回确认,适合对数据一致性要求高的场景。
- 功能:控制生产者发送数据后是否需要等待broker的确认,以及确认级别(如
**BATCH_SIZE_CONFIG**
(org.apache.kafka.clients.producer.ProducerConfig#BATCH_SIZE_CONFIG
)- 功能:设置每批次数据的大小(字节)。生产者将消息分成小批次发送,每批次最大大小为
BATCH_SIZE_CONFIG
。 - 默认值:16 KB
- 用途:通过将消息分批次处理,可以提高吞吐量,但如果批次过大会增加延迟,过小则会降低吞吐量。
- 功能:设置每批次数据的大小(字节)。生产者将消息分成小批次发送,每批次最大大小为
发送流程
Q&A
java.nio.ByteBuffer#clear方法会释放内存吗?
java.nio.ByteBuffer#clear()
方法不会释放内存。调用clear()
时,只是将缓冲区的位置指针重置为0,并将限制(limit)设置为缓冲区的容量(capacity),以便重新写入数据。
clear()
的具体作用
clear()
并不会修改缓冲区的内容或释放内存;- 它只是在逻辑上准备缓冲区重新被写入,通过重置位置和限制,标记缓冲区为空(尽管原来的数据依然存在,直到被新数据覆盖)。
释放 ByteBuffer
内存
要释放 ByteBuffer
占用的内存,可以考虑以下两种方式:
- 使用堆外内存的
**DirectByteBuffer**
- 如果是通过
ByteBuffer.allocateDirect()
创建的直接缓冲区,可以通过显式调用sun.misc.Cleaner
类的clean()
方法进行释放(如果JVM允许,使用时要特别注意跨平台兼容性)。即:sun.misc.Unsafe#invokeCleaner - 通常情况下,JVM的垃圾回收器会在合适的时机自动回收直接缓冲区的内存,尽管它不会立即生效。
- 如果是通过
- 在堆内存中通过GC回收
- 普通的
ByteBuffer.allocate()
分配的缓冲区内存位于堆内,会在垃圾回收时被回收。 - 当没有引用指向该
ByteBuffer
实例时,JVM 会自动回收其内存。
- 普通的
为什么BigBatch切分后的小批次没有清理缓冲区?
Kafka客户端在处理大批次切分时确实会通过 ByteBuffer.allocate
来为每个小批次分配新的内存,但在实际实现中,切分后的小批次 MemoryRecords
可能没有被立即回收。这与Kafka的批次内存管理机制和缓冲池策略有关。
切分批次后未立即回收的原因
切分后的批次在 Kafka 客户端中不会立即回收内存,主要原因是 Kafka 内部的批次和缓冲区管理在设计上有以下几个特点:
- 批次生命周期管理:
- Kafka客户端使用
MemoryRecords
表示消息批次,每个批次都有自己的生命周期,通常是在消息被成功发送或发送失败时释放。 - 切分后的小批次可能会被重新入队,以便在未来重试发送。Kafka为了保证切分批次在重试场景下仍能正常使用,不会在切分后立即释放每个
ByteBuffer
实例。
- Kafka客户端使用
- 缓冲区的再利用:
- Kafka客户端有自己的缓冲区池(
BufferPool
),但并不管理切分批次产生的小批次缓冲区的回收。每个小批次的ByteBuffer
通过allocate
方法分配,超出池管理范围,所以不会被自动复用。 - 这些小批次在发送完成或超时后,Kafka的代码中没有显式触发其回收。
- Kafka客户端有自己的缓冲区池(
- 垃圾回收的作用:
- 切分批次的
ByteBuffer
内存最终依赖于Java的垃圾回收来释放。在Java中,当没有引用指向这些切分的ByteBuffer
对象时,垃圾回收器会最终释放它们,但这不是即时生效的。
- 切分批次的
可能的优化
如果频繁发生批次切分导致较多的内存占用,可以考虑以下优化:
- 调整
**max.request.size**
:适当增大每次请求的最大字节数,避免批次被切分。 - 适配
**batch.size**
:合理设定批次大小,使每次生成的批次尽量贴近请求大小,减少切分。 - 延迟设置(
**linger.ms**
):延长批次缓冲时间,减少批次切分的频率。 - 监控与垃圾回收:确保JVM对这些未引用的
ByteBuffer
进行及时回收。
这种机制在批次切分的场景下可能会导致较高的内存驻留,但合理配置Kafka参数可以降低这种情况的频率。
kafka会对BufferPool中的ByteBuffer主动调用clear,既然并不会回收内存,那么清理的意义是什么?
Kafka中的BufferPool
对 ByteBuffer
调用 clear()
方法的意义不在于释放内存,而是为了复用缓冲区的空间,以便高效管理内存和优化性能。调用 clear()
可以在保留内存分配的前提下,重置缓冲区的状态,使其可以再次用于新的数据写入。
clear()
的具体意义
在 BufferPool
中,每次分配和归还的 ByteBuffer
都会被 clear()
重置。clear()
的主要作用是:
- 重置位置和限制指针:将缓冲区的
position
设置为0
,limit
设置为容量capacity
,准备好重新写入数据。这样,可以避免重复分配新的ByteBuffer
实例,减少内存分配和垃圾回收的开销。 - 复用内存:Kafka的
BufferPool
设计就是为了让多个生产者请求能够共享相同的内存池,从而提高内存利用率和性能。通过调用clear()
,缓冲区可以在不同的批次间复用,减少堆内存或直接内存的反复分配。
为什么不直接释放内存
内存分配和释放在Java中是相对开销较高的操作,尤其在需要频繁分配较大内存的场景下(如消息生产者批量发送数据)。通过 clear()
实现缓冲区复用,可以避免因频繁的内存分配导致的性能瓶颈和垃圾回收压力。
总结
BufferPool
通过 clear()
方法重置缓冲区,实现内存复用而非释放,从而提升了 Kafka 客户端的发送效率和内存利用率。这种方式在高并发、高吞吐的场景中尤其有优势,因为它能在性能和资源消耗之间取得良好的平衡。
原文地址:https://blog.csdn.net/u010597819/article/details/143996015
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!