Since the R1 2021 (Iris) release, the Data Import application uses Kafka as its transport.

As of the Iris release, data import no longer goes through PubSub. All modules involved in data import (mod-data-import, mod-source-record-manager, mod-source-record-storage, mod-inventory, mod-invoice) communicate via Kafka directly. Therefore, to enable data import, Kafka must be set up properly and all the necessary parameters must be set for the modules.

Below are example Kafka configuration settings that were used for testing an import of 50,000 records on the Performance Task Force (PTF) and Bugfest environments. These environments use MSK and run two containers for each module (except mod-data-import, which runs in one container) on m5.large EC2 instances.

auto.create.topics.enable=true (necessary to enable automatic topic creation)
default.replication.factor=2 (make sure this setting is not higher than the number of brokers, e.g. for 2 brokers the replication factor should be set to 2)
min.insync.replicas=2
num.io.threads=8
num.network.threads=5
num.partitions=1
num.replica.fetchers=2
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
unclean.leader.election.enable=true
zookeeper.session.timeout.ms=18000
log.retention.minutes=300 (on PTF and Bugfest envs, 70 minutes was enough to import 50k records with the default job profile for importing MARC bibs and creating Instances)
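A quick way to confirm which values a broker is actually running with is the kafka-configs tool that ships with Kafka (a minimal sketch; the broker address and broker id are placeholders, and the --all flag requires a reasonably recent Kafka client):

  kafka-configs.sh --bootstrap-server <broker-host>:9092 \
    --entity-type brokers --entity-name 0 \
    --describe --all

This lists static and dynamically overridden broker settings, so values such as auto.create.topics.enable and log.retention.minutes can be checked against the list above.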

Adjusted module configs
inventory.kafka.DataImportConsumerVerticle.instancesNumber=10
inventory.kafka.MarcBibInstanceHridSetConsumerVerticle.instancesNumber=10
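These values are JVM system properties of mod-inventory. One way to apply them is to pass -D options through the Java options environment variable of the container (a sketch; the JAVA_OPTIONS variable name depends on how the module images are launched in your deployment):

  JAVA_OPTIONS=-Dinventory.kafka.DataImportConsumerVerticle.instancesNumber=10 \
    -Dinventory.kafka.MarcBibInstanceHridSetConsumerVerticle.instancesNumber=10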

Kafka disk space was set to 500 GB (disk space was not highly utilised; actual usage depends on log retention and load).

Module CPU and memory configs (memory in MB):

Module                      CPU    Hard/Soft memory
mod-data-import             128    2048/1024
mod-source-record-manager   128    1440/896
mod-source-record-storage   128    1440/896
mod-inventory               256    1872/1440
mod-inventory-storage       128    864/536

Make sure that the auto.create.topics.enable=true setting is set for MSK. This allows topics to be created automatically. Otherwise, the topics used for data import will need to be created manually (see the list of data import topics below). The number of partitions and the replication factor should be configured on the Kafka side; make sure the replication factor is not higher than the number of brokers.
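If topics have to be created manually, the standard kafka-topics tool can be used (a sketch; the broker address, environment name, tenant id, and event type shown are placeholders — see the topic naming convention further down this page):

  kafka-topics.sh --bootstrap-server <broker-host>:9092 --create \
    --topic folio.Default.diku.DI_SRS_MARC_BIB_RECORD_CREATED \
    --partitions 1 --replication-factor 2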

There are several properties that should be set for modules that interact with Kafka: KAFKA_HOST, KAFKA_PORT, and ENV (a unique environment ID).

List of modules for which the mentioned parameters should be set:

  • mod-data-import
  • mod-source-record-manager
  • mod-source-record-storage
  • mod-inventory
  • mod-invoice

KAFKA_HOST and KAFKA_PORT values should also be specified for mod-inventory-storage. mod-inventory-storage also requires the REPLICATION_FACTOR value to be set: https://github.com/folio-org/mod-inventory-storage/blob/master/README.MD#kafka.
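As a reference, the Kafka-related environment variables for a module deployment might look like the following (a sketch; the host, port, environment name, and replication factor are placeholders to be replaced with your own values):

  KAFKA_HOST=kafka.example.org
  KAFKA_PORT=9092
  ENV=folio-prod
  # mod-inventory-storage only:
  REPLICATION_FACTOR=2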

After setup, it is good to check the logs of all related modules for errors. Data import consumers and producers work in separate verticles that are set up in each module's RMB InitAPI, so deploy/install logs are the first place to check.
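For example, in a Docker-based deployment the startup logs can be scanned for Kafka or verticle deployment failures (a sketch; the container name is a placeholder):

  docker logs mod-inventory 2>&1 | grep -i -E 'kafka|verticle|error'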

There are other properties that should be set for data import modules:

mod-data-import

Properties related to file upload that should be set up at mod-configuration are described in the doc: https://github.com/folio-org/mod-data-import#module-properties-to-set-up-at-mod-configuration

System property that can be adjusted    Default value
file.processing.buffer.chunk.size       50

For releases prior to Kiwi it is recommended to adjust the file.processing.buffer.chunk.size property in order to prevent mod-source-record-storage from crashing with OOM during an Update import of 5,000 records. The property can be adjusted further to allow an Update import of 10,000 records.
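Like the other module settings on this page, this is a JVM system property and can be passed through the Java options of the mod-data-import container (a sketch; the JAVA_OPTIONS variable name depends on your deployment, and the value shown is simply the documented default, not a tuning recommendation):

  # 50 is the default value from the table above
  JAVA_OPTIONS=-Dfile.processing.buffer.chunk.size=50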

mod-source-record-manager 

System property that can be adjusted                          Default value
kafkacache.topic.number.partitions                            1
kafkacache.topic.replication.factor                           1
kafkacache.log.retention.ms                                   18000000
kafkacache.topic                                              events_cache
srm.kafkacache.cleanup.interval.ms                            3600000
srm.kafkacache.expiration.time.hours                          3
srm.kafka.RawMarcChunkConsumer.instancesNumber                5
srm.kafka.StoredMarcChunkConsumer.instancesNumber             5
srm.kafka.DataImportConsumersVerticle.instancesNumber         5
srm.kafka.DataImportJournalConsumersVerticle.instancesNumber  5
srm.kafka.RawChunksKafkaHandler.maxDistributionNum            100
srm.kafka.CreatedRecordsKafkaHandler.maxDistributionNum       100
srm.kafka.DataImportConsumer.loadLimit                        5
security.protocol                                             PLAINTEXT
ssl.protocol                                                  TLSv1.2
ssl.key.password                                              -
ssl.keystore.location                                         -
ssl.keystore.password                                         -
ssl.keystore.type                                             JKS
ssl.truststore.location                                       -
ssl.truststore.password                                       -
ssl.truststore.type                                           JKS


mod-source-record-storage

System property that can be adjusted                          Default value
srs.kafka.ParsedMarcChunkConsumer.instancesNumber             1
srs.kafka.DataImportConsumer.instancesNumber                  1
srs.kafka.ParsedRecordChunksKafkaHandler.maxDistributionNum   100
srs.kafka.DataImportConsumer.loadLimit                        5
srs.kafka.DataImportConsumerVerticle.maxDistributionNum       100
srs.kafka.ParsedMarcChunkConsumer.loadLimit                   5
security.protocol                                             PLAINTEXT
ssl.protocol                                                  TLSv1.2
ssl.key.password                                              -
ssl.keystore.location                                         -
ssl.keystore.password                                         -
ssl.keystore.type                                             JKS
ssl.truststore.location                                       -
ssl.truststore.password                                       -
ssl.truststore.type                                           JKS


mod-inventory

System property that can be adjusted                                    Default value
kafkacache.topic.number.partitions                                      1
kafkacache.topic.replication.factor                                     1
kafkacache.log.retention.ms                                             18000000
kafkacache.topic                                                        events_cache
inventory.kafka.DataImportConsumerVerticle.instancesNumber              5
inventory.kafka.MarcBibInstanceHridSetConsumerVerticle.instancesNumber  5
inventory.kafka.DataImportConsumer.loadLimit                            5
inventory.kafka.DataImportConsumerVerticle.maxDistributionNumber        100
inventory.kafka.MarcBibInstanceHridSetConsumer.loadLimit                5
security.protocol                                                       PLAINTEXT
ssl.protocol                                                            TLSv1.2
ssl.key.password                                                        -
ssl.keystore.location                                                   -
ssl.keystore.password                                                   -
ssl.keystore.type                                                       JKS
ssl.truststore.location                                                 -
ssl.truststore.password                                                 -
ssl.truststore.type                                                     JKS


mod-invoice

System property that can be adjusted                                Default value
mod.invoice.kafka.DataImportConsumerVerticle.instancesNumber        1
mod.invoice.kafka.DataImportConsumer.loadLimit                      5
mod.invoice.kafka.DataImportConsumerVerticle.maxDistributionNumber  100
dataimport.consumer.verticle.mandatory                              false (should be set to true in order to fail the module at start-up if data import Kafka consumer creation fails)
security.protocol                                                   PLAINTEXT
ssl.protocol                                                        TLSv1.2
ssl.key.password                                                    -
ssl.keystore.location                                               -
ssl.keystore.password                                               -
ssl.keystore.type                                                   JKS
ssl.truststore.location                                             -
ssl.truststore.password                                             -
ssl.truststore.type                                                 JKS


Most settings can be left at their default values and adjusted if needed. Based on the PTF test results, general recommendations for changing the default values are the following:

  • Kafka (general recommendations for MSK):
    • auto.create.topics.enable = true
    • log.retention.minutes = 70-300
    • Broker’s disk space = 300 GB
    • 4 brokers, replication factor = 3, DI topics partition = 1
    • Version 2.7 is 30% faster than version 1.6
  • mod-inventory:
    • inventory.kafka.DataImportConsumerVerticle.instancesNumber=10
    • inventory.kafka.MarcBibInstanceHridSetConsumerVerticle.instancesNumber=10
    • kafka.consumer.max.poll.records=10
    • Memory: 2 GB


If topics are created manually, make sure topics exist for all data import event types. See the list of event types. Topic names in Kafka are built from several pieces: ENV, nameSpace, tenant, and eventType. Data import related event types always have the DI prefix. Currently the "Default" nameSpace is hardcoded for all topics.
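For example, with ENV=folio, the hardcoded "Default" nameSpace, and tenant "diku", a data import topic name would look like the following (the event type shown is just one illustrative DI_ event type):

  folio.Default.diku.DI_SRS_MARC_BIB_RECORD_CREATED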

If the auto.create.topics.enable=true setting is set for MSK, topics will be created automatically. Please note that in that case the first data import job run after setup will take longer to complete.

Troubleshooting for System Administrators

  • Kill the job that appears to be stuck (click the trash can icon in the right corner and wait for about 10 seconds)
  • Stop the modules involved in the data import process (mod-data-import, mod-source-record-manager, mod-source-record-storage, mod-inventory, mod-invoice)
  • Delete the Kafka topics related to data import (such topics follow the pattern "ENV.namespace.tenantId.DI_eventType"). Note that all topics related to data import have the DI prefix in the event type name. This will delete all the records that were sent to Kafka but were not delivered to consumers.
  • Applicable only if auto.create.topics.enable=true is not set: recreate the topics that were deleted (or skip the previous step and clear the records from the topics instead; to do so, set retention to 1 ms, wait for a couple of minutes, then restore the normal retention time, as shown in the example after this list)
  • Restart the modules involved in the data import process (mod-data-import, mod-source-record-manager, mod-source-record-storage, mod-inventory, mod-invoice). If auto.create.topics.enable=true is set, all the necessary topics will be created automatically.
  • Run a data import job to make sure it is working
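The retention trick mentioned above can be done with the kafka-configs tool (a sketch; the broker address and topic name are placeholders):

  # temporarily expire everything in the topic
  kafka-configs.sh --bootstrap-server <broker-host>:9092 --alter \
    --entity-type topics --entity-name folio.Default.diku.DI_SRS_MARC_BIB_RECORD_CREATED \
    --add-config retention.ms=1

  # after a couple of minutes, remove the override to fall back to the broker default
  kafka-configs.sh --bootstrap-server <broker-host>:9092 --alter \
    --entity-type topics --entity-name folio.Default.diku.DI_SRS_MARC_BIB_RECORD_CREATED \
    --delete-config retention.ms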

