Under some rare circumstances a message in a Kafka topic can become corrupted. This often happens when third-party frameworks are used together with Kafka. Additionally, Kafka < 0.9 takes a lock in Log.write() but has no lock in Log.read() at the consumer read level. This can cause a rare race condition, as described in KAFKA-2477 [1]. The log entry probably looks like:
ERROR Error processing message, stopping consumer: (kafka.tools.ConsoleConsumer$) kafka.message.InvalidMessageException: Message is corrupt (stored crc = xxxxxxxxxx, computed crc = yyyyyyyyyy)
Kafka-Tools
Kafka stores the offset of every consumer in Zookeeper. To read the offsets, Kafka provides handy tools [2], but zkCli.sh can also be used, at least to display the consumers and the stored offsets. First we need to find the consumer group for a topic (Kafka 0.9 or later):
bin/kafka-consumer-groups.sh --zookeeper management01:2181 --describe --group test
Prior to Kafka 0.9, the only way to get this information was to use zkCli.sh (or similar tools) to find the consumer group. Since debugging with zkCli is a bit frustrating, I personally use kafka-manager from Yahoo [3].
Assuming the consumers are stored in Zookeeper under /consumer, the commands to find the offset look like:
ls /consumer/test/offsets
[1]
get /consumer/test/offsets/1
[15]
With Kafka's own tools, that lookup would look like:
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group console-1 --zookeeper zknode1:2181
Group      Topic  Pid  Offset  logSize  Lag  Owner
console-1  test   1    15      337      326  none
Once the offset is known, it can be incremented to force the consumer to read the next available message. Before doing this, Kafka has to be shut down.
bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK latest 16 test
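If the offset has to be picked out of the checker output by a script rather than by eye, something like the following could work. It is only a sketch: it assumes the whitespace-separated column layout shown above, and the function names parse_offset_checker and next_offset are made up for this illustration, they are not part of Kafka.

```python
def parse_offset_checker(output):
    """Parse ConsumerOffsetChecker-style text (header row plus data rows) into dicts."""
    lines = [ln.split() for ln in output.strip().splitlines() if ln.strip()]
    header, rows = lines[0], lines[1:]
    numeric_columns = ("Pid", "Offset", "logSize", "Lag")
    records = []
    for row in rows:
        record = dict(zip(header, row))
        # Convert the numeric columns so they can be used in arithmetic.
        for column in numeric_columns:
            if column in record:
                record[column] = int(record[column])
        records.append(record)
    return records


def next_offset(record):
    """Return the offset right after the one the consumer is stuck on."""
    return record["Offset"] + 1


# Sample text copied from the checker output shown in this post.
sample = """
Group Topic Pid Offset logSize Lag Owner
console-1 test 1 15 337 326 none
"""

records = parse_offset_checker(sample)
print(next_offset(records[0]))  # prints 16
```

The script only reads the text and does the arithmetic; writing the new offset back is still done with the Kafka tool.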
After the restart, Kafka should be able to read the next message, provided that message isn't corrupted as well. And yes, the corrupted message is lost and can't be restored, so it's always a good idea to implement a CRC check before any message gets to Kafka.
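One possible way to do such a check on the application side is to prepend a CRC32 to every payload before it is handed to the producer and to verify it again after reading. This is a minimal sketch, not something Kafka provides: the 4-byte big-endian framing and the names wrap_with_crc and unwrap_and_verify are assumptions made for this example.

```python
import struct
import zlib


def wrap_with_crc(payload: bytes) -> bytes:
    """Prepend a 4-byte big-endian CRC32 of the payload."""
    crc = zlib.crc32(payload) & 0xFFFFFFFF
    return struct.pack(">I", crc) + payload


def unwrap_and_verify(framed: bytes) -> bytes:
    """Strip the CRC32 prefix and raise ValueError if the payload does not match it."""
    if len(framed) < 4:
        raise ValueError("frame too short to contain a CRC")
    (stored,) = struct.unpack(">I", framed[:4])
    payload = framed[4:]
    computed = zlib.crc32(payload) & 0xFFFFFFFF
    if stored != computed:
        raise ValueError(
            f"payload is corrupt (stored crc = {stored}, computed crc = {computed})"
        )
    return payload


framed = wrap_with_crc(b"hello kafka")
print(unwrap_and_verify(framed))  # prints b'hello kafka'
```

The producer would send the framed bytes, and a consumer (or a validation step in front of the producer) would call unwrap_and_verify and decide what to do with messages that fail the check.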
A code-based approach is also available [4]. It requires creating a subclass of ConsumerIterator that catches the message exception, replaces the corrupt message with a dummy message and proceeds with the next message. Of course, the corrupted message is lost in that case, too.
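The idea behind that approach can be shown without Kafka at all. The sketch below is not the ConsumerIterator subclass from [4]; it only illustrates the catch, replace and continue pattern in plain Python. CorruptMessageError, DUMMY, skip_corrupt and make_source are hypothetical names, and the fake source simply treats None as a corrupt message.

```python
class CorruptMessageError(Exception):
    """Hypothetical stand-in for kafka.message.InvalidMessageException."""


DUMMY = object()  # placeholder that is yielded instead of a corrupt message


def skip_corrupt(fetch_next, dummy=DUMMY):
    """Yield messages from fetch_next(); yield `dummy` where a message is corrupt."""
    while True:
        try:
            yield fetch_next()
        except StopIteration:
            return  # source is exhausted
        except CorruptMessageError:
            yield dummy  # replace the corrupt message and keep going


def make_source(items):
    """Build a fake message source; None simulates a corrupt message."""
    iterator = iter(items)

    def fetch_next():
        item = next(iterator)  # raises StopIteration when nothing is left
        if item is None:
            raise CorruptMessageError("Message is corrupt")
        return item

    return fetch_next


source = make_source([b"a", None, b"c"])
messages = [m for m in skip_corrupt(source) if m is not DUMMY]
print(messages)  # prints [b'a', b'c']
```

Yielding a dummy instead of silently dropping the message keeps the position in the stream visible, so the caller can count or log how many messages were skipped.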