Current approach to working with transactions in RMB

To enable work with a database, the FOLIO project provides a custom solution implemented on top of the Vert.x Postgres Client. The defining feature of working with RMB and Vert.x is the asynchronous approach: sequential execution of operations requires handling the completion of each operation and any errors that occur, and each subsequent operation can be executed only if the previous one has succeeded. To maintain data consistency, the operations must be executed in a transaction that can be rolled back if an error occurs. At the moment, this capability is implemented as follows:

  1. *A database connection object is created and the SQL command "BEGIN" is executed
  2. The connection object is passed as a parameter to the Postgres client's methods, so all commands are executed within a single connection
  3. All errors are handled and the Futures are completed
  4. If an error occurs, rollback must be explicitly called
  5. At the end of the transaction, the endTransaction() method must be explicitly called
  6. *After the transaction is ended, the SQL command "COMMIT" is executed

RMB's PostgresClient performs the first and the last steps (marked with *) automatically.

Example method with two operations in the scope of one transaction:

public Future<Void> example() {
  Future<Void> future = Future.future();
  PostgresClient client = PostgresClient.getInstance(vertx, tenantId);
  // start tx
  client.startTx(tx -> {
    // first operation
    client.get(tx, "upload_definition", UploadDefinition.class, new Criterion(), true, false, getHandler -> {
      if (getHandler.succeeded()) {
        // second operation: save a copy of the first found definition under a new id
        client.save(tx, "upload_definition", UUID.randomUUID().toString(), getHandler.result().getResults().get(0), saveHandler -> {
          if (saveHandler.succeeded()) {
            // commit the transaction
            client.endTx(tx, endHandler -> {
              if (endHandler.succeeded()) {
                future.complete();
              } else {
                client.rollbackTx(tx, rollbackHandler -> {
                  future.fail(endHandler.cause());
                });
              }
            });
          } else {
            client.rollbackTx(tx, rollbackHandler -> {
              future.fail(saveHandler.cause());
            });
          }
        });
      } else {
        client.rollbackTx(tx, rollbackHandler -> {
          future.fail(getHandler.cause());
        });
      }
    });
  });
  return future;
}


Locking records in a database

When developing slightly more complex business logic, a difficulty arises from the fact that certain operations may take some time and, during that time, another such request may need to be processed. Without locking the record in the database, there is a high probability of “lost changes”, where the second request overwrites the changes made by the first one. Since Vert.x is asynchronous, locks and synchronous code execution are unacceptable, and there is no Persistence Context. The most obvious solution is to lock the record in the database using the “SELECT FOR UPDATE” statement. Accordingly, a safe update of a record in the database takes the following steps (a minimal code sketch follows the list):

  1. Create a transaction object
  2. Select the record from the database for update (using “SELECT FOR UPDATE”)
  3. Perform the business logic
  4. Update the entity in the database
  5. Complete the transaction
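
Below is a minimal sketch of these five steps, not production code: it assumes the same PostgresClient field (pgClient) and schema variable that appear in the full example further down, and "my_table" plus the no-op UPDATE statement are placeholders.

public Future<Void> safeUpdateSketch(String id) {
  Future<Void> future = Future.future();
  // 1. create a transaction object
  pgClient.startTx(tx ->
    // 2. select the record FOR UPDATE: concurrent transactions that try to lock
    //    the same row will block until this transaction ends
    pgClient.execute(tx, "SELECT jsonb FROM " + schema + ".my_table WHERE _id = '" + id + "' LIMIT 1 FOR UPDATE", selectReply -> {
      if (selectReply.failed()) {
        pgClient.rollbackTx(tx, r -> future.fail(selectReply.cause()));
        return;
      }
      // 3. business logic runs here while the row stays locked
      // 4. update the entity (a no-op UPDATE stands in for the real change)
      pgClient.execute(tx, "UPDATE " + schema + ".my_table SET jsonb = jsonb WHERE _id = '" + id + "'", updateReply -> {
        if (updateReply.failed()) {
          pgClient.rollbackTx(tx, r -> future.fail(updateReply.cause()));
        } else {
          // 5. commit: the row lock is released here
          pgClient.endTx(tx, endReply -> {
            if (endReply.succeeded()) {
              future.complete();
            } else {
              future.fail(endReply.cause());
            }
          });
        }
      });
    }));
  return future;
}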

Such a scenario was needed in mod-data-import for the file upload functionality.

Steps for file upload example:

Upload Definition mutator interface for callback usage:

/**
 * Functional interface for changing an UploadDefinition within a blocking update statement
 */
@FunctionalInterface
public interface UploadDefinitionMutator {
  /**
   * @param definition - UploadDefinition loaded from the DB
   * @return - changed UploadDefinition ready to be saved into the database
   */
  Future<UploadDefinition> mutate(UploadDefinition definition);
}
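
For illustration, a caller can then pass the mutator as a lambda to the updateBlocking method shown below; the status change here is just a hypothetical example:

uploadDefinitionService.updateBlocking(uploadDefinitionId, definition -> {
  // change the loaded entity; returning a Future lets the mutator itself be asynchronous
  definition.setStatus(UploadDefinition.Status.IN_PROGRESS);
  return Future.succeededFuture(definition);
});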

Method for updating an entity with record locking:

public Future<UploadDefinition> updateBlocking(String uploadDefinitionId, UploadDefinitionMutator mutator) {
  Future<UploadDefinition> future = Future.future();
  String rollbackMessage = "Rollback transaction. Error during upload definition update. uploadDefinitionId: " + uploadDefinitionId;
  pgClient.startTx(tx -> {
    try {
      // build a query that selects the record and locks it until the transaction ends
      StringBuilder selectUploadDefinitionQuery = new StringBuilder("SELECT jsonb FROM ")
        .append(schema)
        .append(".")
        .append(UPLOAD_DEFINITION_TABLE)
        .append(" WHERE _id ='")
        .append(uploadDefinitionId).append("' LIMIT 1 FOR UPDATE;");
      pgClient.execute(tx, selectUploadDefinitionQuery.toString(), selectResult -> {
        if (selectResult.failed() || selectResult.result().getUpdated() != 1) {
          pgClient.rollbackTx(tx, r -> {
            logger.error(rollbackMessage, selectResult.cause());
            future.fail(new NotFoundException(rollbackMessage));
          });
        } else {
          Criteria idCrit = new Criteria();
          idCrit.addField(UPLOAD_DEFINITION_ID_FIELD);
          idCrit.setOperation("=");
          idCrit.setValue(uploadDefinitionId);
          // read the locked record as an entity within the same transaction
          pgClient.get(tx, UPLOAD_DEFINITION_TABLE, UploadDefinition.class, new Criterion(idCrit), false, true, uploadDefResult -> {
            if (uploadDefResult.failed()
              || uploadDefResult.result() == null
              || uploadDefResult.result().getResultInfo() == null
              || uploadDefResult.result().getResultInfo().getTotalRecords() < 1) {
              pgClient.rollbackTx(tx, r -> {
                logger.error(rollbackMessage);
                future.fail(new NotFoundException(rollbackMessage));
              });
            } else {
              try {
                UploadDefinition definition = uploadDefResult.result().getResults().get(0);
                // apply the caller's changes to the loaded entity
                mutator.mutate(definition)
                  .setHandler(onMutate -> {
                    if (onMutate.succeeded()) {
                      try {
                        // persist the mutated entity within the same transaction
                        CQLWrapper filter = new CQLWrapper(new CQL2PgJSON(UPLOAD_DEFINITION_TABLE + ".jsonb"), "id==" + definition.getId());
                        pgClient.update(tx, UPLOAD_DEFINITION_TABLE, onMutate.result(), filter, true, updateHandler -> {
                          if (updateHandler.succeeded() && updateHandler.result().getUpdated() == 1) {
                            // commit the transaction and release the row lock
                            pgClient.endTx(tx, endTx -> {
                              if (endTx.succeeded()) {
                                future.complete(definition);
                              } else {
                                logger.error(rollbackMessage);
                                future.fail("Error during updating UploadDefinition with id: " + uploadDefinitionId);
                              }
                            });
                          } else {
                            pgClient.rollbackTx(tx, r -> {
                              logger.error(rollbackMessage, updateHandler.cause());
                              future.fail(updateHandler.cause());
                            });
                          }
                        });
                      } catch (Exception e) {
                        pgClient.rollbackTx(tx, r -> {
                          logger.error(rollbackMessage, e);
                          future.fail(e);
                        });
                      }
                    } else {
                      pgClient.rollbackTx(tx, r -> {
                        logger.error(rollbackMessage, onMutate.cause());
                        future.fail(onMutate.cause());
                      });
                    }
                  });
              } catch (Exception e) {
                pgClient.rollbackTx(tx, r -> {
                  logger.error(rollbackMessage, e);
                  future.fail(e);
                });
              }
            }
          });
        }
      });
    } catch (Exception e) {
      pgClient.rollbackTx(tx, r -> {
        logger.error(rollbackMessage, e);
        future.fail(e);
      });
    }
  });
  return future;
}

A version of UploadDefinitionDaoImpl.updateBlocking rewritten using Future.compose and Future.setHandler: https://github.com/julianladisch/mod-data-import/blob/future-compose/src/main/java/org/folio/dao/UploadDefinitionDaoImpl.java#L50-L112
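
As a rough sketch of that style (hypothetical code, not the linked implementation), the nested callbacks can be flattened into a chain of compose calls; this relies on the fact that in Vert.x 3 a Future<T> implements Handler<AsyncResult<T>>, so a Future can be passed directly wherever a callback is expected:

public Future<UploadDefinition> updateBlockingComposed(String uploadDefinitionId, UploadDefinitionMutator mutator) {
  Future<UploadDefinition> result = Future.future();
  pgClient.startTx(tx -> {
    // step 1: lock the row; the Future doubles as the Handler for execute()
    Future<UpdateResult> locked = Future.future();
    pgClient.execute(tx, "SELECT jsonb FROM " + schema + "." + UPLOAD_DEFINITION_TABLE
      + " WHERE _id = '" + uploadDefinitionId + "' LIMIT 1 FOR UPDATE", locked);
    locked
      .compose(selectResult -> {
        // step 2: read the locked record as an entity within the same transaction
        Criteria idCrit = new Criteria();
        idCrit.addField(UPLOAD_DEFINITION_ID_FIELD);
        idCrit.setOperation("=");
        idCrit.setValue(uploadDefinitionId);
        Future<Results<UploadDefinition>> loaded = Future.future();
        pgClient.get(tx, UPLOAD_DEFINITION_TABLE, UploadDefinition.class, new Criterion(idCrit), false, true, loaded);
        return loaded;
      })
      // step 3: apply the caller's changes
      .compose(results -> mutator.mutate(results.getResults().get(0)))
      .compose(mutated -> {
        // step 4: persist the mutated entity, still inside the transaction
        Future<UpdateResult> updated = Future.future();
        try {
          CQLWrapper filter = new CQLWrapper(new CQL2PgJSON(UPLOAD_DEFINITION_TABLE + ".jsonb"), "id==" + mutated.getId());
          pgClient.update(tx, UPLOAD_DEFINITION_TABLE, mutated, filter, true, updated);
        } catch (Exception e) {
          updated.fail(e);
        }
        return updated.map(mutated);
      })
      .setHandler(ar -> {
        // a single place to commit or roll back instead of one rollback branch per callback
        if (ar.succeeded()) {
          pgClient.endTx(tx, end -> result.complete(ar.result()));
        } else {
          pgClient.rollbackTx(tx, r -> result.fail(ar.cause()));
        }
      });
  });
  return result;
}

All failures, wherever they occur in the chain, fall through to the single setHandler block, so the rollback logic is written once instead of in every branch.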

Method with file upload logic:

@Override
public Future<UploadDefinition> uploadFile(String fileId, String uploadDefinitionId, InputStream data, OkapiConnectionParams params) {
  return uploadDefinitionService.updateBlocking(uploadDefinitionId, uploadDefinition -> {
    Future<UploadDefinition> future = Future.future();
    Optional<FileDefinition> optionalFileDefinition = uploadDefinition.getFileDefinitions().stream().filter(fileFilter -> fileFilter.getId().equals(fileId))
      .findFirst();
    if (optionalFileDefinition.isPresent()) {
      FileDefinition fileDefinition = optionalFileDefinition.get();
      // resolve the file storage service and save the stream; the mutator's Future
      // completes only after the file has been saved
      FileStorageServiceBuilder
        .build(vertx, tenantId, params)
        .map(service -> service.saveFile(data, fileDefinition, params)
          .setHandler(onFileSave -> {
            if (onFileSave.succeeded()) {
              uploadDefinition.setFileDefinitions(replaceFile(uploadDefinition.getFileDefinitions(), onFileSave.result()));
              uploadDefinition.setStatus(uploadDefinition.getFileDefinitions().stream().allMatch(FileDefinition::getLoaded)
                ? UploadDefinition.Status.LOADED
                : UploadDefinition.Status.IN_PROGRESS);
              future.complete(uploadDefinition);
            } else {
              future.fail("Error during file save");
            }
          }));
    } else {
      future.fail("FileDefinition not found. FileDefinition ID: " + fileId);
    }
    return future;
  });
}

Problems of the current approach

Using the asynchronous approach when working with a database has a number of limitations and poses difficulties when it comes to writing sequential or transactional business logic, such as the following:

Possible solutions