Skip to end of metadata
Go to start of metadata

Since R1 2021 Iris release Data Import application is using KAFKA as a transport

As of the Iris release, Data import no longer goes through PubSub. All modules involved in data import (mod-data-import, mod-source-record-manager, mod-source-record-storage, mod-inventory, mod-invoice) are communicating via Kafka directly. Therefore, to enable data import Kafka should be set up properly and all the necessary parameters should be set for the modules.

Below are the example Kafka configuration settings that were used for testing import of 50k records on Performance Task Force and Bugfest envs that are using MSK, running two containers for each module (except mod-data-import that runs in one container) on m5.large  ec2 instances 

auto.create.topics.enable=true (necessary to enable automatic topic creation)
default.replication.factor=2 (make sure this setting is not higher than number of brokers, e.g. for 2 brokers, replication factor should be set to 2)
min.insync.replicas=2
num.io.threads=8
num.network.threads=5
num.partitions=1
num.replica.fetchers=2
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
unclean.leader.election.enable=true
zookeeper.session.timeout.ms=18000
log.retention.minutes=300 (on PTF and Bugfest envs 70 minutes was enough to import 50k records with
Default job profile for importing MARC bibs and creating Instances)

Kafka disc space was set to 500 Gb (disk space was not highly utilized, it depends on log retention and load).

Modules memory configs in Mb:

Module CPUHard/Soft memory
mod-data-import 128

864/536

mod-source-record-manager 5121440/1024
mod-source-record-storage 2561440/1024
mod-inventory 2561872/1440
mod-inventory-storage2561440/1024
mod-invoice2561440/1024

*Note If scaling on ECS tasks enabled - CPU allocation can be decreased 

Additional recommendations for memory allocation:

  • Specify heap memory limits by the -XX:MaxRAMPercentage parameter or by -Xmx parameter (but do not use both parameters at the same time);
  • Metaspace size (if specified) - we do not recommend setting more than 100-150 mb;
  • Metaspace size + Heap size should be less than allocated limits: at least 100-200 mb should be reserved for the functioning of the docker container.  It means that: Metaspace size + Heap size + (100..200 mb) = allocated limits.

Make sure that auto.create.topics.enable=true setting is set for MSK. This will allow topics to be created automatically. Otherwise topics used for DI purposes will need to be created manually (see below the list of data import topics). Number of partitions and replication factor should be configured on Kafka side. Make sure that replication factor is not higher than the number of brokers

There are several properties that should be set for modules that interact with Kafka: KAFKA_HOST, KAFKA_PORT, ENV(unique env ID).

List of modules, for which mentioned parameters should be set:

  • mod-data-import
  • mod-source-record-manager
  • mod-source-record-storage
  • mod-inventory
  • mod-invoice

KAFKA_HOST and KAFKA_PORT values should also be specified for mod-inventory-storage. mod-inventory-storage also requires REPLICATION_FACTOR value to be set https://github.com/folio-org/mod-inventory-storage/blob/master/README.MD#kafka.

After setup, it is good to check logs in all related modules for errors. Data import consumers and producers work in separate verticles that are set up in RMB's InitAPI for each module. That would be the first place to check deploy/install logs.

DB_MAXPOOLSIZE should be set not less than 15(we recommend to set as 15) for modules mod-source-record-manager and mod-source-record-storage.

There are other properties that should be set for data import modules:

mod-data-import

Properties related to file upload that should be set at mod-configuration are described in the doc  https://github.com/folio-org/mod-data-import#module-properties-to-set-up-at-mod-configuration

System property that can be adjustedDefault value
file.processing.marc.raw.buffer.chunk.size

50

file.processing.marc.json.buffer.chunk.size50
file.processing.marc.xml.buffer.chunk.size10
file.processing.edifact.buffer.chunk.size10

For releases prior to Kiwi it is recommended to set the file.processing.buffer.chunk.size property to in order to prevent mod-source-record-storage from crashing with OOM during an Update import of 5,000 records. The property can be set to to allow Update import of 10,000 records.

mod-source-record-manager 

