SPIKE: [MODINREACH-78] Record Contribution: Analyze domain event pattern implementation in mod-inventory-storage and mod-search


Goal and requirements

The inventory storage module (mod-inventory-storage) uses an event messaging approach to notify other interested parties (modules) about any changes made to inventory records (instances, items, and holdings). Those events are called "domain events". Among the known event consumers are mod-search and mod-remote-storage.

This way of triggering logic upon inventory data changes fits the requirements of propagating inventory/item data to the Inn-Reach central server quite well. So the goal of the spike is to study how inventory events are published and processed, and how that could be applied to Inn-Reach Record Contribution. The key points of the study are:

  • understand inventory events
  • see how the flow is implemented in code
  • identify the main points of the flow
  • determine whether this code can be re-used in mod-inn-reach
  • find out what Kafka client settings are recommended

Inventory storage domain events overview

Inventory domain events can be divided into two groups:

  1. basic events
    1. record changes: CREATE/UPDATE/DELETE operations
  2. special case events
    1. removal of all records
    2. instance re-indexing

Depending on the record type (instance, item, or holdings), an event is sent to a dedicated Kafka topic:

  • inventory.instance - for instances
  • inventory.item - for items
  • inventory.holdings-record - for holdings records

Two message headers are additionally populated for each event:

  • X-Okapi-Url
  • X-Okapi-Tenant

The header values are taken from the initial request to the inventory storage module.

All existing inventory domain event types are defined in the DomainEventType enum.

The domain event content and the respective factory methods for each type can be found in the DomainEvent class.
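
For reference, below is a condensed sketch of that model, reconstructed from the event structure described in this document; the class, field, and method names follow the text, but the actual mod-inventory-storage code may differ in details.

DomainEvent sketch
// Reconstructed from this document; the real mod-inventory-storage
// classes may differ in details.
enum DomainEventType {
  CREATE, UPDATE, DELETE, DELETE_ALL, REINDEX
}

public class DomainEvent<T> {
  private final T oldEntity;            // serialized as "old"
  private final T newEntity;            // serialized as "new"
  private final DomainEventType type;
  private final String tenant;

  private DomainEvent(T oldEntity, T newEntity, DomainEventType type, String tenant) {
    this.oldEntity = oldEntity;
    this.newEntity = newEntity;
    this.type = type;
    this.tenant = tenant;
  }

  // Factory methods per event type: an absent "old"/"new" element stays null
  public static <T> DomainEvent<T> createEvent(T newEntity, String tenant) {
    return new DomainEvent<>(null, newEntity, DomainEventType.CREATE, tenant);
  }

  public static <T> DomainEvent<T> updateEvent(T oldEntity, T newEntity, String tenant) {
    return new DomainEvent<>(oldEntity, newEntity, DomainEventType.UPDATE, tenant);
  }

  public static <T> DomainEvent<T> deleteEvent(T oldEntity, String tenant) {
    return new DomainEvent<>(oldEntity, null, DomainEventType.DELETE, tenant);
  }

  public static DomainEvent<Object> deleteAllEvent(String tenant) {
    return new DomainEvent<>(null, null, DomainEventType.DELETE_ALL, tenant);
  }

  public static DomainEvent<Object> reindexEvent(String tenant) {
    return new DomainEvent<>(null, null, DomainEventType.REINDEX, tenant);
  }
}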

Basic events structure

A change to an inventory record triggers an event with the following payload:

Single record event
{
  "old": {...}, 
  "new": {...},
  "type": "<change type>",
  "tenant": "<tenant>"
}

The change type can be one of the following:

  • CREATE
  • UPDATE
  • DELETE

"old" and "new" elements contain record in the state it was before and after the change. Those elements might have empty value for some event type. The table below shows presence or absence of the elements depending on the event type:

Event type | old | new
CREATE     |  –  |  +
UPDATE     |  +  |  +
DELETE     |  +  |  –

For basic events, the partition key is always the instance id.
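
Putting the payload, the headers, and the partition key together, publishing a basic event could look roughly like the sketch below. It uses the plain Apache Kafka producer for brevity (mod-inventory-storage itself publishes through the Vert.x Kafka client), and the topic name handling and serialization are simplified.

Basic event publishing sketch
import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;

class EventSender {
  // partition key = instance id; X-Okapi-* headers come from the initial request
  static void publish(KafkaProducer<String, String> producer, String instanceId,
                      String eventJson, String okapiUrl, String tenant) {
    ProducerRecord<String, String> record =
        new ProducerRecord<>("inventory.instance", instanceId, eventJson);
    record.headers()
        .add(new RecordHeader("X-Okapi-Url", okapiUrl.getBytes(StandardCharsets.UTF_8)))
        .add(new RecordHeader("X-Okapi-Tenant", tenant.getBytes(StandardCharsets.UTF_8)));
    producer.send(record);
  }
}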

Special case events structure

1. Removal of all records

The event notifying that all records of a particular type (instance/item/holdings) have been deleted has the following structure:

DELETE_ALL event
{
  "type": "DELETE_ALL",
  "tenant": "<tenant>"
}

The partition key for this type of event is a fake instance id equal to "00000000-0000-0000-0000-000000000000".

2. Instance re-indexing

A special type of event is triggered when the whole instance inventory has to be iterated: for each existing instance record, a separate event with type "REINDEX" is posted into the inventory.instance topic.

The event structure is as follows:

REINDEX event
{
  "type": "REINDEX",
  "tenant": "<tenant>"
}

The partition key is the instance id, as for most other event types.

Triggering events in mod-inventory-storage

For basic events, the flow of issuing events is quite similar regardless of whether it's a CREATE, UPDATE, or DELETE event. It is also similar across record types.

The following diagram shows the Instance record update sequence, which includes publishing an UPDATE event to the Instance topic in Kafka (a code sketch follows the list).

  • Event publishing begins when an instance is updated in the DB by InstanceService.
  • The service asks InstanceDomainEventPublisher to publish an UPDATE event for the instance.
  • The publisher retrieves the latest version of the instance and converts the old and new versions into an UPDATE event.
  • Once the event is ready, CommonDomainEventPublisher posts it into the dedicated topic in Kafka, with partition key = instance id.
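
A compact sketch of this flow, reusing the DomainEvent sketch above; the Instance record and the publisher interface are minimal hypothetical stand-ins, while the real classes are Vert.x-based and asynchronous.

UPDATE flow sketch
// Minimal stand-ins so the sketch is self-contained; the real types
// live in mod-inventory-storage.
record Instance(String id, String title) {}

interface CommonDomainEventPublisher {
  void publish(String partitionKey, DomainEvent<?> event);
}

class InstanceDomainEventPublisher {
  private final CommonDomainEventPublisher commonPublisher;

  InstanceDomainEventPublisher(CommonDomainEventPublisher commonPublisher) {
    this.commonPublisher = commonPublisher;
  }

  // Called by InstanceService after the instance row has been updated in the DB
  void publishUpdated(Instance oldInstance, Instance newInstance, String tenant) {
    DomainEvent<Instance> event =
        DomainEvent.updateEvent(oldInstance, newInstance, tenant);
    // partition key = instance id, so events for one instance stay ordered
    commonPublisher.publish(newInstance.id(), event);
  }
}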

The existing special case events, "Delete All" and "Re-index", are very different in their origins and issuing flows, even though both in fact apply to the whole set of records.

"Delete All" event is more close to basic events. It can be triggered via API method (InstanceStorageAPI.deleteInstanceStorageInstances(), in case of Instances) and eventually published into record type specific topic. During the flow all records of particular type are removed from DB.

As opposite, "Re-index" events are multiple events caused by a single API method call – ReindexInstanceAPI.postInstanceStorageReindex() – which is only applicable to Instance records. Re-indexing involves following steps:

  • a new re-index job is created and stored in the DB by ReindexService; the job status is "In progress" and the number of published events is 0
  • the job runner (ReindexJobRunner) starts re-indexing in a separate thread with the help of a Vert.x WorkerExecutor
  • the worker queries the ids of all instance records from the DB and fetches them as a stream (see the ReindexJobRunner.streamInstanceIds() method)
  • CommonDomainEventPublisher consumes the stream and, for each instance id:
    • creates a "Re-index" event
    • posts the event into the Instance topic in Kafka
    • increments the number of published events and saves it to the DB
  • once all instance ids are processed, ReindexJobRunner updates the job status to "Ids published" and stores the final number of records published
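
The loop at the heart of this job could look roughly as follows, reusing the publisher interface from the previous sketch; ReindexJobRepository and its updatePublishedCount() method are hypothetical stand-ins for the job persistence.

Re-index publishing loop sketch
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

interface ReindexJobRepository {
  void updatePublishedCount(String jobId, long count); // hypothetical helper
}

class ReindexPublisher {
  // Streams instance ids and publishes one REINDEX event per id, keyed by the id
  static long publishReindexEvents(Stream<UUID> instanceIds, String tenant, String jobId,
                                   CommonDomainEventPublisher publisher,
                                   ReindexJobRepository jobRepository) {
    AtomicLong published = new AtomicLong();
    instanceIds.forEach(id -> {
      publisher.publish(id.toString(), DomainEvent.reindexEvent(tenant));
      long count = published.incrementAndGet();
      if (count % 1000 == 0) {          // persist progress periodically
        jobRepository.updatePublishedCount(jobId, count);
      }
    });
    jobRepository.updatePublishedCount(jobId, published.get()); // final count
    return published.get();
  }
}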

Event processing on the example of mod-search

A couple of modules consume Inventory domain events; one of them is mod-search. The module subscribes to all Inventory topics and consumes both basic and special case events. It is interested in Instance record updates in order to propagate them to Elasticsearch, where the records are indexed to support advanced searching.

The module reacts to ongoing Instance record changes so that the indices are always up to date. But it can also create or re-create the whole index, and this is where the Instance re-indexing API of mod-inventory-storage comes in handy. The following diagram provides more details on re-indexing initiated from mod-search.

The module exposes an endpoint to initiate instance re-indexing: /search/index/inventory/reindex. When this endpoint is called (see the sketch after the list):

  • IndexController receives the request and delegates it to IndexService
  • IndexService checks whether the existing indices should be removed and, if so, sends a request to Elasticsearch to drop them. After that, it calls mod-inventory-storage to start re-indexing
  • Once the re-indexing request is received by mod-inventory-storage, it is passed to ReindexService, which controls the process and performs the necessary steps (see the previous section for more details). Briefly, the process:
    • gets all instance ids
    • transforms them into REINDEX events
    • publishes the events into the inventory.instance topic in Kafka
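
The initiation steps could be sketched as follows; the client and repository interfaces, the index naming, and the method signatures are hypothetical stand-ins, not the actual mod-search code.

Re-index initiation sketch
class IndexService {
  private final IndexRepository indexRepository;        // talks to Elasticsearch
  private final InventoryStorageClient inventoryClient; // talks to mod-inventory-storage

  IndexService(IndexRepository indexRepository, InventoryStorageClient inventoryClient) {
    this.indexRepository = indexRepository;
    this.inventoryClient = inventoryClient;
  }

  String reindexInventory(boolean recreateIndices, String tenant) {
    if (recreateIndices) {
      indexRepository.dropIndex("instance_" + tenant); // drop the existing index first
    }
    // delegate to mod-inventory-storage, which starts the REINDEX event stream
    return inventoryClient.startReindex();
  }
}

interface IndexRepository {
  void dropIndex(String indexName);
}

interface InventoryStorageClient {
  String startReindex(); // hypothetically returns a re-index job id
}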

On the other side, mod-search has a dedicated Kafka handler (KafkaMessageListener) that listens to Inventory events. When events are available, they are consumed from the topic to decide which operation has to be performed on the index. For re-indexing, the handler extracts the instance id from each event's key. Then, for the resulting list of instance ids, the handler asks IndexService to perform the required operation on the indices (a listener sketch follows the list). To do this, the IndexService.indexResourcesById() method:

  • first, if Instances have to be updated, fetches the Instances by their ids from the /inventory-view/instances endpoint (mod-inventory-storage)
  • then prepares the instances to be indexed or removed
  • calls IndexRepository, which sends the instance requests to Elasticsearch
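
Since mod-search is a Spring-based module, the handler side can be sketched with a Spring Kafka batch listener. The topic pattern and SearchIndexService are hypothetical stand-ins, and the listener container must be batch-enabled (see the configuration sketch in the next section).

Batch listener sketch
import java.util.List;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;

interface SearchIndexService {
  void indexResourcesById(List<String> instanceIds); // hypothetical stand-in
}

class InventoryEventListener {
  private final SearchIndexService indexService;

  InventoryEventListener(SearchIndexService indexService) {
    this.indexService = indexService;
  }

  // Consumes events in batches; for REINDEX events the instance id is the record key
  @KafkaListener(topicPattern = "inventory\\.(instance|item|holdings-record)")
  public void handleEvents(List<ConsumerRecord<String, String>> batch) {
    List<String> instanceIds = batch.stream()
        .map(ConsumerRecord::key)
        .distinct()
        .collect(Collectors.toList());
    indexService.indexResourcesById(instanceIds);
  }
}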

This is how the whole flow works in detail. The main points are:

  1. mod-search has a mechanism to start the re-indexing process: a dedicated API endpoint
  2. mod-inventory-storage is able to fetch all existing instances, then generate and publish a specific REINDEX event to Kafka for each instance record
  3. as a consumer of events, mod-search can read them from the topic asynchronously, in batches (the batch size is configurable via a property; see the configuration sketch below)
  4. each batch is processed independently with a retry mechanism (see FolioMessageBatchProcessor), which increases fault tolerance
  5. event handling is an internal process not associated with any regular user, so mod-search has to maintain its own system user (see SystemUserService, PrepareSystemUserService)
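
Point 3 (batch consumption with a property-driven size) could be wired up roughly like this; the property name inventory.events.batch-size and the connection settings are hypothetical.

Consumer configuration sketch
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
class KafkaConsumerConfig {

  @Bean
  ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
      @Value("${inventory.events.batch-size:50}") int batchSize) {
    Map<String, Object> props = Map.of(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092",
        ConsumerConfig.GROUP_ID_CONFIG, "mod-search",
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
        ConsumerConfig.MAX_POLL_RECORDS_CONFIG, batchSize);
    var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
    factory.setBatchListener(true); // hand whole batches to @KafkaListener methods
    return factory;
  }
}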

Inn-Reach Contribution process on top of domain events. Upcoming work

In general, the domain event model provided by mod-inventory-storage can be re-used to build the Contribution process (initial and ongoing).

Since it was decided to concentrate on the initial process first, let's formulate the nearest tasks that are more or less clear at this moment for implementing the initial process:

  • make changes to the re-indexing job in mod-inventory-storage
  • introduce a Kafka client to accept Instance events from mod-inventory-storage
  • system user management
  • an endpoint to start the initial contribution job
  • job execution status tracking and basic statistics (data model + CRUD)