I have a Kafka consumer that, after being resumed, picks up from the last committed offset and consumes sequentially from there, even after several calls to seekToEnd().
Logs
//before pause
2021-10-09 15:44:18,569 [fx-kafka-consumer-0-C-1] INFO c.x.f.r.k.HarvestConsumer - check raw msg size 1 on [17] @ [38012024115]
2021-10-09 15:44:28,573 [fx-kafka-consumer-0-C-1] INFO c.x.f.r.k.HarvestConsumer - check raw msg size 1 on [17] @ [38012024116]
//paused
//called seekToEnd before resume
//after resume
//called seekToEnd several times before and after resume
2021-10-09 15:45:13,603 [kafka-consumer-0-C-1] INFO c.x.f.r.k.HarvestConsumer - check raw msg size 1 on [17] @ [38012024117]
2021-10-09 15:45:53,610 [kafka-consumer-0-C-1] INFO c.x.f.r.k.HarvestConsumer - check raw msg size 1 on [17] @ [38012024118]
2021-10-09 15:46:03,612 [kafka-consumer-0-C-1] INFO c.x.f.r.k.HarvestConsumer - check raw msg size 1 on [17] @ [38012024119]
2021-10-09 15:46:13,613 [kafka-consumer-0-C-1] INFO c.x.f.r.k.HarvestConsumer - check raw msg size 1 on [17] @ [38012024120]
It turns out this was actually due to an issue in the implementation of my subclass that extends AbstractConsumerSeekAware.
The problem was in my own code. At consumer startup, I do a manual seek in onPartitionsAssigned. Until now this had been working well.
However, now that the consumer needs to be paused and resumed, it turned out that the method I overrode to do the manual seek had stopped storing the consumer's seek callback in the callbacks field. So when seekToEnd was invoked, it was really iterating over an empty map and did nothing.
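To make the failure mode concrete, here is a minimal sketch in plain Java. It does not use Spring Kafka: SeekCallback, SeekAwareBase, BrokenListener, FixedListener and SeekDemo are hypothetical stand-ins I made up to model the behaviour described above, and the real AbstractConsumerSeekAware differs in detail. Partition 17 comes from the logs, while the offset 100 is an arbitrary value for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a seek callback; it records calls instead of talking to a broker.
interface SeekCallback {
    void seek(int partition, long offset);
    void seekToEnd(int partition);
}

class RecordingCallback implements SeekCallback {
    final List<String> calls = new ArrayList<>();
    public void seek(int partition, long offset) { calls.add("seek " + partition + " -> " + offset); }
    public void seekToEnd(int partition) { calls.add("seekToEnd " + partition); }
}

// Simplified model of the base class: seekToEnd() only reaches partitions
// that were registered in the callbacks map during assignment.
abstract class SeekAwareBase {
    private final Map<Integer, SeekCallback> callbacks = new LinkedHashMap<>();

    public void onPartitionsAssigned(Map<Integer, Long> assignments, SeekCallback callback) {
        assignments.keySet().forEach(p -> callbacks.put(p, callback));
    }

    public void seekToEnd() {
        // With an empty map this loop silently does nothing.
        callbacks.forEach((p, cb) -> cb.seekToEnd(p));
    }
}

// Models the bug: the override does the manual seek but never registers the callback.
class BrokenListener extends SeekAwareBase {
    @Override
    public void onPartitionsAssigned(Map<Integer, Long> assignments, SeekCallback callback) {
        assignments.forEach((p, offset) -> callback.seek(p, offset));
    }
}

// Models the fix: let the base class do its bookkeeping first, then do the manual seek.
class FixedListener extends SeekAwareBase {
    @Override
    public void onPartitionsAssigned(Map<Integer, Long> assignments, SeekCallback callback) {
        super.onPartitionsAssigned(assignments, callback);
        assignments.forEach((p, offset) -> callback.seek(p, offset));
    }
}

class SeekDemo {
    // Assigns partition 17 at an arbitrary offset, then asks the listener to seek to the end.
    static List<String> run(SeekAwareBase listener) {
        RecordingCallback cb = new RecordingCallback();
        listener.onPartitionsAssigned(Collections.singletonMap(17, 100L), cb);
        listener.seekToEnd();
        return cb.calls;
    }

    public static void main(String[] args) {
        System.out.println("broken: " + run(new BrokenListener()));
        System.out.println("fixed:  " + run(new FixedListener()));
    }
}
```

In this model the broken listener records only the manual seek, while the fixed one also records a seekToEnd for partition 17. In my real code the equivalent change would be to keep the superclass bookkeeping in place (for example by calling the super method) before doing the manual seek.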