System property that can be adjustedDefault value
srm.kafka.RawMarcChunkConsumer.instancesNumber1
srm.kafka.StoredMarcChunkConsumer.instancesNumber1
srm.kafka.DataImportConsumersVerticle.instancesNumber1
srm.kafka.DataImportJournalConsumersVerticle.instancesNumber1
srm.kafka.RawChunksKafkaHandler.maxDistributionNum100
srm.kafka.CreatedRecordsKafkaHandler.maxDistributionNum100
srm.kafka.DataImportConsumer.loadLimit5
security.protocolPLAINTEXT
ssl.protocolTLSv1.2
ssl.key.password-
ssl.keystore.location-
ssl.keystore.password-
ssl.keystore.typeJKS
ssl.truststore.location-
ssl.truststore.password-
ssl.truststore.typeJKS

                                                                 

mod-source-record-storage

System property that can be adjustedDefault value
srs.kafka.ParsedMarcChunkConsumer.instancesNumber1
srs.kafka.DataImportConsumer.instancesNumber1
srs.kafka.ParsedRecordChunksKafkaHandler.maxDistributionNum100
srs.kafka.DataImportConsumer.loadLimit5
srs.kafka.DataImportConsumerVerticle.maxDistributionNum100
srs.kafka.ParsedMarcChunkConsumer.loadLimit5
security.protocolPLAINTEXT
ssl.protocolTLSv1.2
ssl.key.password-
ssl.keystore.location-
ssl.keystore.password-
ssl.keystore.type-
ssl.truststore.locationJKS
ssl.truststore.password-
ssl.truststore.typeJKS

   

mod-inventory

System property that can be adjustedDefault value
inventory.kafka.DataImportConsumerVerticle.instancesNumber3
inventory.kafka.MarcBibInstanceHridSetConsumerVerticle.instancesNumber3
inventory.kafka.DataImportConsumer.loadLimit 5
inventory.kafka.DataImportConsumerVerticle.maxDistributionNumber100
inventory.kafka.MarcBibInstanceHridSetConsumer.loadLimit5
security.protocolPLAINTEXT
ssl.protocolTLSv1.2
ssl.key.password-
ssl.keystore.location-
ssl.keystore.password-
ssl.keystore.typeJKS
ssl.truststore.location-
ssl.truststore.password-
ssl.truststore.typeJKS

                                                                                 

mod-invoice

System property that can be adjustedDefault value
mod.invoice.kafka.DataImportConsumerVerticle.instancesNumber1
mod.invoice.kafka.DataImportConsumer.loadLimit5
mod.invoice.kafka.DataImportConsumerVerticle.maxDistributionNumber100
dataimport.consumer.verticle.mandatory

false 

should be set to true in order to fail the module at start-up if data import Kafka consumer creation failed

security.protocolPLAINTEXT
ssl.protocol TLSv1.2
ssl.key.password-
ssl.keystore.location-
ssl.keystore.password-
ssl.keystore.typeJKS
ssl.truststore.location-
ssl.truststore.password-
ssl.truststore.typeJKS


Most settings can be left to default values and adjusted if needed. Based on the PTF test results, general recommendations to change the default values are the following:

  • Kafka (general recommendations for MSK):
    • auto.create.topics.enable = true
    • log.retention.minutes = 70-300
    • Broker’s disk space = 300 GB
    • 4 brokers, replication factor = 3, DI topics partition = 2
    • Version 2.7 is 30% faster than version 1.6
  • mod-inventory:
    • inventory.kafka.DataImportConsumerVerticle.instancesNumber=X (the default value is 3 and if number of kafka partitions greater that 3 and scaling not enabled - in this case we recommend to increase value of this property)
    • inventory.kafka.MarcBibInstanceHridSetConsumerVerticle.instancesNumber=X (the default value is 3 and if number of kafka partitions greater that 3 and scaling not enabled - in this case we recommend to increase value of this property)
    • kafka.consumer.max.poll.records=10
    • Memory: 2 GB

Additional recommendations for Kafka topics configuring

If topics are created manually, make sure topics for all data import event types. See the list of event types. Topics in Kafka should have name built from different pieces: ENV, nameSpace, tenant, eventType. Data import related event types will always have the DI prefix. Currently "Default" nameSpace is hardcoded for all the topics.

If auto.create.topics.enable=true setting is set for MSK topics will be created automatically. Please note that in such case the first data import job run after the set up will take longer to complete. 

We strongly recommend to make topic's partitions count the same for all topics, especially for topic DI_RAW_RECORDS_CHUNK_READ and DI_COMPLETED, DI_ERROR, because it is the basis of proper work of Flow Control feature for load orchestrating.

Delete job executions with all related data

