Skip to main content

Data access APIs

Overview

Genesis data access APIs provide access to the data in a Genesis application. They are used by most of the server capabilities to enable an application to write, read and subscribe to data updates. You can use these APIs when you configure server capabilities, such as the Event Handler, which is used for core business logic. You can also inject the APIs into any application for use in custom modules and processes.

There are several versions of EntityDb API, which enable you to write to and query the data in your database. With the exception of RxDb, they are powered by type-safe entities for tables. These are DAOs generated at build time, based on the data model you have defined.

The versions of the entity EntityDb are:

APILanguagesDescription
SyncEntityDbKotlin + JavaA blocking API, simplest to use and helpful when performance isn't critical
AsyncEntityDbKotlin onlyAn asynchronous API powered by Kotlin coroutines
RxEntityDbJava onlyAn asynchronous API powered by RxJava
RxDbKotlin + JavaAn asynchronous API powered by RxJava, which does NOT use type-safe entities

The difference between blocking (synchronous) and asynchronous is that the code needs to wait on results to occur before moving on.

  • In blocking code, the thread will block until the database responds. This code is easier to write and think about logically.
  • In async code, the thread will not block: instead, it resumes after the database responds. This is more difficult to write and comprehend, but it can handle more requests concurrently.

All the APIs implement the same methods for reading and writing, but their return types differ and need handling differently in code.

Access control

User access control is not implemented in the API, so the database APIs are not aware of the user accessing them. User-facing capabilities such as query(Data Server), requestReply (Request Server) and eventHandler (Event Handler) all apply access control measures over their DB API interactions. When writing custom code that is writing or reading data for given users, you should apply the same before return read results, or attempting any writes.

Data types

Data types are essential when using the EntityDb APIs. The objects and classes encapsulate the data within your Genesis application and are generated at DAO build time.

Table entities

Table entities are type-safe classes generated by Genesis, which match the applications's tables. The generated entity name is based on the table name, but will be camel-case.

For example, TABLE_NAME becomes TableName.

These entities are available for use in all Entity APIs (except RxDb).

All table entities come with builders to help construct these objects.

  • In Kotlin, the builder works as a lambda in which the field values are set, and the object is built after the lambda call is completed.
  • In Java, the builder is a fluent interface, where fields are set and the object is built in a final build call.

Just before the object is built, the object is validated to make sure all the required fields are set.

The following example shows how the table Trade from the example data model is built:

val trade = Trade {
tradeId = 1
tradePrice = 2.1
direction = Direction.BUY
quantity = 500
tradeDate = DateTime.now()
}

When a table is audited, such as the example TRADE table above, the entity can be easily converted to its audited counterpart by calling the toAuditEntity function.

val tradeAudit = trade.toAuditEntity(
auditEventType = "trade modify"
auditEventDatetime = DateTime.now(),
auditEventText = "trade was modified in event",
auditEventUser = user,
)

View entities

View entities are type-safe classes generated by Genesis to match the viewsdefined in your application's data model. The generated entity name is based on the view name, but will be camel-case.

For example, VIEW_NAME becomes TableName.

These entities are available for use in all Entity APIs (except RxDb).

There are two types of view entity:

  • SingleCardinalityViewEntity : all joins are one-to-one
  • MultiCardinalityViewEntity : contains one or more joins, which are one-to-many

Index entities

Index entities are nested in table entities and view entities. The name will be based on the index name defined. The entity can be constructed by passing in the field values in order. The first field of the index must always be provided, and the others are optional.

byPrimaryKey() will return an entity for the primary key. Additionally, for each index, there will be a by...() call where ... is the index name.

A unique index entity will only be created when all fields of a unique index are supplied. In all other cases, a non-unique index entity will be created.

View entities inherit the indices of the root table of the view, so with reference to our example data model, the view will also have an index entity for the trade's primary key and indices:

  • ById
  • ByCounterpartyId
  • ByInstrumentId
  • ByDate

Database entities

DbEntity is the common interface implemented by table entities and view entities.

Database records

DbRecord is the most raw representation of a table. It enables you to build a record of a specified table using strings for table, field and index names. It is not type-safe, and is not a recommended method.

The objects cannot be used with entity Db APIs.

Entity Db API

All three entity DB APIs (SyncEntityDb, AsyncEntityDb, RxEntityDb) use the same types and implement the same API methods, with the exception of their return types.

Type convention

TypeMeaningExample
EA table or view entityTrade
TA table entityTrade
VA view entityTradeView
EntityIndex<E>An index of ETrade.ById
UniqueEntityIndex<E>A unique index of ETrade.ById
NonUniqueEntityIndex<E>A non unique index of ETrade.ByDate
EntityIndexReference<E>An index reference of ETrade.ById
UniqueEntityIndexReference<E>A unique index reference of ETrade.ById
NonUniqueEntityIndexReference<E>A non unique index reference of ETrade.ByDate
F<E>The full table /view name for ETRADE
Class<E>The class reference for ETrade.class
KClass<E>The Kotlin class reference for ETrade::class

Read operations

get

Get is a simple look-up on the database; it returns a single entity if a match is found, or no records if no match is found.

The following overloads exist for get; fields is a Set<String>.

  • get(E, EntityIndexReference<E>, fields) : E?
  • get(E, fields) : E?
  • get(E, EntityIndexReference<E>) : E?
  • get(UniqueEntityIndex<E>, fields) : E?
  • get(UniqueEntityIndex<E>) : E?
Syntax
// we can look up trades by passing in a unique index class:
val trade = db.get(Trade.byId("TRADE_1"))

