基础配置:
bootstrap.servers.group.id / key.deserializer / value.deserializer。
Kafka的文档列出了所有与消费者相关的配置说明。大部分参数都有合理的默认值,一般不需要修改他们,不过有一些参数与消费者的性能和可用性有很大关系。
fetch.min.bytes
消费者从服务器获取记录的最小字节数。broker在收到消费者的数据请求时,如果可用的数据量小于fetch.min.bytes指定的大小,那么它会等到有足够的可用数据时才把它返回给消费者。这样可以降低消费者和broker的工作负载(主题不是很活跃的时候就不需要来来回回地处理消息)。
经验:1.如果没有很多可用数据,但消费者的cpu使用率很高,那么就需要把该属性的值调大。
2.如果消费者数量比较多,把该属性设置大一点可以降低broker的工作负载。
fetch.max.wait.ms
用于指定 broker的等待时间,默认是 500ms。如果没有足够的数据流入 Kafka,消费者获取最小数据量的要求就得不到满足,最终导致500ms的延迟。 如果要降低 潜在的延迟(为了满足 SLA),可以把该参数值设置得小一些。如果 fetch.max.wait.ms 被设为 1OOms,并且 fetch.min.bytes被设为 1MB,那么 Kafka在收到消费者的请求后,要么返 回 IMB 数据,要么在 1OOms 后返回所有可用的数据 ,就看哪个条件先得到满足。
max.partition.fetch.bytes
指定了服务器从每个分区里返回给消费者的最大字节数。默认值为1MB,也就是说,kafkaConsumer.poll() 方法从每个分区里返回的记录最多不超过max.partition.fetch.bytes指定的字节。
eg:如果一个topic有20个partition和5个consumer,那么每个consumer需要至少4MB的可用内存来接收记录。
经验:1.在为消费者分配内存时,可以给它们多分配一些,因为如果群组里有消费者发生崩溃,剩下的消费者需要处理更多的分区。
2.max.partition.fetch.bytes的值必须大于broker能够接受的最大消息的字节数(max.message.size配置),否则消费者可能无法读取这些消息,导致消费者一直挂起重试。
3.消费者需要频繁调用poll() 方法来避免会话过期和发生分区再均衡,如果单次调用poll() 返回的数据太多,消费者需要更多的时间来处理,可能无法及时进行下一个轮询来避免会话过期。如果出现这种情况可以把该属性值调小,或者延长会话过期时间。
session.timeout.ms
指定了消费者可以多久不发送心跳,默认为3s。如果consumer没有在session.timeout.ms指定的时间内发送心跳给群组协调器,就被认为已经死亡,协调器就会触发再均衡。该属性与heartbeat.interval.ms紧密相关。heartbeat.interval.ms指定了poll() 方法向协调器发送心跳的频率。故,一般需要同时修改这两个属性,heartbeat.interval.ms必须比session.timeout.ms小,一般是session.timeout.ms的三分之一。
经验:1.把session.timeout.ms值设得比默认值小,可以更快的检测和恢复崩溃的节点,不多长时间的轮询或垃圾收集可能导致非预期的再均衡。
2.把session.timeout.ms值设得比默认值大,可以减少意外的再均衡,不过检测节点崩溃需要更长的时间。
auto.offset.reset
指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下(因消费者长时间失效,包含的偏移量记录已经过时并被删除)该如何处理。默认值为latest,意思是说,在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)。另一个值是earliest,意思是说,在偏移量无效的情况下,消费者将从起始位置读取分区的记录。
enable.autxo.commit
指定了考费者是否自动提交偏移量,默认值为true。
经验:1.为了尽量避免出现重复数据和数据丢失,可以把它设为 false,由 自 己控制何时提交偏移量。
2.如果把它设为 t「ue,还可以通过配置 auto.commit.interval.ms 属性来控制提交的频率。
partition.assignment.strategy
PartitionAssignor根据给定的consumer和topic,决定哪些分区应该被分配给哪个消费者。Kafka有两个默认的分配策略。
1 | 1):Range |
可以通过设置 parti.ti.on.assi.gnl’lent.st「ategy 来选择分区策略。默认使用的是 0「g. apache.kafka.cli.ents.consul’le「.RangeAssi.gno「, 这个类实现了 Range策略,不过也可以 把它改成 。「g.apache.kafka.cli.ents.consul’le「.RoundRobi.nAssi.gnor。我们还可以使用自定 义策略,在这种情况下 , pa「t”i.ti.on.assi.gnl’lent.strategy 属性的值就是自定义类的名字。
client.id
该属性可以是任意字符串 , broker用它来标识从客户端发送过来的消息,通常被用在日志、度量指标和配额里。
max.poll.records
该属性用于控制单次调用 call() 方住能够返回的记录数量,可以帮你控制在轮询里需要处理的数据量。
receive.buffer.bytes和send.buffer.bytes
socket 在读写数据时用到的 TCP 缓冲区也可以设置大小。如果它们被设为-1,就使用操作系统的默认值。如果生产者或消费者与broker处于不同的数据中心内,可以适当增大这些值因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。
Be the first person to leave a comment!