UI allows to delete multiple job executions from Landing page and View All page.
Data import marks jobs as deleted after user hits the Delete button. Queries to get/update job executions filter out records, marked as deleted.
The new scheduled job has been introduced to make hard deletes of these records.
By default it triggers each 24 hours and finds records marked as deleted completed not less than 2 days ago.

These params are configurable(applied for module mod-source-record-manager):

  • periodic.job.execution.permanent.delete.interval.ms - interval in milliseconds to trigger job for hard deletion.
    (By default it equals to 86400000 that is the same as 24 hours).
    Example of applying this property in JAVA_OPTS:  -Dperiodic.job.execution.permanent.delete.interval.ms=86400000
  •  job.execution.difference.number.of.days - number of days from job execution completed date to consider that job execution eligible for deletion.
    (By default it equals to 2 days).
    Example of applying this property in JAVA_OPTS: -Djob.execution.difference.number.of.days=2


This scheduled job deletes data from tables:

  • job_execution
  • job_execution_progress
  • job_execution_source_chunks
  • journal_records
  • job_monitoring

Troubleshooting for System Administrators

How to restart DI application

  • Kill the job that appears to be stuck (click the trash can in the right corner and wait for 10 sec)
  • Stop modules involved in Data import process (mod-data-import, mod-source-record-manager, mod-source-record-storage, mod-inventory, mod-invoice)
  • Delete topics in Kafka related to data import (such topics follow the pattern "ENV.namespace.tenantId.DI_eventType"). Note that all the topics related to data import has DI prefix for the event type name. This will delete all the records that were sent to Kafka but wasn't delivered to the consumer.
  • Applicable only if auto.create.topics.enable=true is not set - Recreate topics that were deleted (OR skip the previous step and clear the records from the topics - to do so set retention to 1 ms and wait for a couple of minutes, then set normal retention time)
  • Restart modules involved in data import process (mod-data-import, mod-source-record-manager, mod-source-record-storage, mod-inventory, mod-invoice). In case auto.create.topics.enable=true is set all the necessary topics will be created automatically.
  • Run data import job to make sure it is working

How to deal with DB schema migration issues

In case if system administrators manually invoke POST Tenant API to database and not specify 'module_from'  version to migrate - this can lead to some DB migration issues.

Exception that was reproducible on some environments when incorrectly using POST Tenant API endpoint to perform DB migration:

13:38:36.596 [vert.x-eventloop-thread-1] ERROR PostgresClient [55120eqId] ERROR: column jep1.jobexecutionid does not exist (42703)
io.vertx.pgclient.PgException: ERROR: column jep1.jobexecutionid does not exist (42703) 


POST Tenant API request leads to this issue. In this request 'module_from' param is missed, that is incorrect usage of this endpoint.

curl "http://localhost:8081/_/tenant" -H "X-Okapi-Tenant: diku" -H "Content-type: application/json" -XPOST -d'
{
  "module_to": "mod-source-record-manager-3.3.0",
  "parameters": [{
    "key": "loadSample",
    "value": "true"
  },{
    "key": "loadReference",
    "value": "true"
  }]
}'


Correct usage of POST Tenant API request. In this request we specifying both 'module_from' and 'module_to' parameters, that is the same as Okapi does when invoking this endpoint.

curl "http://localhost:8081/_/tenant" -H "X-Okapi-Tenant: diku" -H "Content-type: application/json" -XPOST -d'
{
  "module_from": "mod-source-record-manager-3.2.0",
  "module_to": "mod-source-record-manager-3.3.0",
  "parameters": [{
    "key": "loadSample",
    "value": "true"
  },{
    "key": "loadReference",
    "value": "true"
  }]
}'

  • No labels

5 Comments

  1. Oleksii Kuzminov  and Kateryna Senchenko - Is the "Troubleshooting" section also the directions for how to "restart" the system so that Data Import will begin working again?


      1. When troubleshooting - if a job is killed and associated messages deleted – is it now partially imported with potential issues with associated jobs record state
        Leave - if 

        1. Mike Gorrell Carole Godfrey Kateryna SenchenkoLet's decide if we maybe want to break out troubleshooting/triage as a separate page, or keep it here. Either is fine with me. I'll also post a note to a couple Slack channels and ask folks to review and comment

  2. These instructions all seem to be centered around using cloud services, like MSK (Managed Streaming for Apache Kafka).  Any idea as to how this would translate to an on-premise install (using kafka containers)?