Context

Some FOLIO architectural proposals (in particular, FOLIO Cross-Application Data Sync Solution, Acquisition Event Log) are based on the Domain Event approach, in which, for any actions with data in one part of the platform, a corresponding event is generated and published. In general, this approach has been approved by the FOLIO Tech Council.

However, one of the challenges of this Domain Event approach is to ensure the atomicity of two actions that are different in nature - changing an object in the database and publishing an event about this in the notification channel.

To solve the described challenge, the Transactional Outbox approach is proposed. With this pattern, the module providing a domain event will obtain a guarantee that the domain event has been recorded in the event queue before committing its transaction. This will ensure data integrity and properly transfers the responsibility for data integrity to the event queue.

Additional links:

This document is an attempt to describe the vision for a concrete implementation of this approach on FOLIO using the Acquisition Event Log as an example.

Implementation Detail

  • Add to mod-orders-storage the Timer Interface https://github.com/folio-org/okapi/blob/master/doc/guide.md#timer-interface. This is necessary so that OkApi calls the necessary logic in mod-orders-storage with a given frequency. Usage examples are in mod-users, mod-licenses, mod-data-export, mod-circulation-storage
  • Implement a listener code in mod-orders-storage for such a Timer API call from OkApi
  • Create a internal_lock table with one row to manage concurrent threads through the mechanism of locks
  • When a call comes from OkApi, "do whatever need to do to process the outbox table" - one needs to execute select for update query on internal_lock. If failed, do nothing. If succeeded, continue with the main logic of reading events from the outbox_event_log table and sending them to Kafka. Repeat until outbox_event_log table has no more events. Upon completion - make a commit / rollback on internal_lock to release a lock.

Below is the sequencing this steps in a form of diagram.

 Plantuml for the diagram above...

@startuml Implementing the Transactional Outbox - Timer API driven

title Implementing the Transactional Outbox pattern - Timer API driven

skinparam sequence {
    ParticipantBorderColor Black
    ParticipantBackgroundColor White
    ParticipantFontColor Black
}

participant "OkApi\n" as OkApi
participant "mod-orders-storage\nMessage Relay logic" as MOS
participant "Database\ntable for locks" as OTB
participant "Database\noutbox table" as OT
participant "Kafka\n" as Kafka

OkApi -> MOS ++ : call Timer API
MOS --> OkApi : 200 OK

group do whatever need to do to process the outbox table
MOS -> OTB ++ : try to acquire lock via select for update
alt
    OTB --x MOS : lock failed
end

return lock succeeded

loop while there are new events
    MOS -> OT ++ : query table
    return N new events
    MOS -> MOS : transform events to messages
    MOS -> Kafka ++ : send messages
    alt
        Kafka --x MOS : failed
        MOS -> MOS : exit loop
    end
    return succeeded
    MOS -> OT ++ : remove sent events from table
    return succeeded
end

MOS -> OTB ++ : release lock
return succeeded

end

@enduml