// a trade object with the primary key set
val trade = db.get(trade)

// a trade object and a reference to unique index
val trade = db.get(trade, Trade.ByTypeId)

// or you can access the index class from the entity
val trade = db.get(trade.byTypeId())

getAll

This takes multiple unique index class instances and returns the entity type for the record. It takes a List<Pair<String, UniqueEntityIndex<E>>>, where the String is a unique reference to each request.

Overloads
  • getAll(requestDetails: Flow<Pair<String, UI<E>>): Map<String, E?>
  • getAll(requestDetails: List<Pair<String, UI<E>>): Map<String, E?>
val map = db.getAll(listOf("A" to Trade.byId("TRADE_A"), "B" to Trade.byId("TRADE_B")))

val recordA = map["A"]
val recordB = map["B"]

getAllAsList

This operation is similar to getAll, but takes a List<UniqueEntityIndex<E>>, and returns a List<E?>.

The results are returned in the order they were requested and will be null if no record was found. The result list is guaranteed to be the same count as the input.

Overloads
  • getAllAsList(Flow<UI<E>>): List<E?>
  • getAllAsList(List<UI<E>>): List<E?>
  • getAllAsList(vararg UI<E>): List<E?>
val list = db.getAllAsList(Trade.byId("TRADE_A"), Trade.byId("TRADE_B"))

val recordA = list[0]
val recordB = list[1]

getBulk

This operation creates a Flowable of the whole table.

The records will be sorted in ascending order by the index provided. If no index is provided, they will be sorted by the primary key.

There is also the getBulkFromEnd function, which returns records in descending order.

There are also a number of continuation operations, which return the whole table after the provided record. These methods are deprecated and should not be used going forwards.

Overloads
  • getBulk<E>(): Flow<E> (Kotlin only)
  • getBulk([Class<E> / KClass<E>]): Flow<E>
  • getBulk(UR<E>): Flow<E>
  • getBulk(UR<E>, fields): Flow<E>
  • getBulk(UR<E>, E, fields): Flow<E> (continuation) (Deprecated)
  • getBulkFromEnd(UR<E>): Flow<E>
  • getBulkFromEnd(UR<E>, E), E: Flow<E> (continuation) (Deprecated)
  • getBulkFromEnd(UR<E>, E, fields), E: Flow<E> (continuation) (Deprecated)
  • getBulkFromEnd(UR<E>, fields): Flow<E>
Syntax
// we can pass in Trade as a type parameter
val flow = db.getBulk<Trade>()
// we can pass in the TRADE object
val flow = db.getBulk(TRADE)
// or we can pass in an index reference
val flow = db.getBulk(Trade.ByTypeId)

getRange

getRange is an operation that can return multiple records from the database.

There are two types of getRange operation:

  • single-value range, e.g. trades with CURRENCY set to USD
  • interval range, e.g. all trades with TRADE_DATE between 01-Jul-2024 and 01-Aug-2024.

Unlike the get operation, the getRange operation can always return multiple records.

We pass index entities into getRange. When using an interval range, both index entities should be of the same type. in the example below, this is Trade.byDate.

// single value
val range = db.getRange(Trade.byCurrency("USD"))

// interval
val range = db.getRange(
Trade.byDate(myStartDate),
Trade.ByDate(myEndDate)
)

Table entities provide methods for building index entities:

// single value
val range = getRange(myUsdTrade.byCurrency())

// interval
val range = db.getRange(myStartTrade.byDate(), myEndTrade.byDate())
numKeyFields property

This enables users to query on part of the index. When using the getRange operation, you can tell the database how many fields in the index to use.

By default, all fields are used, but you can override this.

note

Order matters here; you can get a range on just field 1, on field 1 and field 2, but not on field 2 and field 1.

For example, if on the ORDER table, you have an index on COUNTERPARTY and INSTRUMENT_ID, you could:

  • perform a single-value getRange operation on COUNTERPARTY. For this, set numKeyFields to 1.
  • perform a single-value getRange operation on both COUNTERPARTY and INSTRUMENT. For this, set numKeyFields to 2.

More examples:

// we could look up all rows for both COUNTERPARTY and INSTRUMENT:
val range = db.getRange(myOrder.byCounterpartyAndInstrument())
// which would be equivalent to
val range = db.getRange(myOrder.byCounterpartyAndInstrument(), 2)
// and also
val range = db.getRange(Order.byCounterpartyAndInstrument("CPTY_A", "VOD"))

// or we could look pu by just COUNTERPARTY
val range = db.getRange(myOrder.byCounterpartyAndInstrument(), 1)
// which would be equivalent to
val range = db.getRange(Order.byCounterpartyAndInstrument("CPTY_A"))
info

numKeyFields is required in the following instances:

  • when passing in a table entity and an index reference
  • when passing in an index entity that refers to a unique index
Fields

When using a getRange operation, you can provide a Set<String> with the field names you want to select. This will only be used as a hint; non-nullable fields will always be returned from the database.

val range = getRange(
myUsdTrade.byCurrency(),
fields = setOf("PRICE", "QUANTITY")
)

Reversed range look-ups

In addition to getRange there is also the getRangeFromEnd method.

This works in the same way as getRange. However, rows are returned in the reverse order.

Parameters when using index entities
NameTypeRequiredMeaningDefault value
IndexEntityIndex<E>✔️, For single value rangesThe index entry to read
Start indexEntityIndex<E>✔️, For interval rangesThe index entry to read from
End indexEntityIndex<E>✔️, For interval rangesThe index entry to read to
numKeyFieldsIntThe number of key fields to take into accountAll fields in the index
fieldsSet<String>The names of the fields to returnreturn all fields

