Prevent losing Kafka messages due to infrastructure failure

KAFKAWRAP-7

Investigating current behaviour by emulating infrastructure slowness

Successful Import of 1_record file

This file was imported without emulating slowness; the result serves as a baseline for comparison with the subsequent import of the same file under emulated slowness.

Logs were fetched from the mod-source-record-manager service.

AbstractChunkProcessingService::processChunk was invoked exactly once for each chunk (1 and 2). A single consumer (Consumer - id: 6) processed the DI_RAW_RECORDS_CHUNK_READ events (versus 3 consumers in the subsequent example), one consumer (Consumer - id: 1) processed DI_PARSED_RECORDS_CHUNK_SAVED, and two consumers (Consumer - id: 25 and Consumer - id: 48) processed the DI_COMPLETED event (the same as in the subsequent example with emulated slowness).

Logs:

05:44:47.479 [vert.x-worker-thread-17] DEBUG unkProcessingService [48308937eqId] AbstractChunkProcessingService:: processChunk for chunkNumber: 1,  correlationId: 1792c0b3-30f7-4e8a-90ac-d52eaa49cd63
05:44:47.480 [vert.x-worker-thread-17] DEBUG unkProcessingService [48308938eqId] AbstractChunkProcessingService:: processChunk for chunkNumber: 2,  correlationId: 5e4e59e7-919c-4f9c-8fb6-268d8ab8729f
05:44:47.710 [vert.x-worker-thread-15] INFO  ngeEngineServiceImpl [48309168eqId] Parsed 1 records out of 1
05:44:47.712 [vert.x-worker-thread-15] INFO  ngeEngineServiceImpl [48309170eqId] Total marc holdings records: 1, invalid marc bib ids: 0, valid marc bib records: 1
05:44:47.740 [vert.x-worker-thread-14] DEBUG KafkaConsumerWrapper [48309198eqId] Consumer - id: 6 subscriptionPattern: SubscriptionDefinition(eventType=DI_RAW_RECORDS_CHUNK_READ, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_RAW_RECORDS_CHUNK_READ) Committing offset: 22
05:44:48.634 [vert.x-worker-thread-12] INFO  KafkaConsumerWrapper [48310092eqId] Consumer - id: 6 subscriptionPattern: SubscriptionDefinition(eventType=DI_RAW_RECORDS_CHUNK_READ, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_RAW_RECORDS_CHUNK_READ) Committed offset: 22
05:44:49.352 [vert.x-worker-thread-14] DEBUG KafkaConsumerWrapper [48310810eqId] Consumer - id: 6 subscriptionPattern: SubscriptionDefinition(eventType=DI_RAW_RECORDS_CHUNK_READ, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_RAW_RECORDS_CHUNK_READ) Committing offset: 21
05:44:49.792 [vert.x-worker-thread-15] INFO  KafkaConsumerWrapper [48311250eqId] Consumer - id: 6 subscriptionPattern: SubscriptionDefinition(eventType=DI_RAW_RECORDS_CHUNK_READ, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_RAW_RECORDS_CHUNK_READ) Committed offset: 21
05:44:49.901 [vert.x-worker-thread-7] DEBUG KafkaConsumerWrapper [48311359eqId] Consumer - id: 1 subscriptionPattern: SubscriptionDefinition(eventType=DI_PARSED_RECORDS_CHUNK_SAVED, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_PARSED_RECORDS_CHUNK_SAVED) Committing offset: 8
05:44:50.840 [vert.x-worker-thread-18] INFO  KafkaConsumerWrapper [48312298eqId] Consumer - id: 1 subscriptionPattern: SubscriptionDefinition(eventType=DI_PARSED_RECORDS_CHUNK_SAVED, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_PARSED_RECORDS_CHUNK_SAVED) Committed offset: 8
05:44:52.228 [vert.x-worker-thread-14] DEBUG KafkaConsumerWrapper [48313686eqId] Consumer - id: 25 subscriptionPattern: SubscriptionDefinition(eventType=DI_COMPLETED, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_COMPLETED) Committing offset: 8
05:44:52.263 [vert.x-worker-thread-13] INFO  KafkaConsumerWrapper [48313721eqId] Consumer - id: 25 subscriptionPattern: SubscriptionDefinition(eventType=DI_COMPLETED, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_COMPLETED) Committed offset: 8
05:44:52.463 [vert.x-worker-thread-14] DEBUG KafkaConsumerWrapper [48313921eqId] Consumer - id: 48 subscriptionPattern: SubscriptionDefinition(eventType=DI_COMPLETED, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_COMPLETED) Committing offset: 8
05:44:53.150 [vert.x-worker-thread-6] INFO  KafkaConsumerWrapper [48314608eqId] Consumer - id: 48 subscriptionPattern: SubscriptionDefinition(eventType=DI_COMPLETED, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_COMPLETED) Committed offset: 8


Successful Import of 1_record file with debug mode emulating slowness

As you can see, during processing of the DI_RAW_RECORDS_CHUNK_READ event type there were various exceptions (marked in red in the log) related to timeouts, Kafka consumer rebalancing, and commit failures, all caused by waiting at debug breakpoints.

From these logs we see that the service handler for processing incoming chunks, AbstractChunkProcessingService::processChunk, was invoked multiple times (instead of just once per chunk number, as in the first example without debug mode).

Matching up the correlation ids and consumer ids by color shows that 3 different correlation ids and 3 consumers were used (Consumer - id: 0, Consumer - id: 3, Consumer - id: 6), instead of the single Consumer - id: 6 used in the previous import of 1_record without debug mode.

These 3 consumers were created, I assume, by the rebalance process triggered by debug mode, and they all tried to commit messages with offsets 18, 19, and 20. As a result, Consumer - id: 0 committed all three offsets after several attempts, Consumer - id: 3 committed no offsets, and Consumer - id: 6 committed offset 19.

I did not put any debug breakpoints on the handlers for the DI_PARSED_RECORDS_CHUNK_SAVED and DI_COMPLETED events, so these were processed as in the previous example, without any consumer rebalancing: DI_PARSED_RECORDS_CHUNK_SAVED was processed by Consumer - id: 1, DI_COMPLETED by Consumers 25 and 48, and the import completed successfully.

All of the slowness and timeouts emulated by debug mode were handled successfully by the Kafka implementation (in some earlier attempts I also saw Postgres query timeouts caused by debug mode, but that work was picked up by other consumers and the import still succeeded).


Logs:

16:23:38.379 [vert.x-worker-thread-4] DEBUG unkProcessingService [239837eqId] AbstractChunkProcessingService:: processChunk for chunkNumber: 1, correlationId: be43e931-9987-4164-bce7-952b261c09b4
16:24:33.293 [vert.x-worker-thread-7] DEBUG unkProcessingService [294751eqId] AbstractChunkProcessingService:: processChunk for chunkNumber: 2, correlationId: 5f755d44-9fd5-44fe-a116-5e4974afd95f
16:24:40.042 [vert.x-worker-thread-16] DEBUG unkProcessingService [301500eqId] AbstractChunkProcessingService:: processChunk for chunkNumber: 2, correlationId: 44ad5a1a-9d60-426a-abd8-5608c5e7c27d
16:24:41.150 [vert.x-worker-thread-13] DEBUG unkProcessingService [302608eqId] AbstractChunkProcessingService:: processChunk for chunkNumber: 2, correlationId: 44ad5a1a-9d60-426a-abd8-5608c5e7c27d
16:24:48.439 [vert.x-worker-thread-13] DEBUG unkProcessingService [309897eqId] AbstractChunkProcessingService:: processChunk for chunkNumber: 1,  correlationId: be43e931-9987-4164-bce7-952b261c09b4
16:24:49.343 [vert.x-worker-thread-16] DEBUG unkProcessingService [310801eqId] AbstractChunkProcessingService:: processChunk for chunkNumber: 1, correlationId: be43e931-9987-4164-bce7-952b261c09b4
16:24:51.543 [vert.x-worker-thread-13] DEBUG unkProcessingService [313001eqId] AbstractChunkProcessingService:: processChunk for chunkNumber: 2, correlationId: 5f755d44-9fd5-44fe-a116-5e4974afd95f
16:25:07.860 [vert.x-worker-thread-16] DEBUG unkProcessingService [329318eqId] AbstractChunkProcessingService:: processChunk for chunkNumber: 2, correlationId: 5f755d44-9fd5-44fe-a116-5e4974afd95f
16:25:11.332 [vert.x-worker-thread-6] DEBUG unkProcessingService [332790eqId] AbstractChunkProcessingService:: processChunk for chunkNumber: 2,  correlationId: 44ad5a1a-9d60-426a-abd8-5608c5e7c27d
16:25:14.851 [vert.x-worker-thread-6] DEBUG unkProcessingService [336309eqId] AbstractChunkProcessingService:: processChunk for chunkNumber: 1,  correlationId: be43e931-9987-4164-bce7-952b261c09b4
16:25:19.799 [vert.x-worker-thread-7] DEBUG KafkaConsumerWrapper [341257eqId] Consumer - id: 0 subscriptionPattern: SubscriptionDefinition(eventType=DI_RAW_RECORDS_CHUNK_READ, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_RAW_RECORDS_CHUNK_READ) Committing offset: 19
16:26:21.878 [vert.x-worker-thread-14] DEBUG KafkaConsumerWrapper [403336eqId] Consumer - id: 3 subscriptionPattern: SubscriptionDefinition(eventType=DI_RAW_RECORDS_CHUNK_READ, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_RAW_RECORDS_CHUNK_READ) Committing offset: 19
16:26:24.719 [vert.x-kafka-consumer-thread-7] ERROR KafkaConsumerWrapper [406177eqId] Error while KafkaConsumerWrapper is working:
org.apache.kafka.common.errors.RebalanceInProgressException: Offset commit cannot be completed since the consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance by calling poll() and then retry the operation.
16:26:33.331 [vert.x-worker-thread-7] ERROR KafkaConsumerWrapper [414789eqId] Consumer - id: 0 subscriptionPattern: SubscriptionDefinition(eventType=DI_RAW_RECORDS_CHUNK_READ, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_RAW_RECORDS_CHUNK_READ) Error while commit offset: 19
org.apache.kafka.common.errors.RebalanceInProgressException: Offset commit cannot be completed since the consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance by calling poll() and then retry the operation.
16:26:22.808 [vert.x-kafka-consumer-thread-2] ERROR KafkaConsumerWrapper [404266eqId] Error while KafkaConsumerWrapper is working:
org.apache.kafka.common.errors.RebalanceInProgressException: Offset commit cannot be completed since the consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance by calling poll() and then retry the operation.
16:26:26.232 [vert.x-worker-thread-6] ERROR rcChunksKafkaHandler [407690eqId] RawRecordsDto processing has failed with errors chunkNumber: 1, chunkId: d5463cf7-33c0-4546-9cb2-f1d45e63b5c8, correlationId: be43e931-9987-4164-bce7-952b261c09b4
io.vertx.core.impl.NoStackTraceThrowable: Timeout
16:26:33.285 [vert.x-worker-thread-14] ERROR KafkaConsumerWrapper [414743eqId] Consumer - id: 3 subscriptionPattern: SubscriptionDefinition(eventType=DI_RAW_RECORDS_CHUNK_READ, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_RAW_RECORDS_CHUNK_READ) Error while commit offset: 19
org.apache.kafka.common.errors.RebalanceInProgressException: Offset commit cannot be completed since the consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance by calling poll() and then retry the operation.
16:26:33.438 [vert.x-worker-thread-6] DEBUG KafkaConsumerWrapper [414896eqId] Consumer - id: 6 subscriptionPattern: SubscriptionDefinition(eventType=DI_RAW_RECORDS_CHUNK_READ, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_RAW_RECORDS_CHUNK_READ) Committing offset: 19
16:26:33.442 [vert.x-kafka-consumer-thread-6] ERROR KafkaConsumerWrapper [414900eqId] Error while KafkaConsumerWrapper is working:
org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1109) ~[mod-source-record-manager-server-fat.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:976) ~[mod-source-record-manager-server-fat.jar:?]
        ...
16:26:33.466 [vert.x-worker-thread-6] ERROR KafkaConsumerWrapper [414924eqId] Error while processing a record - id: 6 subscriptionPattern: SubscriptionDefinition(eventType=DI_RAW_RECORDS_CHUNK_READ, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_RAW_RECORDS_CHUNK_READ)
io.vertx.core.impl.NoStackTraceThrowable: Timeout
16:26:33.583 [vert.x-worker-thread-6] DEBUG KafkaConsumerWrapper [415041eqId] Consumer - id: 0 subscriptionPattern: SubscriptionDefinition(eventType=DI_RAW_RECORDS_CHUNK_READ, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_RAW_RECORDS_CHUNK_READ) Committing offset: 20
16:26:33.605 [vert.x-worker-thread-3] DEBUG unkProcessingService [415063eqId] AbstractChunkProcessingService:: processChunk for chunkNumber: 2, correlationId: 44ad5a1a-9d60-426a-abd8-5608c5e7c27d
16:26:33.592 [vert.x-worker-thread-6] ERROR KafkaConsumerWrapper [415050eqId] Consumer - id: 6 subscriptionPattern: SubscriptionDefinition(eventType=DI_RAW_RECORDS_CHUNK_READ, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_RAW_RECORDS_CHUNK_READ) Error while commit offset: 19
org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1109) ~[mod-source-record-manager-server-fat.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:976) ~[mod-source-record-manager-server-fat.jar:?]
        ...
16:26:33.704 [vert.x-worker-thread-3] DEBUG unkProcessingService [415162eqId] AbstractChunkProcessingService:: processChunk for chunkNumber: 1, correlationId: be43e931-9987-4164-bce7-952b261c09b4
16:26:33.715 [vert.x-worker-thread-3] DEBUG unkProcessingService [415173eqId] AbstractChunkProcessingService:: processChunk for chunkNumber: 2, correlationId: 5f755d44-9fd5-44fe-a116-5e4974afd95f
16:26:33.761 [vert.x-worker-thread-3] INFO  KafkaConsumerWrapper [415219eqId] Consumer - id: 0 subscriptionPattern: SubscriptionDefinition(eventType=DI_RAW_RECORDS_CHUNK_READ, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_RAW_RECORDS_CHUNK_READ) Committed offset: 18
16:26:33.763 [vert.x-worker-thread-3] INFO  KafkaConsumerWrapper [415221eqId] Consumer - id: 0 subscriptionPattern: SubscriptionDefinition(eventType=DI_RAW_RECORDS_CHUNK_READ, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_RAW_RECORDS_CHUNK_READ) Committed offset: 20
16:26:33.996 [vert.x-worker-thread-10] DEBUG KafkaConsumerWrapper [415454eqId] Consumer - id: 0 subscriptionPattern: SubscriptionDefinition(eventType=DI_RAW_RECORDS_CHUNK_READ, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_RAW_RECORDS_CHUNK_READ) Committing offset: 19
16:26:34.760 [vert.x-worker-thread-11] DEBUG KafkaConsumerWrapper [416218eqId] Consumer - id: 0 subscriptionPattern: SubscriptionDefinition(eventType=DI_RAW_RECORDS_CHUNK_READ, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_RAW_RECORDS_CHUNK_READ) Committing offset: 18
16:26:34.827 [vert.x-worker-thread-19] DEBUG KafkaConsumerWrapper [416285eqId] Consumer - id: 3 subscriptionPattern: SubscriptionDefinition(eventType=DI_RAW_RECORDS_CHUNK_READ, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_RAW_RECORDS_CHUNK_READ) Committing offset: 18
16:26:34.866 [vert.x-worker-thread-11] INFO  KafkaConsumerWrapper [416324eqId] Consumer - id: 0 subscriptionPattern: SubscriptionDefinition(eventType=DI_RAW_RECORDS_CHUNK_READ, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_RAW_RECORDS_CHUNK_READ) Committed offset: 19
16:26:34.903 [vert.x-worker-thread-11] INFO  KafkaConsumerWrapper [416361eqId] Consumer - id: 0 subscriptionPattern: SubscriptionDefinition(eventType=DI_RAW_RECORDS_CHUNK_READ, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_RAW_RECORDS_CHUNK_READ) Committed offset: 18
16:26:34.960 [vert.x-worker-thread-10] DEBUG KafkaConsumerWrapper [416418eqId] Consumer - id: 0 subscriptionPattern: SubscriptionDefinition(eventType=DI_RAW_RECORDS_CHUNK_READ, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_RAW_RECORDS_CHUNK_READ) Committing offset: 20
16:26:34.976 [vert.x-worker-thread-0] DEBUG KafkaConsumerWrapper [416434eqId] Consumer - id: 6 subscriptionPattern: SubscriptionDefinition(eventType=DI_RAW_RECORDS_CHUNK_READ, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_RAW_RECORDS_CHUNK_READ) Committing offset: 20
16:26:35.022 [vert.x-worker-thread-17] DEBUG KafkaConsumerWrapper [416480eqId] Consumer - id: 3 subscriptionPattern: SubscriptionDefinition(eventType=DI_RAW_RECORDS_CHUNK_READ, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_RAW_RECORDS_CHUNK_READ) Committing offset: 20
16:26:35.247 [vert.x-worker-thread-4] INFO  ngeEngineServiceImpl [416705eqId] Parsed 1 records out of 1
16:26:35.251 [vert.x-worker-thread-4] INFO  ngeEngineServiceImpl [416709eqId] MARC_BIB invalid list ids: []
16:26:35.252 [vert.x-worker-thread-4] INFO  ngeEngineServiceImpl [416710eqId] Total marc holdings records: 1, invalid marc bib ids: 0, valid marc bib records: 1
16:26:35.276 [vert.x-worker-thread-3] DEBUG KafkaConsumerWrapper [416734eqId] Consumer - id: 6 subscriptionPattern: SubscriptionDefinition(eventType=DI_RAW_RECORDS_CHUNK_READ, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_RAW_RECORDS_CHUNK_READ) Committing offset: 18
16:26:35.335 [vert.x-worker-thread-2] DEBUG KafkaConsumerWrapper [416793eqId] Consumer - id: 6 subscriptionPattern: SubscriptionDefinition(eventType=DI_RAW_RECORDS_CHUNK_READ, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_RAW_RECORDS_CHUNK_READ) Committing offset: 20
16:26:35.474 [vert.x-worker-thread-9] ERROR KafkaConsumerWrapper [416932eqId] Consumer - id: 3 subscriptionPattern: SubscriptionDefinition(eventType=DI_RAW_RECORDS_CHUNK_READ, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_RAW_RECORDS_CHUNK_READ) Error while commit offset: 18
org.apache.kafka.common.errors.RebalanceInProgressException: Offset commit cannot be completed since the consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance by calling poll() and then retry the operation.
16:26:35.476 [vert.x-kafka-consumer-thread-7] ERROR KafkaConsumerWrapper [416934eqId] Error while KafkaConsumerWrapper is working:
org.apache.kafka.common.errors.RebalanceInProgressException: Offset commit cannot be completed since the consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance by calling poll() and then retry the operation.
16:26:35.504 [vert.x-worker-thread-2] ERROR KafkaConsumerWrapper [416962eqId] Consumer - id: 3 subscriptionPattern: SubscriptionDefinition(eventType=DI_RAW_RECORDS_CHUNK_READ, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_RAW_RECORDS_CHUNK_READ) Error while commit offset: 20
org.apache.kafka.common.errors.RebalanceInProgressException: Offset commit cannot be completed since the consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance by calling poll() and then retry the operation.
16:26:35.505 [vert.x-kafka-consumer-thread-7] ERROR KafkaConsumerWrapper [416963eqId] Error while KafkaConsumerWrapper is working:
org.apache.kafka.common.errors.RebalanceInProgressException: Offset commit cannot be completed since the consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance by calling poll() and then retry the operation.
16:26:35.817 [vert.x-worker-thread-15] ERROR KafkaConsumerWrapper [417275eqId] Consumer - id: 6 subscriptionPattern: SubscriptionDefinition(eventType=DI_RAW_RECORDS_CHUNK_READ, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_RAW_RECORDS_CHUNK_READ) Error while commit offset: 20
org.apache.kafka.common.errors.RebalanceInProgressException: Offset commit cannot be completed since the consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance by calling poll() and then retry the operation.
16:26:35.804 [vert.x-kafka-consumer-thread-6] ERROR KafkaConsumerWrapper [417262eqId] Error while KafkaConsumerWrapper is working:
org.apache.kafka.common.errors.RebalanceInProgressException: Offset commit cannot be completed since the consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance by calling poll() and then retry the operation.
16:26:35.850 [vert.x-worker-thread-6] ERROR KafkaConsumerWrapper [417308eqId] Consumer - id: 6 subscriptionPattern: SubscriptionDefinition(eventType=DI_RAW_RECORDS_CHUNK_READ, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_RAW_RECORDS_CHUNK_READ) Error while commit offset: 18
org.apache.kafka.common.errors.RebalanceInProgressException: Offset commit cannot be completed since the consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance by calling poll() and then retry the operation.
16:26:35.881 [vert.x-worker-thread-0] INFO  KafkaConsumerWrapper [417339eqId] Consumer - id: 0 subscriptionPattern: SubscriptionDefinition(eventType=DI_RAW_RECORDS_CHUNK_READ, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_RAW_RECORDS_CHUNK_READ) Committed offset: 20
16:26:35.862 [vert.x-kafka-consumer-thread-6] ERROR KafkaConsumerWrapper [417320eqId] Error while KafkaConsumerWrapper is working:
org.apache.kafka.common.errors.RebalanceInProgressException: Offset commit cannot be completed since the consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance by calling poll() and then retry the operation.
16:26:35.946 [vert.x-worker-thread-17] ERROR KafkaConsumerWrapper [417404eqId] Consumer - id: 6 subscriptionPattern: SubscriptionDefinition(eventType=DI_RAW_RECORDS_CHUNK_READ, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_RAW_RECORDS_CHUNK_READ) Error while commit offset: 20
org.apache.kafka.common.errors.RebalanceInProgressException: Offset commit cannot be completed since the consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance by calling poll() and then retry the operation.
16:26:35.947 [vert.x-kafka-consumer-thread-6] ERROR KafkaConsumerWrapper [417405eqId] Error while KafkaConsumerWrapper is working:
org.apache.kafka.common.errors.RebalanceInProgressException: Offset commit cannot be completed since the consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance by calling poll() and then retry the operation.
16:26:37.395 [vert.x-worker-thread-15] DEBUG EventHandlingUtil    [418853eqId] Starting to send event to Kafka for eventType: DI_RAW_RECORDS_CHUNK_PARSED
16:26:37.445 [vert.x-worker-thread-0] INFO  EventHandlingUtil    [418903eqId] Event with type: DI_RAW_RECORDS_CHUNK_PARSED and correlationId: be43e931-9987-4164-bce7-952b261c09b4 was sent to kafka
16:26:37.591 [vert.x-worker-thread-16] DEBUG KafkaConsumerWrapper [419049eqId] Consumer - id: 6 subscriptionPattern: SubscriptionDefinition(eventType=DI_RAW_RECORDS_CHUNK_READ, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_RAW_RECORDS_CHUNK_READ) Committing offset: 19
16:26:38.125 [vert.x-worker-thread-2] DEBUG EventHandlingUtil    [419583eqId] Starting to send event to Kafka for eventType: DI_SRS_MARC_BIB_RECORD_CREATED
16:26:38.212 [vert.x-worker-thread-15] INFO  EventHandlingUtil    [419670eqId] Event with type: DI_SRS_MARC_BIB_RECORD_CREATED and correlationId: 6c3d8fae-6a63-41ab-b188-0ee5ff832681 was sent to kafka
16:26:38.241 [vert.x-worker-thread-15] DEBUG KafkaConsumerWrapper [419699eqId] Consumer - id: 1 subscriptionPattern: SubscriptionDefinition(eventType=DI_PARSED_RECORDS_CHUNK_SAVED, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_PARSED_RECORDS_CHUNK_SAVED) Committing offset: 7
16:26:38.242 [vert.x-worker-thread-15] DEBUG KafkaConsumerWrapper [419700eqId] Threshold is exceeded, preparing to resume, globalLoad: 0, currentLoad: 0, requestNo: -2
16:26:38.274 [vert.x-worker-thread-11] INFO  KafkaConsumerWrapper [419732eqId] Consumer - id: 6 subscriptionPattern: SubscriptionDefinition(eventType=DI_RAW_RECORDS_CHUNK_READ, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_RAW_RECORDS_CHUNK_READ) Committed offset: 19
16:26:39.118 [vert.x-worker-thread-18] INFO  KafkaConsumerWrapper [420576eqId] Consumer - id: 1 subscriptionPattern: SubscriptionDefinition(eventType=DI_PARSED_RECORDS_CHUNK_SAVED, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_PARSED_RECORDS_CHUNK_SAVED) Committed offset: 7
16:26:42.269 [vert.x-worker-thread-17] DEBUG KafkaConsumerWrapper [423727eqId] Consumer - id: 25 subscriptionPattern: SubscriptionDefinition(eventType=DI_COMPLETED, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_COMPLETED) Committing offset: 7
16:26:42.284 [vert.x-worker-thread-15] INFO  KafkaConsumerWrapper [423742eqId] Consumer - id: 25 subscriptionPattern: SubscriptionDefinition(eventType=DI_COMPLETED, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_COMPLETED) Committed offset: 7
16:26:42.828 [vert.x-worker-thread-5] DEBUG KafkaConsumerWrapper [424286eqId] Consumer - id: 48 subscriptionPattern: SubscriptionDefinition(eventType=DI_COMPLETED, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_COMPLETED) Committing offset: 7
16:26:43.310 [vert.x-worker-thread-16] INFO  KafkaConsumerWrapper [424768eqId] Consumer - id: 48 subscriptionPattern: SubscriptionDefinition(eventType=DI_COMPLETED, subscriptionPattern=FOLIO\.Default\.\w{1,}\.DI_COMPLETED) Committed offset: 7

Conclusions

For now I don't see a reason to add additional retry mechanisms; retrying already works out of the box in the Kafka implementation. If a Kafka consumer fails to heartbeat to the Kafka broker, it is kicked out of the consumer group when it tries to commit an offset. After that, another consumer picks up the work and commits the offset.
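
For reference, the group membership behaviour described above is governed by a few standard Kafka consumer settings. A minimal sketch in Java (the configuration keys are real Kafka options; the values below are illustrative only, not the actual tuning of our modules):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    Properties props = new Properties();
    // How long the broker waits without heartbeats before evicting the consumer from the group.
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
    // How often the consumer sends heartbeats (conventionally about 1/3 of the session timeout).
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
    // Maximum allowed time between poll() calls; a long pause at a debug breakpoint exceeds it,
    // the consumer is considered dead, and a rebalance starts - exactly what the logs above show.
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");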

To work correctly with these Kafka retry mechanisms, we need to make sure that our consumers' handlers are idempotent (they can be executed multiple times) and that our services can handle duplicates.
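
A minimal sketch of what an idempotent handler could look like, assuming a hypothetical dedup store keyed by correlation id (ProcessedEventDao and its methods are illustrative names, not existing mod-source-record-manager code):

    import io.vertx.core.Future;
    import io.vertx.kafka.client.consumer.KafkaConsumerRecord;

    interface ProcessedEventDao { // hypothetical dedup store
      Future<Boolean> isProcessed(String eventId);
      Future<Void> markProcessed(String eventId);
    }

    public class IdempotentChunkHandler {
      private final ProcessedEventDao processedEventDao;

      public IdempotentChunkHandler(ProcessedEventDao processedEventDao) {
        this.processedEventDao = processedEventDao;
      }

      public Future<String> handle(KafkaConsumerRecord<String, String> record) {
        String correlationId = record.key(); // assuming the correlation id is the record key
        return processedEventDao.isProcessed(correlationId)
          .compose(alreadyProcessed -> alreadyProcessed
            // Duplicate delivery after a rebalance: acknowledge without re-processing.
            ? Future.succeededFuture(correlationId)
            // First delivery: do the real work, then remember that it was done.
            : processChunk(record)
                .compose(v -> processedEventDao.markProcessed(correlationId))
                .map(correlationId));
      }

      private Future<Void> processChunk(KafkaConsumerRecord<String, String> record) {
        // ... actual chunk processing ...
        return Future.succeededFuture();
      }
    }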

If we see errors in the logs such as RebalanceInProgressException, NoStackTraceThrowable: Timeout, or CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group, we should investigate those particular cases separately (why records took so long to process, or why consumers failed to heartbeat to the Kafka broker), but the Kafka retry mechanism itself works well.

Incorrect handling of business exceptions

What goes wrong

From time to time an import can get stuck due to an exception: the user sees that the progress bar remains in the same position and nothing happens.

For testing, I modified the handler that processes DI_PARSED_RECORDS_CHUNK_SAVED in mod-source-record-manager to throw new RuntimeException("jobExecutionId is invalid.");

In this case the exception was logged, but the message was also committed as successfully processed (Pic. 1), and the progress bar remained stuck in the same place forever.

Proposed change in folio-kafka-wrapper

Create a contract for an error handler to be invoked by the Kafka consumer wrapper library when processing is not successful. This contract should be implemented by the services that use folio-kafka-wrapper.
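
A minimal sketch of what such a contract might look like (the interface and method names are assumptions for illustration, not the final folio-kafka-wrapper API):

    import io.vertx.kafka.client.consumer.KafkaConsumerRecord;

    /**
     * Invoked by the consumer wrapper when the business handler fails,
     * instead of the failure being logged and silently swallowed.
     */
    public interface ProcessRecordErrorHandler<K, V> {
      void handle(Throwable cause, KafkaConsumerRecord<K, V> record);
    }

    // Inside KafkaConsumerWrapper, the failure branch could then look roughly like this:
    // businessHandler.handle(record).onComplete(ar -> {
    //   if (ar.failed() && errorHandler != null) {
    //     errorHandler.handle(ar.cause(), record);  // surface the failure (e.g. as DI_ERROR)
    //   }
    //   commitOffset(record); // still commit, so the partition is not blocked
    // });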

Each service should implement the error handler to send a DI_ERROR event to Kafka when processing fails with an exception. The handler's implementation must make sure that DI_ERROR is emitted for the particular record, not for the whole file.
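
A hedged sketch of a service-side implementation (the payload handling and the sendDiError helper are assumptions for illustration):

    import io.vertx.kafka.client.consumer.KafkaConsumerRecord;

    public class DataImportErrorHandler implements ProcessRecordErrorHandler<String, String> {

      @Override
      public void handle(Throwable cause, KafkaConsumerRecord<String, String> record) {
        // Emit DI_ERROR for the single failed record only, so the rest of the file
        // keeps flowing and the progress bar does not get stuck on this record.
        sendDiError(record.key(), cause.getMessage());
      }

      private void sendDiError(String recordId, String errorMessage) {
        // ... build a DI_ERROR event for this record and publish it to Kafka,
        // e.g. via the EventHandlingUtil seen in the logs above ...
      }
    }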

User stories:

KAFKAWRAP-3

MODINV-408

MODINVOICE-252

MODSOURCE-290

MODSOURMAN-474