-
reset offset
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17class ExampleRebalanceCb : public RdKafka::RebalanceCb {
public:
void rebalance_cb (RdKafka::KafkaConsumer *consumer,
RdKafka::ErrorCode err,
std::vector<RdKafka::TopicPartition*> &partitions) {
if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
RdKafka::TopicPartition *part;
// find the partition, through std::find() or other means
...
if (part)
part->set_offset(1234);
consumer->assign(partitions);
} else {
consumer->unassign();
}
}
};