Write operations

Concepts and helper classes

The entity database will return type-safe results for write operations. There are four main write results, one for each type of write operation. Each of these operations is of type EntityWriteResult.

InsertResult has a single property record which is the inserted record. This includes any generated values.
DeleteResult has a single property record which is the record as it was in the database before it was deleted.
ModifyResult is slightly more complex. It has three properties:

  • record property, which is the record in the database after the modify operation
  • previous property, which is the record as it was before the modify operation
  • modifiedFields property, which holds a Set<String> of the fields that were changed in the modify

UpsertResult can be either an InsertResult or a ModifyResult.

All write operations have versions that take a single entity and versions that take multiple entities. Different entity types can be inserted in the same operation; however, the return type will be List<InsertResult<Entity>>. Also, modify operations only accept table entities.

Another important helper class is ModifyDetails. It holds details of data modification resulting from a modify or upsert. When performing modify and upsert operations with EntityDb, you must specify the index of the record you wish to perform the operation on. The EntityModifyDetails class enables you to identify the record and the fields to modify.

When writing a record to the database, typically all non-null properties should be set on the entity. An entity property becomes non-nullable if:

  • it has a default value
  • it is generated by the database, i.e. sequence or auto increment fields
  • the column is included in an index or is specifically declared not null in the schema
Default values

Properties with a default value will have the value set by default, unless set explicitly in the builder.

Generated properties

Generated properties are left in an indeterminate state if not set in the builder. When writing to the database, this indeterminate state is set in the return value. Trying to read the property while it is in this state results in an IllegalArgumentException.

Each generated property has only two read-only associated properties to access these properties in a safe manner.

  • is[FieldName]Generated boolean property
  • [fieldName]OrNull property

For example, where TRADE_ID field on TRADE table is generated:

trade.tradeId                   // will cause an exception if not initialized
trade.tradeIdOrNull // will return the tradeId if set, or else null
trade.isTradeIdInitialised // will return true if set
Columns in indices or are not null explicitly

Columns in indices or declared not null should always be set in a builder, unless it has a default value or is a generated column. In all other instances, a NullPointerException will be thrown when building the object.

insert

This inserts a new record into the database. The insert function takes a single table entity. The insertAll function takes multiple records, and has several overloads:

Overloads
  • insert(E): InsertResult<E>
  • insertAll(vararg E): List<InsertResult<E>>
  • insertAll(List<E>): List<InsertResult<E>>
  • insertAll(Flow<E>): List<InsertResult<E>>

modify

This tries to modify a record in the database. If the record does not exist, an error is thrown.

Overloads
  • modify(EntityModifyDetails<E>): ModifyResult<E>
  • modify(E): ModifyResult<E>
  • modify(E, UniqueEntityIndexReference<E>): ModifyResult<E>
  • modifyAll(vararg E): List<ModifyResult<E>>
  • modifyAll(vararg EntityModifyDetails<E>): List<ModifyResult<E>>
  • modifyAll(List<EntityModifyDetails<E>>): List<ModifyResult<E>>
  • modifyAll(Flow<EntityModifyDetails<E>>): List<ModifyResult<E>>

upsert

This tries to modify a record in the database. If the record does not exist, the record is inserted instead.

Overloads
  • upsert(EntityModifyDetails<E>): UpsertResult<E>
  • upsert(E): UpsertResult<E>
  • upsertAll(vararg E): List<UpsertResult<E>>
  • upsertAll(vararg EntityModifyDetails<E>): List<UpsertResult<E>>
  • upsertAll(List<EntityModifyDetails<E>>): List<UpsertResult<E>>
  • upsertAll(Flow<EntityModifyDetails<E>>): List<UpsertResult<E>>

delete

This tries to delete a record.

Overloads
  • delete(E): DeleteResult<E>
  • delete(UniqueEntityIndex<E>): DeleteResult<E>
  • deleteAll(vararg E): List<DeleteResult<E>>
  • deleteAll(vararg UniqueEntityIndex<E>): List<DeleteResult<E>>
  • deleteAll(List<E>): List<DeleteResult<E>>
  • deleteAll(Flow<E>): List<DeleteResult<E>>

update

The update operation is a condensed syntax for modifying data in the database. It works by providing a scope and a transformation. The scope could be one of:

  • updateBy - a unique index
  • updateRangeBy - an index range
  • updateAll - a whole table

The transformation is a lambda, where the rows that are in scope are provided one by one. The rows are provided as in the database, and can be modified in place, with the changes applied to the database. All update calls use the safeWriteTransaction and are transactional if the database supports it.

  • In the async entity db, the lambda will be of type E.() -> Unit. The entity will be the receiver and in the lambda, this will refer to the row, which should be modified in place.

  • In the rx entity db, the lambda will be of type Consumer<E>. The lambda will have a single parameter, the entity. Similar to the async version, the row should be modified in place.

In both cases, the full record will be provided, and values can be read as well as updated. The operations return List<ModifyResult<E>> for updateAll and updateRangeBy methods and ModifyResult<E> for the updateBy operation.

For example:

db.updateBy(Trade.byId("xxxxx")) {
price = 15.0
}

db.updateByRange(Trade.byOrderId("xxxx")) {
orderStatus = OrderStatus.CANCELLED
}

