Kafka for C++

  • Reset a partition's offset in the rebalance callback

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    /**
     * Example rebalance callback that overrides the offset of one partition
     * before the partition list is handed to the consumer.
     *
     * When the event is ERR__ASSIGN_PARTITIONS, the chosen partition (if any)
     * gets its offset set to 1234 and the list is passed to assign(). Every
     * other event code results in unassign().
     */
    class ExampleRebalanceCb : public RdKafka::RebalanceCb {
    public:
      /**
       * @param consumer    Consumer on which assign()/unassign() is invoked.
       * @param err         Rebalance event code; only ERR__ASSIGN_PARTITIONS is
       *                    treated specially.
       * @param partitions  Partition list forwarded to assign().
       */
      void rebalance_cb(RdKafka::KafkaConsumer *consumer,
                        RdKafka::ErrorCode err,
                        std::vector<RdKafka::TopicPartition*> &partitions) override {
        if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
          // Start from null so the check below is well-defined when the lookup
          // finds nothing (the pointer was previously left uninitialized).
          RdKafka::TopicPartition *part = nullptr;

          // find the partition, through std::find() or other means
          // ... (lookup elided in this example; presumably `part` should point
          //      at an element of `partitions` so the new offset is picked up
          //      by assign() -- confirm against your use case)

          if (part)
            part->set_offset(1234);
          consumer->assign(partitions);
        } else {
          consumer->unassign();
        }
      }
    };