The above sequence implements the required transactional outbox functionality, but it has some disadvantages. In particular,

  • it requires a regular call to the _timer API from OkApi (moreover, the faster the information must pass through the transactional outbox and get into Kafka, the more often there should be API calls, which means an increase in network load at the API level,
  • when processing each call, the module performs at least 3 calls to the database (to create a lock, poll the outbox table and release the lock) which again means an increase in network load and on the database.

To mitigate the described shortcomings, the following approach is proposed: upon successful insertion of a new event into the outbox_table, perform an asynchronous call to the "do whatever need to do to process the outbox table" logic.

 Plantuml for the diagram above...

@startuml Implementing the Transactional Outbox pattern - Async call driven

title Implementing the Transactional Outbox pattern - Async call driven

skinparam sequence {
    ParticipantBorderColor Black
    ParticipantBackgroundColor White
    ParticipantFontColor Black
}

participant "mod-orders-storage\nmain logic" as mos
participant "Database\norders and outbox tables" as dbo
participant "mod-orders-storage\nMessage Relay logic" as mr

-> mos: make a change
group RDBMS transaction
    mos -> dbo : start trx
    activate dbo
    mos -> dbo : make a change\nin orders data
    mos -> dbo : save an event to Outbox table
    mos -> dbo: commit trx
    deactivate dbo
    alt error while saving an event
        mos -> dbo : rollback trx
    end
end
mos -> mr: processOutbox (async call)
<- mos: done
mr -> mr: "do whatever need to do\nto process the outbox table"

@enduml

Key points to implement transactional outbox pattern

1. Create a new table to save outbox object

CREATE TABLE IF NOT EXISTS outbox_event_log (
  event_id uuid NOT NULL PRIMARY KEY,
  event_date timestamp NOT NULL,
  entity_type text NOT NULL,
  action text NOT NULL,
  payload jsonb
);

2. Create DAO for CRUD operation with outbox_event_log table - https://github.com/folio-org/mod-orders-storage/blob/master/src/main/java/org/folio/dao/audit/AuditOutboxEventsLogRepository.java

3. Create a new table internal_lock to be used as lock. This is necessary to avoid (minimize) the possibility of sending duplicates to Kafka and to preserve the original order of events.

CREATE TABLE IF NOT EXISTS internal_lock (
  lock_name text NOT NULL PRIMARY KEY
);
INSERT INTO internal_lock(lock_name) VALUES ('audit_outbox') ON CONFLICT DO NOTHING;

4. Create a repository to work with the internal_lock table. This table uses SELECT FOR UPDATE construction for locking - https://github.com/folio-org/mod-orders-storage/blob/master/src/main/java/org/folio/dao/InternalLockRepository.java

5. Leverage folio-kafka-wrapper and create Kafka producer for sending audit events - https://github.com/folio-org/mod-orders-storage/blob/master/src/main/java/org/folio/event/service/AuditEventProducer.java

6. Create AuditOutboxService to fetch data from outbox_event_log and send events to Kafka - https://github.com/folio-org/mod-orders-storage/blob/master/src/main/java/org/folio/event/service/AuditOutboxService.java#L56. This implementation does the following:

  • starts new transaction,
  • acquires DB lock using internal_lock table,
  • grabs outbox objects,
  • constructs Kafka events from these objects and sends them to Kafka,
  • deletes processed outbox events in batch by event ids,
  • commits transaction,
  • releases DB lock.

7. Use RMB .withTrans(..) implementation that automatically commits or rollbacks transaction if future fails - https://github.com/folio-org/raml-module-builder/blob/b35.0/domain-models-runtime/src/main/java/org/folio/rest/persist/PostgresClient.java#L3754

All methods that do creating or editing of entities should add this step to save outbox event in the same DB transaction. This flow does the following:

  • starts new transaction,
  • creates or edits business entity,
  • saves audit outbox event with full entity snapshot,
  • commits transaction,
  • subscribes to .onCompleted of .withTrans(..) and after transaction finishes, invokes processing of audit outbox logs, see #6 step.

Examples of usages:
https://github.com/folio-org/mod-orders-storage/blob/master/src/main/java/org/folio/rest/impl/PurchaseOrdersAPI.java#L80 
https://github.com/folio-org/mod-orders-storage/blob/master/src/main/java/org/folio/services/lines/PoLinesService.java#L107

8. Add a timer interface that will be called every 30 minutes and handle existing outbox audit events. Adding the timer interface is needed to be safe during failures, such as when a transaction is committed but the module was destroyed during the call to .onComplete(..) and there is no other events from the user to receive all remaining audit events after the module is restarted. In this case, the timer call will process the remaining events that have not yet been handled. This timer interface calls the AuditOutboxService, see step #6.

https://github.com/folio-org/mod-orders-storage/blob/master/descriptors/ModuleDescriptor-template.json#L544
https://github.com/folio-org/mod-orders-storage/blob/master/src/main/java/org/folio/rest/impl/AuditOutboxAPI.java

Recommended Tests

How does Postgres handle a sudden loss of connection between an application and a database when using the select for update construct?

Assumption: if the application executed a select for update query and locked some row in the table, then in case of incorrect closing of the connection between this application and the database (i.e. without an explicit commit or rollback), the imposed lock will be released by the database engine itself.

Test: connect to a Postgres database in a test application, execute select for update on the internal_lock table to block it, break the connection without an explicit commit / rollback (for example, stop the instance, or terminate the execution of the thread, or break the connection using some kind of test proxy) and check if the row in internal_lock remains blocked or not, can another test application execute select for update on the same row?

What is the expected performance of the proposed implementation of the Transactional Outbox pattern?

Assumption: performance, measured by the number of events that can be transferred from the table to Kafka per unit of time, is acceptable.

Important: in fact, the acceptance criterion is quite vague. Are 10 events per second acceptable? 100? 1000 or more? Probably the expected performance depends on the usage scenario. Therefore, the task of the test is to measure the performance of the proposed implementation of the Transactional Outbox pattern and formulate it as a baseline metric.

Test: Prepare N events in outbox_event_log (for example, 100 thousand). Enable  the main logic of reading events from the outbox_event_log and sending them to Kafka. Measure how much time it will take to process all 100K events.