db.updateByRange(Trade.byOrderId("xxxx"), Trade.byOrderId("yyyy") {
orderStatus = OrderStatus.CANCELLED
}

db.updateAll<Trade> {
orderStatus = OrderStatus.CANCELLED
}

recover

The recover operation enables you to insert a document into the database using the record's preset timestamp and ID.

The following recover operations are supported:

  • recover
  • recoverAll
danger

This API must be used with caution. Integrity checks are skipped, so it can leave your Genesis application in a poor state if used incorrectly. Record IDs and timestamps are assumed to be unique.

Transactions

If the underlying database supports transactions, then the entity db provides type-safe access to these. A read transaction will support the same read operations as the entity db, and a write transaction will support the same read and write operations. If a write transaction fails, all operations will be reverted. Subscribe operations are not supported within transactions.

Transactions are supported on database layers: FoundationDb, MS SQL, Oracle and Postgresql.

When code is expected to run on multiple database types, transactions should be used when available. You can use safeReadTransaction and safeWriteTransaction. These run operations in the block in a single transaction, if supported.

There is a distinction between using Kotlin and Java here.

  • When using Kotlin, the transaction is the receiver in the readTransaction call. This means that within the block, this refers to the transaction.
  • When using Java, the transaction is the first parameter of the lambda.

Read transactions

Read transactions ensure that all read operations are consistent. Intervening writes will not affect reads within the transaction. The return value in the transaction will also be returned from the transaction. For the RxEntityDb, it will be a Single<*>.

val orderTrade = db.readTransaction {
val trade = get(Trade.byId("TR_123"))
val order = get(Order.byId(trade.orderId))
buildOrderTrade(order, trade)
}

Write transactions

Write transactions ensure all read and write operations are consistent. If any exception reaches the transaction level, all writes are rolled back. The writeTransaction will return a Pair<T, List<EntityWriteResult<*>>>, where T is the value returned in the writeTransaction lambda.

val (orderId, writeResults) = db.writeTransaction {
insert(trade)
val orderInsert = insert(order)
orderInsert.record.orderId
}

Subscribe operations

Subscribe operations create a subscription to the given table or view. They will be invoked whenever updates occur to the entity they are listening to.

Concepts and helper classes

When database updates are distributed, they are wrapped in helper classes:

Generic record update

Generic records provide a generic way of publishing updates by the database. All updates have the same super type, GenericRecordUpdate, which is a sealed Kotlin class. This means that all instances are guaranteed to be one of the implementation types:

  • GenericRecordUpdate.Insert
  • GenericRecordUpdate.Delete
  • GenericRecordUpdate.Modify

All these types have the following properties:

  • tableNameString
  • recordIdLong
  • timestampLong
  • emitterString?

Additionally, GenericRecordUpdate.Insert and GenericRecordUpdate.Delete also have a record field. Whereas GenericRecordUpdate.Modify has an oldRecord and a newRecord field.

Bulk

Bulk objects are published to listeners of mixed read/subscribe operations. Like Record UpdateBulk is a sealed Kotlin class. It has the following class hierarchy:

  1. Bulk
    1. Bulk.Prime
      1. Bulk.Prime.Record
      2. Bulk.Prime.Complete
    2. Bulk.Update
      1. Bulk.Update.Insert
      2. Bulk.Update.Delete
      3. Bulk.Update.Modify

A bulk flow always follows this sequence:

  1. 0 or more Bulk.Prime.Record
  2. Bulk.Prime.Complete
  3. 0 or more Bulk.Update subtypes

subscribe

The subscribe operation starts a database listener that receives updates to tables or views.

Subscribe parameters

Subscribe supports the following parameters:

NameTypeRequiredMeaningDefault value
Table nameString✔️The table to subscribe ton/a
fieldsSet<String>Only listen to changes on selected fieldslisten to all fields
delayIntGroup and publish updates every x msno grouping
subscribeLocallyBooleanWhen in a cluster, only listen to local updatesfalse
listenerNameStringWhen in a cluster, only listen to local updatesgenerated name
Overloads

The rx entity db takes a Class<E>, whereas the async entity db takes a KClass<E>. Parameters marked with an asterisk(*) are optional.

  1. subscribe([KClass<E> / Class<E>], delay*, fields*, subscribeLocally*): Flow<E>

These functions are available in kotlin only:

  1. subscribe<E>(delay*, fields*, subscribeLocally*): Flow<E>
val subscription = launch {
db.subscribe<Trade>()
.collect { update ->
println("Received a trade update! $update")
}
}

bulkSubscribe

The bulkSubscribe operation combines a getBulk and a subscribe call into a single operation. This is useful when a class needs to read a full table and then receive updates of changes to the underlying table or view.

This operation supports backwards joins for views. This means that it can receive updates on both the root table and the joined tables. The view needs to support this in the definition, and it must be enabled on the bulkSubscribe call.

Example
val bulk = db.bulkSubscribe<Trade>()
Parameters

bulkSubscribe supports the following parameters:

NameTypeRequiredMeaningDefault value
TableClass<E>✔️The table to subscribe to
IndexUniqueEntityIndexReference<E>The index to sort the getBulk operation by
fieldsSet<String>Only listen to changes on selected fields (filters ModifyResult on fields)listen to all fields
delayIntBatch updates x msno batching
subscribeLocallyBooleanWhen in a cluster, only listen to local updatesfalse
backwardJoinsBooleanSubscribe to changes on sub-tables (backwards joins)false
listenerNameStringWhen in a cluster, only listen to local updatesgenerated name
Java support

There are many optional parameters for bulk subscribe operations. In Kotlin, this works well in a single method, due to named parameters and default values.

In Java, however, we provide a fluent API to set optional parameters.

The function bulkSubscribeBuilder returns a builder.

Optional parameters can be set with a "with-function", e.g. withDelay(...).

Once all values have been set, the toFlowable() function transforms the builder into a Flowable.

More complex example
val bulk = db.bulkSubscribe<Trade>(
fields = setOf("TRADE_DATE"),
delay = 500,
subscribeLocally = false,
index = Trade.ByTypeId,
backwardJoins = true,
)

rangeSubscribe

rangeSubscribe is similar to bulkSubscribe. The rangeSubscribe operation combines a getRange and a subscribe call into a single operation. This is useful when a class needs to read a range of values from table and then receive updates of changes to the relevant records in the underlying table or view.

This operation supports backwards joins for views. This means that it can receive updates on both the root table and the joined tables. The view needs to support this in the definition, and it must be enabled on the bulkSubscribe call.

Different range options

Like getRange, rangeSubscribe supports single-value and interval ranges. In addition, range subscribe supports both static and dynamic ranges. See below for details.

Note on Java

Like bulkSubscribe, there are many optional parameters for range subscribe operations. In Kotlin, this works well in a single method, due to named parameters and default values.

In Java however, we provide a fluent API to set optional parameters.

The functions staticRangeSubscribeBuilder and dynamicRangeSubscribeBuilder return a builder.

Optional parameters can be set with a "with-function", e.g. withDelay(...).

Once all values have been set, the toFlowable() function transforms the builder into a Flowable.

Static ranges

By default, ranges are static; they are set at the start of the subscription and not updated afterwards.

// single-value range
val range = db.rangeSubscribe(Trade.byCurrencyId("USD))

// interval range
val range = db.rangeSubscribe(
Trade.byDate(startDate),
Trade.byDate(endDate)
)

Dynamic ranges

Additionally, rangeSubscribe also supports dynamic ranges. These are ranges that are refreshed either at a specified time, or interval.

This can be useful for long-running subscriptions on dates. For example, you might want to keep an eye on trades booked in the last hour. Unless that range is refreshed, it will fall behind.

In the example below, we have two dynamic ranges.

  • The first one filters on a specific date, and is updated at midnight.
  • The second one has a 2-hour window, and is updated every 5 minutes.
// single-value range
db.rangeSubscribe(
fromProvider = { Trade.byDate(DateTime.now().withTimeAtStartOfDay()) },
updateFrequency = PalDuration.At(LocalTime.MIDNIGHT)
)
// interval range
db.rangeSubscribe(
fromProvider = { Trade.byDate(DateTime.now().minusHours(1)) },
toProvider = { Trade.byDate(DateTime.now().plusHours(1)) },
updateFrequency = PalDuration.Every(Duration.ofMinutes(5))
)
Parameters

For static ranges rangeSubscribe supports the parameters below.

In the Java API, the function returns a builder, where optional parameters can be set.

NameTypeRequiredMeaningDefault value
Start indexEntityIndex<E>✔️The index entry to read from
End indexEntityIndex<E>For interval rangesThe index entry to read to
numKeyFieldsInt❌️The number of key fields to take into account for the range
updateFrequencyPalDurationFor dynamic rangesA schedule to update dynamic range boundaries?
delayIntGroup and publish updates every x ms200ms
fieldsSet<String>Only listen to changes on selected fields (filters ModifyResult on fields)listen to all fields
subscribeLocallyBooleanWhen in a cluster, only listen to local updatesfalse
backwardJoinsBooleanSubscribe to changes on sub-tables (backwards joins)false
listenerNameStringWhen in a cluster, only listen to local updatesgenerated name
Example
val range = asyncEntityDb.rangeSubscribe(
from = Trade.byDate(startDate),
to = Trade.byDate(endDate),
numKeyFields = 1,
delay = 500,
fields = emptySet(),
subscribeLocally = true,
backwardJoins = true
)

Subscribing to views

There are some subtleties when it comes to subscriptions on views.

By default, when subscribing to updates on a view, only updates on the root table are sent. If you want updates on the joined table or tables, you must [enable a backwardsJoin on each join.

Inner joins can produce different results. For example, a modification to a table could become a delete or insert on the view. There are details of inner joins in our page on views.

Overall, the better you understand Data Servers, the better your chances of implementing Entity Db without errors. It is well worth looking through the entire section in our documentation.

Update batching

For subscribe operations, by default, Genesis does not publish every update received. Updates are batched and compressed before publishing. This is designed to reduce the load on high-volume systems.

Batching is controlled by the delay parameter in the subscribe method. This is used to specify the batching interval for updates.

For example, if a trade is inserted, and then modified rapidly in quick succession within the same batching period, there is no requirement for publishing any intermediate states. It is easier and more efficient for the system to publish a single update, so the insert and modify are compressed into a single insert update.

Batching phases

Batching happens in two phases:

  1. The updates are gathered together.
  2. At the end of the batching period, they are compressed and the resulting updates are published.

So what does this mean in practice?

The batching period covers the whole subscription. It is always set globally, i.e. for an update queue subscription or on a data server query. It is never set at the individual row level.

The goal of this batching is to minimize work, and to publish updates that are accurate, rather than to publish every change.

In this regard it is better to think in terms of a batching interval, rather than a batching period, which looks something like this:

...
*
|---- gathering phase ----| (batching period)
*
*
*
*
*
|---- flushing phase -----| (commence compress and publish)
* compress updates
* publish updates
|---- gathering phase ----| (next batching period)
*
*
...
Batching is non-deterministic

Batching occurs within discrete batching and only within each period.

  • If a trade is inserted and then modified within a single batching period, the two will be compressed into a single update.
  • If the trade is inserted one millisecond before the end of a batching period, it is modified right at the start of the next period; the insert is sent at the end of the first period and the update is sent at the end of the next period - there are two updates.

The point here is that there is no way of determining whether a series of updates and modifies will be compressed or not. It is entirely determined by the timing of the individual inserts and modifies. All we can determine is that the smaller the gap between the insert and modify, the more likely they are to be compressed.

Batching at different levels

Note that batching happens at both the Data Server and the update-queue level.

  • Genesis batches on the queue level to minimize the amount of work the Data Server has to do. Because we pre-allocate LMDB when the data sever starts, having too many needless updates would eat into that allocation.
  • Batching is performed on the Data Server to minimize the load on the client and the connection.

Out-of-sequence updates on the queue

Updates are published on the update queue once a record has been written to the database, or when a transaction completes.

Within a high-throughput set-up, where updates on the same table are published from multiple sources, updates can be received out of sequence.

This means that when listening to updates, in a high-throughput application, it is essential to check record timestamps to detect and handle out-of-sequence updates. For views, this is less of a problem, as Genesis refreshes the record from the database within a transaction to make sure that the published update is valid.

These multiple sources could be:

  • Different event handlers on the same node modifying the same table
  • The same event handler running on multiple nodes

Return Types

SyncEntityDb

The blocking - or sync - API is the simplest API for accessing the Genesis database.

Get operations in this API are nullable. When getting a record, you must check if it is null:

val trade: Trade? = db.get(Trade.byId("TRADE_1"))
if (trade == null) throw RuntimeException("Unable to find trade")

Write operations return either a single return object, or a list of result objects, for the ...All operations, e.g. updateAll. These are never nullable.

val insert: InsertResult<Trade> = db.insert(trade)
val inserts: List<InsertResult<Trade>> = db.insertAll(myTrades)

getRange and getBulk operations a List<X>:

val trades: List<Trade> = db.getBulk(TRADE)

Note that this will load the entire table into memory. This is fine for smaller tables and ranges.

For large tables and ranges, the stream() function is provided on the database. This creates a Stream that streams the data, rather than loading it all in memory:

val trades: Stream<Trade> = db.stream().getBulk(TRADE)
try {
// use the stream here
} finally {
trades.close()
}

While this is more memory-efficient, the database connection will be kept open until the stream is closed. As in the above example, always close the stream. Not closing these streams will lead to your application running out of database connections.

AsyncEntityDb

The Genesis Application Platform relies on Kotlin coroutines for providing high-performance applications. The Async API is the preferred API for accessing the database in Kotlin. The RxJava API is also available. If you use Java, the asynchronous API is not available.

Coroutines overview

Coroutines provide support for non-blocking asynchronous operations at the language level. This makes the code highly efficient; it also means that asynchronous code can be written in a style that appears synchronous.

Traditionally, on the JVM, code waiting on an asynchronous operation will block a thread while waiting for completion or a result. For example, when waiting on the result of a Future, code will often call get, which will block the thread until the operation is completed.

Coroutines avoid this situation; they suspend, rather than block. When code reaches a point at which it can no longer complete and needs to wait for a result, it will suspend. It hands the thread back to the coroutine context, where it can be used by another call. Once the asynchronous operation completes, execution is resumed.

If you are not familiar with Kotlin coroutines, it is well worth finding out about them. They are very efficient. Both of the following are useful sources of information:

Using suspend

In Kotlin, a function modified with the suspend keyword becomes a suspending function. A suspend function can only be called from a coroutine. There are three types:

  • Nullable suspend. This can only return a single value.
  • Suspend. This can only return a single value.
  • Flow. This can emit multiple values sequentially. A flow is conceptually a stream of data that can be computed asynchronously. The emitted values must be of the same type. For example, a Flow<Int> is a flow that emits integer values.

RxEntityDb

RxJava is a Java implementation of reactive extensions. The Genesis database uses this library to represent asynchronous database operations in java.

If you are using Java. RxJava API is the only way of accessing the database. For Kotlin, the async API is preferred (although the RxJava API is also supported).

Subscription

It is important to note that any database operation with RxJava return type is cold until it is subscribed to. This means that the operation is not sent to the database until that time.

RxJava return types

The Genesis database uses three RxJava return types:

Return typeminimum returnsmaximum returns
Single11
Maybe01
Flowable0
Single

In the RxJava API, a Single represents an asynchronous operation that has two possible outcomes:

  • a success with result, or
  • a failure

For example, on the database, delete returns a Single, with the following possible outcomes:

  • the record was deleted; it provides a write result
  • the operation was not successful; for example, the record was not found
Maybe

In the RxJava API, a Maybe represents an asynchronous operation that has three possible outcomes:

  • a success with result
  • a success with no result
  • a failure

For example, on the database, get returns a Maybe, with the following possible outcomes:

  • a record is found
  • no record is found
  • the operation was not successful, for example the index was incorrect
Flowable

In the RxJava API, a Flowable represents an asynchronous operation that has an undefined number of outputs.

Frequently asked questions

Write operations not updating in database

When using the RxJava api, it is paramount that you end the call chain with a subscribe... call. The underlying database operation will not commence until it is subscribed to.

Type Unsafe DB API

The following APIs for DbEntity and DbRecord are used rarely, when you need to operate outside the type-safety entities.

DbEntity

The DbEntity methods are described below:

NameSignatureDescription
toDbRecordfun toDbRecord(entity: E): DbRecordConverts an entity to DbRecord
toGenesisSetFormattedfun toGenesisSetFormatted(entity: E, configs: Collection<ColumnConfig>? = null): GenesisSetConverts a view to GenesisSet and applies any formatter/aliases assigned to the fields
toGenesisSetfun toGenesisSet(entity: E, columns: Collection<String>): GenesisSetConverts a view to GenesisSet. This is the plain representation of view fields
getoperator fun get(field: String): Any?Gets the provided field value

DbRecord

warning

Using DbRecord instead of entities will circumvent compile-time validation of database interactions. This means that errors might not appear until runtime or might lead to unexpected results.

DbRecord enables you to build a record of a specified table. It is not type-safe, so this is not our recommended method.

Constructors
SignatureDescription
constructor(tableName: String)Create record with specified table name.
constructor(source: DbRecord?)Clone existing record.
constructor(targetTableName: String, source: DbRecord?)Clone an existing record into another record belonging to a different table. This is useful when the target table record is the extended table of the source record.

Example

val tradeRecord = DbRecord("TRADE")

val clonedTradeRecord = DbRecord(tradeRecord)

DbRecord("TRADE_SUMMARY", tradeRecord)
functions

Use the functions below to set and get fields of DbRecord. The field type can be any of these types.

Set record

fun set{DataType}(column: String, value: {DataType}?) You need to specify name and value of the column. DataType represents the type of the field you are trying to set. If you are trying to set a Double field, the method would look like this: fun setDouble(column: String, value: Double?); the value needs to be non-null.

Get record

fun get{DataType}(column: String): {DataType}? You need to specify the name of the column. DataType represents the type of the field you are trying to get. If you are trying to get a Double field, the method would look like this: fun getDouble(column: String): Double?, it returns value if present, otherwise null.

Generic getter and setter

fun getObject(column: String): Any? Generic access to fields. Returns value if present, otherwise null.

fun setObject(column: String, value: Any?) Generic setter for fields.

Other useful functions and properties

fun differenceInFields(comparatorRecord: DbRecord): Collection<String> This function compares the difference in fields between two records in the same table. It identifies the fields that have different values. Fields with the same values are ignored. If you try to compare fields from different tables, this returns an error.

Example

// gives the fields which differ in their values
dbRecord.differenceInFields(dbRecord2)

isEmpty This property identifies whether there is any content within the DbRecord. It returns true if there is no information collected within this record, otherwise, it returns false.

columns This property gets all columns for this record.

tableName This property gets the table name of record.

Timestamps

When you generate a database on the Genesis platform, every table in the database is given a TIMESTAMP and a RECORD_ID field.

  • The TIMESTAMP field value is generated automatically by GenesisFlake every time a change is made to the database.
  • The RECORD_ID field is the TIMESTAMP value when the record is first created, it will never change.

The database generates a new TIMESTAMP for every modify operation, even if no other fields are changed.

To create these values, GenesisFlake generates IDs in a similar manner to Twitter’s snowflake. It is able to generate these IDs without having to perform database-level synchronization - which ensures high performance.

An ID includes:

  • a node number (which represents the node id within a Genesis cluster)
  • a sequence number (used to differentiate IDs generated within the same millisecond)

Timestamps are essential if you use Optimistic Concurrency.

Format

The GenesisFlake timestamp is made up of:

  • epoch time in milliseconds
  • node id
  • sequence id

The timestamp itself is stored in the most significant bits of a LONG variable, leaving the least significant bits to node id and sequence number.

A raw ID value looks like this, for example: 6626101958220449352.

You can extract the timestamp component using bitwise right-shift operations. For example:

The result in decimal corresponds to 1579785813861, which can be checked in https://www.epochconverter.com/

Find most recent changes

To find the most recent change to table in your database:

  1. Add an index on the TIMESTAMP field for the table.
  2. Perform a getRangeFromEnd for that index. This returns all the records, beginning with the most recent.
  3. Keep only the first record.

Optimistic concurrency

Optimistic Concurrency helps prevent users from updating or deleting a stale version of a record. To do this, we need to know what the intended version of the record to update or delete is. Internally, we use the record's timestamp field as its version.

Configuration

If you want to use Optimistic Concurrency, you must configure it by setting the DbOptimisticConcurrencyMode property to the Event Handler definition in your applicationName-processes.xml file.

The DbOptimisticConcurrencyMode property can have one of the following values: STRICT, LAX and NONE. These modes are detailed below. Defaults to NONE.

There are two simple ways of setting the property.

In this first example, we simply specify STRICT Optimistic Concurrency as one of the options:

alpha-processes.xml

<process name="ALPHA_EVENT_HANDLER">
<groupId>ALPHA</groupId>
<start>true</start>
<options>-Xmx256m -DRedirectStreamsToLog=true -DXSD_VALIDATE=false -DDbOptimisticConcurrencyMode=STRICT</options>
<module>genesis-pal-eventhandler</module>
<package>global.genesis.eventhandler.pal</package>
<script>alpha-eventhandler.kts</script>
<description>Handles events</description>
<classpath>alpha-messages*,alpha-eventhandler*</classpath>
<language>pal</language>
</process>

Alternatively, you can specify the option in a separate file and use the configOverridesFile property. In the example below, the file is called alpha-sysdef.properties and we have shown its content in a separate codeblock.

alpha-processes.xml

<process name="ALPHA_EVENT_HANDLER">
<groupId>ALPHA</groupId>
<start>true</start>
<options>-Xmx256m -DRedirectStreamsToLog=true -DXSD_VALIDATE=false</options>
<module>genesis-pal-eventhandler</module>
<package>global.genesis.eventhandler.pal</package>
<script>alpha-eventhandler.kts</script>
<description>Handles events</description>
<classpath>alpha-messages*,alpha-eventhandler*</classpath>
<configOverridesFile>alpha-sysdef.properties</configOverridesFile>
<language>pal</language>
</process>
alpha-sysdef.properties
DbOptimisticConcurrencyMode=STRICT
warning

Do not apply as a global system definition property, as some internal services cannot operate properly when this is enabled.

Modes

ModeDescription
STRICTA check is performed on modify and delete database operations; the TIMESTAMP of the database record must be the same as the TIMESTAMP provided on the event.
LAXA check is performed on modify and delete database operations; if the event has provided a TIMESTAMP, the TIMESTAMP of the database record must be the same as the TIMESTAMP provided on the event. If no timestamp is provided, then no check is performed.
NONE (Default)Checks are disabled.

Event Handler

The checks are dependent on the availability of a timestamp field.

  • You must make sure that any custom classes used in your Event Handlers have a timestamp field. Add this manually, if necessary. If there is no timestamp field, the database write will fail with an error stating that a TIMESTAMP field is missing.
  • Generated entities always have a timestamp field, so the checks will work for these without any extra coding.
Examples

This eventHandler is called TRADE_MODIFY, and it is based on a generated entity.

trade-eventhandler.kts
eventHandler<Trade>(name = "TRADE_MODIFY") {
onCommit { event ->
val trade = event.details
entityDb.modify(trade)
ack()
}
}

Below is an eventHandler called TRADE_CANCEL. It is based on a custom class called TradeCancel, which itself is described in the second codeblock.

trade-eventhandler.kts
eventHandler<TradeCancel> {
onCommit { event ->
val message = event.details
val trade = Trade {
tradeStatus = TradeStatus.CANCELLED
}
trade.timestamp = message.timestamp
entityDb.modify(trade)
ack()
}
}

TradeCancel Kotlin data class

TradeCancel.kt
data class TradeCancel(
val tradeId: String,
val timestamp: Long,
)

Linking the front end

When sending an event to the server, the front end needs to know what timestamp to send as part of the payload.

The front end of Optimistic Concurrency is driven from Entity Management. You need to set up the relevant Data Server query or Request Server requestReply for entity management.

Data Server
  • If the 'query` has no specific fields defined, the checks work automatically.
  • If the 'query` has defined specific fields, you must add the TIMESTAMP field.

In this example, no specific fields have been defined:

trade-dataserver.kts
query("ALL_TRADES", TRADE)

In this next example, specific fields have been defined, so we have added TIMESTAMP:

trade-dataserver.kts
query("ALL_TRADES", TRADE) {
fields {
TRADE_ID
QUANTITY
PRICE
TIMESTAMP
}
}
Request Server
  • If the 'requestReply` has no specific fields defined, the checks work automatically.
  • If the 'requestReply` has defined specific fields, you must add the TIMESTAMP field.

In this example, no specific fields have been defined:

requestReply(TRADE)

In this next example, specific fields have been defined, so we have added TIMESTAMP:

requestReply("TRADE", TRADE) {

request {
TRADE_ID
}

reply {
TRADE_ID
QUANTITY
PRICE
TIMESTAMP
}
}

Generated repositories

warning

IMPORTANT!

Generate Repository usage is legacy. Entity DB APIs replace their usage. From GSF version 7.0.0 onwards, code generation for database repositories is disabled by default. To re-enable code generation, change the generateRepositories setting inside the build.gradle.kts files for the genesis-generated-dao and genesis-generated-view modules, as shown below:

codeGen {
generateRepositories = true
}

During the code generation phase, repository classes can be generated for every table and view in the system. These repositories provide a type-safe way of accessing the database.

The main differences between the generated repositories and the Entity Db are:

  • The entity db can handle any database entity, each repository can only handle a single entity.
  • The generated repositories have specific methods for every index of table/view, whereas the entity db is fully generic.
Supports tables✔️
Supports views✔️
Supports any data type
Class to import[TableName]AsyncRepository \ [TableName]Rx3Repository
Type-safe read and write✔️
Type-safe write result
Returns data astable or view entities
Writes data astable entities
References indexes asGenerated methods
Programming interfaceAsync or RxJava
Write (input)Generated
Write (output)Write Result)
SubscribeRecord Update of entity type
Bulk or Range Subscribe Bulk of entity type
Available in Event Handlers  
Available in custom Request Servers 

With generated repositories, there are two flavours of the entity db:

  • one has a RxJava API signatures, for use from Java

  • the other flavour has an Async API signature, for use by Kotlin

If you have a table called POSITION, then, when you run a generateDao, two repositories will be generated: PositionRx3Repository and PositionAsyncRepository.

You can perform CRUD operations on Table/View by its primary key and indices.

Write result

This is a single catch-all type for results of a write operation when using generated repositories or RxDb.

This type has the following fields:

  • savedRecords - the saved records
  • removedRecords - the deleted records
  • modifiedFields - the updated fields
  • numRecordsDeleted - the number of records deleted
  • isError - there was an error during the database operation. Needs to be checked when ValidationMode.LEGACY is used
  • errorText - the error description. Needs to be checked when ValidationMode.LEGACY is used

Modify details

ModifyDetails does the same job as EntityModifyDetails and takes similar arguments for use with RxDb.