场景
等待条件发生
比如,有这样一个场景,在 spring boot 工程里面,有一个 controller,他会接受数据并把数据写入 kafka,然后返回写入 kafka 的结果。在调用 send 方法后,会得到一个 ListenableFuture
对象,这个对象可以传入 callback 对象,这是一个异步的过程,需要等待回调执行后,才能将结果返回给客户端。
我们就需要一种机制等待回调事件,这里用的模式如下。
1 | Object mutex = new Object(); |
1 |
|
比如,有这样一个场景,在 spring boot 工程里面,有一个 controller,他会接受数据并把数据写入 kafka,然后返回写入 kafka 的结果。在调用 send 方法后,会得到一个 ListenableFuture
对象,这个对象可以传入 callback 对象,这是一个异步的过程,需要等待回调执行后,才能将结果返回给客户端。
我们就需要一种机制等待回调事件,这里用的模式如下。
1 | Object mutex = new Object(); |
1 |
|
[toc]
1 | 列出所有 consumer group |
1 | ./bin/kafka-topics.sh --list --zookeeper server.zk |
1 | kafka-configs.sh --zookeeper <zkhost>:2181 --alter --entity-type topics --entity-name <topic name> --add-config retention.ms=1000 |
1 | ./bin/kafka-topics.sh --zookeeper c3cloudsrv.zk.hadoop.srv:11000/kafka/c3cloudsrv-feeds --describe --topic model_diff_update_111 |
1 | kafka-topics --zookeeper 127.0.0.1:2181 --describe --topic <topic-name> |
1 | ./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic <topic> --from-beginning |
kafka-auth.properties
1 | security.protocol = "ssl" |
1 | 指定配置文件 |
参考
1 | CONNECT_CONSUMER_SECURITY_PROTOCOL: SASL_SSL |
1 | /bin/kafka-console-producer.sh --broker-list localhost:9092 --topic <topic> |
1 | ./bin/kafka-topics --delete --zookeeper 127.0.0.1:2181 --topic <topic-name> |
架构
名词
.index
文件.log
文件如何提高可靠性
request.required.acks
等效,如果leader宕机,也会发生数据丢失应用
Ref
1 | val props = new Properties() |
偏移量是,在一个分区内下一条需要发送给消费者的消息位置。Kafka包括两种类型的offset:
总结
Commited Offset在分区平衡的情况下至关重要。分区平衡的情况下,新的消费者被分配到新的分区时,Committed Offset可以解决从哪里开始、哪些记录已经被消费的问题。
Current offset 和 committed offset由Kafka管理。提交一个offset的方式有两种:
通过两个属性来控制Auto-Commit:
auto-commit的一个问题是,在提交之前,数据可能会被其他消费者处理。这种情况下,没办法彻底避免消息被重复处理。
可以使用manual-commit的方式解决auto-commit的问题。manual-commit由两种方式:
一个简单示例。
1 | import java.util.*; |
这里:
Offset记录位置是根据Kafka broker版本和Kafka client版本决定。
Kafka version\Kafka deriver version | <0.9 |
>=0.9 |
---|---|---|
<0.9 |
Offset Storage: Zookeeper | Offset Storage: Zookeeper |
>=0.9 |
Offset Storage: Zookeeper | Offset Storage: Kafka |
如果Broker存储Offset,处理方式:
__consumer_offsets
中在kafka 0.11.x 版本配合 cppkafka client,可以断定,offset 后端存储是kafka broker进行管理,依据:
__consumer_offsets
主题,且有offsets信息1 | ./kafka-console-consumer.sh --consumer.config /tmp/consumer.config --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --topic __consumer_offsets --zookeeper *:2181/feeds/infra/feeds-kafka-srv --from-beginning |
/consumers/{CONSUMER_GROUP_ID}/offsets/{TOPIC_NAME}/{PARTITION_NUMBER}
Ref
num.partitioins
指定支持前缀ACLs
添加认证框架(authenticate framework)
支持主机名认证
动态更新SSL信任库
改进复制协议避免leader和follower间日志差异
减少消息转换占用内存,提升broker弹性
减少消息分块内存使用,避免broker的OutOfMemory错误
在应用限流之前通知客户端,使得客户端在超过限额时,可以区分是网络错误还是限流
增加消费者配置选项,避免无限阻塞
放弃Java7
Kafka Connect改进
增加Scala wrapper API
支持消息头信息添加读取
Kafka Streams中的窗口聚合性能有大的提升
Kafka Controller有大的改善
增加对日志目录副本移动支持,以便于JBOD实现数据平衡
动态更新某些broker配置
增加代理令牌身份验证,以支持大量客户端导致的其他身份验证服务器过载
Kafka Connect功能更新
Kafka Streams API改进
内置了机架感知以便隔离副本,使得Kafka保证副本可以跨越到多个机架或者是可用区域,显著提高了Kafka的弹性和可用性
所有Kafka中的消息都包含了时间戳字段,这个时间就是这条消息产生的时间。这使得Kafka Streams能够处理基于事件时间的流处理;而且那些通过时间寻找消息以及那些基于事件时间戳的垃圾回收特性能为可能。
Apache Kafka 0.9.0.0版本引入了新的安全特性,包括通过SASL支持Kerberos。Apache Kafka 0.10.0.0现在支持更多的SASL特性,包括外部授权服务器,在一台服务器上支持多种类型的SASL认证以及其他的改进。
Kafka Connect得到了持续提升。在此之前,用户需要监控日志以便看到各个connectors以及他们task的状态,现在Kafka已经支持了获取的状态API这样使得监控变得更简单。同时也添加了控制相关的API,这使得用户可以在进行维护的时候停止一个connector;或者手动地重启那些失败的task。这些能够直观的在用户界面展示和管理connector目前可以在控制中心(Control Center)看到。
Kafka Consumer Max Records,在Kafka 0.9.0.0,开发者们在新consumer上使用poll()函数的时候是几乎无法控制返回消息的条数。不过值得高兴的是,此版本的Kafka引入了max.poll.records参数,允许开发者控制返回消息的条数。
协议版本改进,Kafka brokers现在支持返回所有支持的协议版本的请求API,这个特点的好处就是以后将允许一个客户端支持多个broker版本。
broker 是一个 kafka 进程,多个 broker 组成了一个 kafka 集群。在 conumer 和 producer 中,使用如下方式指定多个 broker。
1 | Properties props = new Properties(); |
主题,标识了一个消息队列。producer 产生数据时需要指定,consumer 消费时也需要指定,两者指定的 topic 匹配来达到数据的流通。
生产者,产生消息的实例。producer 需要指定 topic,可选地将消息写入哪个 partition,通过指定 partitioner.class
选项实现。
1 | Properties props = new Properties(); |
消费者,逻辑概念,指从消息队列获取消息的实例。consumer 必须指定 topic,可选地指定 partition。
1 | Properties props = new Properties(); |
consumer instance 是一个消费实例,具体地是某个程序的某个进程/线程。
consumer group 是一个逻辑概念,由一个或多个 consumer instance 组成,这种组成是通过 consumer instance 消费时指定 cosumer group 来隐式加入的。
offset 标记了一个 consumer group 消费消息的偏移量。偏移量信息保存在 zookeeper 中,保存的信息有过期时间,默认的过期时间在 kafka 的配置文件中,配置项为 offsets.retention.minutes
,下面是一个示例。
1 | # file: kafka/config/server.properties |
默认的过期时间是 1440 分钟,即 24 小时。
分区(partition)是物理概念,topic 是逻辑概念,一个 topic 有至少一个 partition,每个 partition 会对应某个 broker 磁盘上的一些区域(更具体地,对应一个文件夹)。
可以将 topic 划分为多个分区,根据分区规则把消息存储到某个分区,如果分区规则能将消息均匀地分散到各 partition,这个过程就可以看做负载均衡和水平扩展。
同时,consumer 可以从一个或者多个 partition 中消费消息。
segment 是 partition细分的物理概念,包括.index
文件(索引文件)和.log
文件(数据文件)。
可以通过为 topic 设置多个 replication,来保证数据可靠性,多个 replication 是主从关系,主挂后,从从中选举新的主。
拓扑中有两种特殊的处理器。
在普通的处理器内,可以访问其他远程系统。
reset offset
1 | class ExampleRebalanceCb : public RdKafka::RebalanceCb { |
1 | // 连接 kafka |
1 |
|
1 |
|
1 |
|