Real-time queries (Data Server)

Overview

A Data Server supplies real-time (streaming) data to the front end of your application.

Data Servers preload and monitor specific tables or views in the database. When a change in data occurs, the Data Server sends the changed data to all users who have subscribed to the table or view.

The first time the user subscribes, all the data for the table or view is sent. The connection between the user and the Data Server remains open; so from then on, the Data Server automatically sends changes.

The Data Server uses the push mechanism of WebSockets to send subscribed clients real-time updates to the data they have subscribed to, taking into account any filter criteria they have supplied. It also offers a REST API.

When the client first requests to subscribe to a query in a Data Server, the initial data is returned, and the subscription between the client and the query remains active. The Data Server continues to push data updates to the client, for example newer entries or modifications to rows already sent. The client can also change the subscription, for example to request more rows.
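
The subscription flow described above (full snapshot first, then only changes) can be sketched in plain Kotlin. This is a minimal in-memory model, not Genesis code: QueryCache, Snapshot and Update are hypothetical names introduced purely for illustration.

```kotlin
// Hypothetical in-memory model of a subscription; none of these names are Genesis APIs.
sealed interface Message
data class Snapshot(val rows: Map<String, String>) : Message
data class Update(val key: String, val value: String?) : Message // null value = row deleted

class QueryCache {
    private val rows = linkedMapOf<String, String>()
    private val subscribers = mutableListOf<(Message) -> Unit>()

    fun subscribe(listener: (Message) -> Unit) {
        listener(Snapshot(rows.toMap())) // first message: all current rows
        subscribers += listener          // the subscription stays active
    }

    fun upsert(key: String, value: String) {
        rows[key] = value
        subscribers.forEach { it(Update(key, value)) } // push only the change
    }

    fun delete(key: String) {
        if (rows.remove(key) != null) subscribers.forEach { it(Update(key, null)) }
    }
}
```

A subscriber that joins after one row exists receives a Snapshot containing that row, followed by one Update per later change.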

Genesis Data Servers are defined in the *-dataserver.kts files.

All Data Server queries are available via REST automatically too.

Example configuration

Continuing with the data model examples, the following shows some example Data Server configurations.

dataServer {

query(INSTRUMENT)

query("ALL_COUNTERPARTYS", COUNTERPARTY) {
permissioning {
permissionCodes = listOf("CounterpartyView")
}
}

query("ALL_TRADES", TRADE_VIEW) {
permissioning {
permissionCodes = listOf("TradeView")
auth(mapName = "COUNTERPARTY"){
rowLevelPermissions = true
authKey {
key(data.counterpartyId)
}
}
}
fields {
TRADE_ID
TRADE_PRICE
DIRECTION
QUANTITY
DATE
COUNTERPARTY_ID
COUNTERPARTY_CODE
COUNTERPARTY_NAME
INSTRUMENT_NAME
NOTIONAL
}

filter {
data.date > DateTime.now().minusDays(30)
}
}
}
INSTRUMENT

The query name is defaulted as no name is specified.

COUNTERPARTY

A query name is specified.
Access control permissions are set: only users with the CounterpartyView permission can access this query and see data. By contrast, any user who can authenticate with this app can access the INSTRUMENT query above and see its data.

TRADE_VIEW

This query is for a view, whereas the previous two were on tables. The fields defined in the view span multiple tables.

We have explicitly specified the fields of the view that we want to show. If a view or table has fields we don't need, we can leave them out of this configuration. Where no fields block is specified, all fields are implicitly included.

The auth block in permissioning utilizes access control for row level permissioning. Only users with access to the COUNTERPARTY_ID will see the row.

The filter clause is a server-side filter. Subscribing clients may specify their own criteria to filter data, but this filter is applied on the back end and clients cannot circumvent it.

Summary

A Data Server file consists of a number of queries, each on a table or view. All the details of the table or view are inherited from its definition, so you don’t need to supply any further details unless you want to filter the data, apply access control, or limit or extend the set of fields in the query.

Configuration options

query

query defines a new Data Server query. It takes a name (optional) and an entity (table or view).

name is optional. If left unspecified, it defaults to ALL_{table/view name}S.

So, in the example below:

  • The first query is called ALL_INSTRUMENTS, because no name has been specified. The allocated name is based on the INSTRUMENT table, which it queries.
  • The second query is called ALL_COUNTERPARTYS, as this name has been specified explicitly.

dataServer {
    // Name of the query: ALL_INSTRUMENTS (default)
    query(INSTRUMENT)

    // Name of the query: ALL_COUNTERPARTYS (specified)
    query("ALL_COUNTERPARTYS", COUNTERPARTY) {
    }
}
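
The naming rule can be expressed in a couple of lines of plain Kotlin. This is only a sketch of the documented ALL_{table/view name}S rule; defaultQueryName and resolveQueryName are hypothetical helpers, not part of the platform.

```kotlin
// Hypothetical helpers mirroring the documented naming rule; not Genesis APIs.
fun defaultQueryName(entityName: String): String = "ALL_${entityName}S"

// An explicit name wins; otherwise the default is derived from the entity name.
fun resolveQueryName(explicitName: String?, entityName: String): String =
    explicitName ?: defaultQueryName(entityName)
```

Note that the rule simply appends an S, which is why the default name for COUNTERPARTY is ALL_COUNTERPARTYS rather than ALL_COUNTERPARTIES.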

config

The following settings can be applied at query level. See global config for further settings that can be overridden at query level.

Example:

dataServer {
query("ALL_TRADES", TRADE_VIEW) {
config {
defaultCriteria = "TRADE_PRICE > 0"
backwardsJoins = false
disableAuthUpdates = false
}
}
}

Config item | Description | Default
defaultCriteria | This represents the default criteria for the query |
disableAuthUpdates | Disables real-time auth updates in order to improve the overall data responsiveness and performance of the server | false
backwardsJoins | Enables backwards joins on a view query. Read backwards joins for more details | true
backJoins | (Deprecated) Replaced by backwardsJoins. Functionally the same. |

fields

This block is optional. Where it is not specified, all table or view fields in a query definition are exposed. If you don't want them all to be available, you must define the fields that are required. In the example below, we specify ten fields:

dataServer {
query("ALL_TRADES", TRADE_VIEW) {
fields {
TRADE_ID
TRADE_PRICE
DIRECTION
QUANTITY
DATE
COUNTERPARTY_ID
COUNTERPARTY_CODE
COUNTERPARTY_NAME
INSTRUMENT_NAME
NOTIONAL
}
}
}

You can override the name of a field using the operators below. This is necessary when a field name is the same as a field name in another table.

  • withAlias <NAME> gives the field an alternative NAME
  • withPrefix adds a prefix to the field name

withFormat <FORMAT_MASK> can also be used to override DATE, DATETIME and numerical fields to be sent as a String in a particular format by using format masks.

withSize <NUMBER OF CHARACTERS> can also be specified at field level. On the rare occasion that you see LMDB truncating a String value, use this to set the maximum number of characters.

derivedField

You can also define derived fields to supplement the fields supplied by the table or view. All fields are available under the data parameter.

In the example below, we add a trade description made up of many fields from the row:

    derivedField("TRADE_DESCRIPTION", STRING) {
data.direction + " " + data.quantity + " " + data.instrumentName + " " + data.tradePrice + " " + data.ctptyName
}
note

Derived fields cannot be used within a filter block.

derivedFieldWithUserName

This is the same as derivedField but also has a context property userName with the username who requested the data.

In the example below, the TRADE_DESCRIPTION will be prefixed with "My Trade " if it was traded by the user querying the data.

    derivedFieldWithUserName("TRADE_DESCRIPTION", STRING) {
val myTrade = if(userName == data.tradedBy) { "My Trade " } else { "" }
myTrade + data.direction + " " + data.quantity + " " + data.instrumentName + " " + data.tradePrice + " " + data.ctptyName
}
note

Derived fields cannot be used within a filter block.

filter

Where a filter is specified, the query filters the data on the server. The filter is always applied, and criteria specified by the client cannot override it.

filter requires boolean logic. It has a data property, which provides access to all fields on the entity.

    filter {
data.date > DateTime.now().minusDays(30)
}

Note that in this example DateTime.now() is evaluated for every row, which comes with a small overhead. To avoid this, you can define the value outside the filter, as in the example below:

  ...
val today = DateTime.now()
...
...
filter {
data.date > today.minusDays(30)
}
...
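
The difference between the two forms can be illustrated in plain Kotlin by counting clock reads. This is a sketch only: it uses java.time rather than the DateTime type shown above, and Row, now, countRecentPerRow and countRecentHoisted are hypothetical names.

```kotlin
import java.time.LocalDateTime

// Hypothetical row type standing in for a query row; not a Genesis class.
data class Row(val date: LocalDateTime)

// Counts how often the clock is read, to make the overhead visible.
var clockReads = 0
fun now(): LocalDateTime {
    clockReads++
    return LocalDateTime.now()
}

// Clock is read once per row inside the predicate.
fun countRecentPerRow(rows: List<Row>): Int =
    rows.count { it.date > now().minusDays(30) }

// Clock is read once, before the predicate runs.
fun countRecentHoisted(rows: List<Row>): Int {
    val cutoff = now().minusDays(30)
    return rows.count { it.date > cutoff }
}
```

One design consequence worth keeping in mind: a hoisted value is fixed at the moment it is computed, so the 30-day window no longer moves forward over time in this sketch.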

filterWithUserName

This is the same as filter but also has a context property userName with the username who requested the data.

In the example below, the entity has a field ASSIGNED_TO, which is populated with the user the data is assigned to. Rows that do not have ASSIGNED_TO set to the user querying the data are filtered out.

    filterWithUserName {
        data.assignedTo == userName
    }

indices

A query can optionally include an indices block to define additional indexing at the query level. When an index is specified in the query, all rows returned by the query are ordered by the fields specified, in ascending order.

The definition and the behavior of an index in a query are exactly the same as when you define an index in a table.

Example:

dataServer {
query("SIMPLE_QUERY", SIMPLE_TABLE) {
indices {
unique("SIMPLE_QUERY_BY_QUANTITY") {
QUANTITY
SIMPLE_ID
}
}
}
}

There are two scenarios where an index is useful:

  • Optimizing query criteria search. In the example above, if the front-end request specifies criteria such as QUANTITY > 1000 && QUANTITY < 5000, then the Data Server automatically selects the best matching index. In our example, it would be SIMPLE_QUERY_BY_QUANTITY. This means that the platform doesn't need to scan all the query rows stored in the Data Server memory-mapped file cache; instead, it performs a very efficient indexed search.
  • Where the front-end request specifies an index. You can specify one or more indices in your Data Server query. And the front end can specify which index to use by supplying ORDER_BY as part of the DATA_LOGON process. The Data Server will use that index to query the data. The data will be returned to the client in ascending order, based on the index field definition.

Important: Index definitions are currently limited to unique indices. As quantity does not have a unique constraint in the example definition shown above, we need to add SIMPLE_ID to the index definition to ensure we maintain uniqueness.

Note that table indexes are NOT available in dataserver queries.
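
To illustrate why SIMPLE_ID is appended to keep the index unique, and how an ordered index answers a criteria range without a full scan, here is a minimal plain-Kotlin sketch using java.util.TreeMap. It is a model of the idea only, not the Data Server's implementation; SimpleRow, QuantityKey, buildIndex and quantityBetween are hypothetical names.

```kotlin
import java.util.TreeMap

// Hypothetical stand-ins for a SIMPLE_TABLE row and a (QUANTITY, SIMPLE_ID) index key.
data class SimpleRow(val simpleId: String, val quantity: Int)

data class QuantityKey(val quantity: Int, val simpleId: String) : Comparable<QuantityKey> {
    override fun compareTo(other: QuantityKey): Int =
        compareValuesBy(this, other, { it.quantity }, { it.simpleId })
}

// Builds a unique, ascending index; fails if two rows produce the same key.
fun buildIndex(rows: List<SimpleRow>): TreeMap<QuantityKey, SimpleRow> {
    val index = TreeMap<QuantityKey, SimpleRow>()
    for (row in rows) {
        val previous = index.put(QuantityKey(row.quantity, row.simpleId), row)
        require(previous == null) { "duplicate index key for ${row.simpleId}" }
    }
    return index
}

// Equivalent of QUANTITY > low && QUANTITY < high, answered from the index alone.
fun quantityBetween(index: TreeMap<QuantityKey, SimpleRow>, low: Int, high: Int): List<SimpleRow> =
    index.subMap(QuantityKey(low + 1, ""), true, QuantityKey(high, ""), false).values.toList()
```

Two rows with the same quantity stay distinct because their SIMPLE_ID differs, and results come back in ascending key order.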

permissioning

The permissioning block is used to implement access control measures on the data being returned to a user.

tip

With the exception of the inner auth block which relies on inbound data, the permissioning block and its contents can also be placed in the outer block and apply to all query/requestServer/eventHandler blocks named in the file.

permissionCodes

permissionCodes takes a list of permission codes. The client user accessing the query must have access to at least one of the permission codes in the list to be able to access any query data.

    permissioning {
permissionCodes = listOf("TradeView", "TradeUpdate")
}
customPermissions

The customPermissions block takes boolean logic. If this function returns true, the user will be able to access the resource; otherwise, the request will be rejected.
It is useful, for example, in the case you have an external API to check against. In this example an entitlementUtils object wraps an external API we can call to check the user's name has access.

  customPermissions { message ->
entitlementUtils.userIsEntitled(message.userName)
}

The customPermissions block also has access to entityDb so that you can query any data in the database. You can add complex logic in here as needed, and have access to the full inbound message.

  customPermissions { message ->
val userAttributes = entityDb.get(UserAttributes.byUserName(message.userName))
userAttributes?.accessType == AccessType.ALL
}

Where utilizing customPermissions you should also consider configuring a customLoginAck so that the front end of the application can also permission its components in a similar way.

auth

auth is used to restrict rows (queries) or events with a particular entity based on a user's entity access, also known as row-level permissions.

permissioning {
    auth(mapName = "ENTITY_VISIBILITY") {
        authKey {
            key(data.counterpartyId)
        }
    }
}

Details on restricting query entity access here

Auth definitions can be grouped with “and” or “or” operators.

  • You could have two simple permission maps, for example: one by counterparty and another one for forbidden symbols. If the user wants to see a specific row, they need to have both permissions at once.
  • You could have two permission maps: one for buyer and one for seller. A user would be allowed to see a row if they have a seller or buyer profile, but users without one of those profiles would be denied access.

This example shows an AND grouping:

permissioning {
auth(mapName = "ENTITY_VISIBILITY") {
authKey {
key(data.counterpartyId)
}
} and auth(mapName = "SYMBOL_RESTRICTED") {
authKey {
key(data.symbol)
}
}
}

This example shows OR grouping:

permissioning {
auth(mapName = "ENTITY_VISIBILITY") {
authKey {
key(data.buyerId)
}
} or auth(mapName = "ENTITY_VISIBILITY") {
authKey {
key(data.sellerId)
}
}
}
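
The semantics of the two groupings can be sketched in plain Kotlin as predicates combined with infix functions. This is a hypothetical model for illustration only: Row, RowAuth and authMap are not Genesis APIs, and the entitlement data is passed in as simple maps.

```kotlin
// Hypothetical model of row-level auth; none of these names are Genesis APIs.
typealias Row = Map<String, String>
typealias RowAuth = (user: String, row: Row) -> Boolean

// An auth map lists, per user, the entity keys that user may see.
fun authMap(entitlements: Map<String, Set<String>>, keyField: String): RowAuth =
    { user, row -> row[keyField]?.let { it in entitlements[user].orEmpty() } ?: false }

// "and": the user must pass both checks to see the row.
infix fun RowAuth.and(other: RowAuth): RowAuth =
    { user, row -> this(user, row) && other(user, row) }

// "or": passing either check is enough to see the row.
infix fun RowAuth.or(other: RowAuth): RowAuth =
    { user, row -> this(user, row) || other(user, row) }
```

With a counterparty map and a symbol map, `(byCounterparty and bySymbol)(user, row)` is true only when the user is entitled to both the row's counterparty and its symbol, whereas the `or` form accepts either.
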
userHasRight

userHasRight is a helpful function which can be called within the auth block, or anywhere else in client facing server components which have a user accessing them, to determine if a user has a given right.

if (!userHasRight(userName, "TradeViewFull")) { ... }
hideFields

For queries, the auth block also allows a hideFields which can be used to restrict attribute/field/column values being returned to users.

For example, you can hide a column (or set of columns) based on boolean logic, for example if a user does not have a specific Right Code in their profile.

In the example below, the code uses auth and hideFields to check if the user has the Right Code TradeViewFull. If this code is not set for the user, then the column CUSTOMER_NAME will not be returned to the user:

  permissioning {
auth {
hideFields { userName, rowData ->
if (!userHasRight(userName, "TradeViewFull")) listOf(CUSTOMER_NAME)
else emptyList()
}
}
}

In most cases, a single auth entity id is associated to one or more rows in a Data Server query, but in some rare scenarios there might be a one-to-one relationship. In the latter case, there is a configuration option named rowLevelPermissions which can be set to true to optimize the internal data structure that holds the cache information.

query("ALL_COUNTERPARTYS", COUNTERPARTY) {
permissioning {
auth("COUNTERPARTY"){
rowLevelPermissions = true
authKey {
key(data.counterpartyId)
}
}
}
}

enrich

In some scenarios, you might want to associate the results of Data Server queries with the user who initiated the queries. This data cannot be cached, as it is derived from the subscribing client's input. You can achieve this using the enrich feature, which enables an additional table or view join (including backwards joins). With this feature, you can provide user-specific values for each row, or even perform cell-level permissioning (for example, to hide cell values), depending on entitlements.

The join operation receives two parameters:

  • userName is the current user name subscribed to the query
  • row is the pre-built query row

hideFields enables you to define a list of fields that will be hidden if certain conditions apply. Three parameters are provided:

  • userName the current user name subscribed to the query
  • row the pre-built query row
  • userData the table or view lookup result; this will be null if the lookup fails to find a record

The fields section defines what fields should be visible as part of the query. Use this if you want to use a subset of fields from the enriched table or view, or if you want to declare your own derived fields.

The example below should help you to understand the functionality. Comments are included in the code to ease understanding.

// Example using "hideFields" and creating derived fields based on user counterparty association
query("ALL_BUYER_SELLER_TRADES", BUYER_SELLER_TRADE_VIEW){
permissioning {
permissionCodes = listOf("ViewTrades")
auth("ENTITY_VISIBILITY"){
authKey {
key(data.buyerCounterpartyId)
}
} or
auth("ENTITY_VISIBILITY"){
authKey {
key(data.sellerCounterpartyId)
}
}
}
enrich(USER_COUNTERPARTY_MAP) {
join { userName, row ->
UserCounterpartyMap.ByUserName(userName)
}
// Hide buyer counterparty id to users associated to counterparty seller if "isHiddenToSeller" is true.
hideFields { userName, row, userData ->
if (userData?.counterpartyId == row.sellerCounterpartyId && row.isHiddenToSeller == true) {
listOf(BUYER_SELLER_TRADE_VIEW.BUYER_COUNTERPARTY_ID)
} else{
emptyList()
}
}
fields {
// If a user belongs to the buyer counterparty, "COUNTERPARTY" value will be the seller name
// in the reverse scenario it will be the buyer name
derivedField("COUNTERPARTY", STRING) {
when {
enriched?.counterpartyId == data.buyerId -> data.sellerName
enriched?.counterpartyId == data.sellerId -> data.buyerName
else -> ""
}
}
// If a user belongs to the buyer counterparty, "DIRECTION" will be "BUY"
// in the reverse scenario it will be "SELL"
derivedField("DIRECTION", STRING) {
when {
enriched?.counterpartyId == data.buyerId -> "BUY"
enriched?.counterpartyId == data.sellerId -> "SELL"
else -> ""
}
}
}
}
}

// Example: selecting fields from enriched view
query("ALL_COUNTERPARTIES" , COUNTERPARTY_VIEW) {
// Lookup user counterparty favourite view and provide user enrich field to display if a counterparty has been marked as favourite by the user.
enrich(USER_COUNTERPARTY_FAVOURITE) {
join { userName, row ->
UserCounterpartyFavourite.ByUserNameCounterparty(userName, row.counterpartyId)
}
// We only care about selecting the IS_FAVOURITE field from the USER_COUNTERPARTY_FAVOURITE view
fields {
USER_COUNTERPARTY_FAVOURITE.IS_FAVOURITE
}
}
}

// Example: filtering rows based on whether the enrichment was successful or not
query("ALL_ONLY_FAVOURITE_COUNTERPARTIES" , COUNTERPARTY_VIEW) {
// Lookup user counterparty favourite view
enrich(USER_COUNTERPARTY_FAVOURITE) {
join { userName, row ->
UserCounterpartyFavourite.ByUserNameCounterparty(userName, row.counterpartyId)
}
// The query should only display rows that have been enriched successfully
filter {
enriched != null
}
}
}

// Example: using "enrichedAuth" to combine fields from enrichment with authorization mechanism
query("ALL_FAVOURITE_COUNTERPARTIES", COUNTERPARTY_VIEW) {
permissioning {
enrichedAuth("COUNTERPARTY_FAVOURITE_VISIBILITY", USER_COUNTERPARTY_FAVOURITE) {
authKey {
key(data.counterpartyId, enriched?.isFavourite)
}
}
}
enrich(USER_COUNTERPARTY_FAVOURITE) {
join { userName, row ->
UserCounterpartyFavourite.ByUserNameCounterparty(userName, row.counterpartyId)
}
}
}

Enriching data using the calling user's context is a great way to build rich experiences. In the example below, we have the positions table and the favourite trades table. We are able to derive a field on our query by bringing the user's favourite trades into the context.

Note the casing of FAVOURITE_TRADE and FavouriteTrade. The all-caps version denotes the table definition, whereas the camel-case version denotes the generated data access object (DAO).

query(POSITION) {
enrich(FAVOURITE_TRADE) {
join { userName, row -> FavouriteTrade.byUserName(userName) }
fields {
derivedField("IS_FAVOURITE", BOOLEAN) {
data.code == enriched?.code
}
}
}
}

To be able to use the byUserName function over the FavouriteTrade DAO, you must add an index over the USER_NAME (a platform-level field).

table(name = "POSITION", id = 1600) {
sequence(POSITION_ID, "PS")
CODE
CREATED_AT
PRICE

primaryKey {
POSITION_ID
}
}
table(name = "FAVOURITE_TRADE", id = 1601) {
sequence(FAVOURITE_ID, "FA")
CODE
USER_NAME

primaryKey {
FAVOURITE_ID
}
indices {
unique {
USER_NAME
}
}
}

Index driven

You can specify "index-based" Data Server queries which will only read a defined range within a table or view, and only this data is monitored for updates (not the whole table or view). This makes the Data Server more responsive and reduces resource requirements. It uses the database range search operation getRange.

You can use the following options when you create index-based queries:

where

where provides the set of data that matches a specified index value. Advanced where accepts an index entry, which is used to retrieve the matching records from the database. In the first example below, the Data Server query returns all the trade data whose quantity is equal to 42. You can optionally refresh keys using the refresh keyword, which sets a periodic refresh of keys, as shown in the second example:

query("TRADE_RANGE_BY_QUANTITY", TRADE) {
where(Trade.ByQuantity(42), 1)
}

query("TRADE_RANGE_USD_REFRESH", TRADE) {
where(Trade.ByCurrencyId("USD"), 1) {
refresh {
every(15.minutes)
}
}
}

The example below shows how advanced where queries differ from basic filter queries.

The scenario is this: you want to get trade records where the currencyId is USD. You can write a Data Server query in two ways, which affects how much data is cached:

  • Method 1 uses basic filter. It initially reads all the table/view data (which could be very large) and then applies the filter clause to confine the range to USD, so it can take a long time to get the Data Server query up and running.
  • Method 2 uses advanced where. It uses a database range search operation getRange, so it is able to read just the data we need from the database using indices. This means the data that we need to process is much smaller, which is much more efficient. Because the where clause is applied at database level, the data returned by the database operation already contains the correct rows.
// Method 1:
query("TRADE_USD", TRADE) {
filter {
data.currencyId == "USD"
}
}

// Method 2:
query("TRADE_RANGED_USD", TRADE) {
where(Trade.ByCurrencyId("USD"), 1)
}
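
The cost difference between the two methods can be modelled in plain Kotlin by counting how many rows are read. This is an illustrative sketch only: TradeTable, scanAll and this getRange are hypothetical stand-ins, not the platform's database API.

```kotlin
// Hypothetical stand-in for a TRADE row.
data class Trade(val tradeId: String, val currencyId: String)

// Toy table that counts how many rows each access path has to read.
class TradeTable(private val trades: List<Trade>) {
    var rowsRead = 0
        private set

    // Toy index on currencyId, built once up front.
    private val byCurrency: Map<String, List<Trade>> = trades.groupBy { it.currencyId }

    // Method 1 style: read everything, let the caller filter afterwards.
    fun scanAll(): List<Trade> {
        rowsRead += trades.size
        return trades
    }

    // Method 2 style: use the index to read only the matching rows.
    fun getRange(currencyId: String): List<Trade> {
        val hits = byCurrency[currencyId].orEmpty()
        rowsRead += hits.size
        return hits
    }
}
```

Both paths return the same USD trades, but `scanAll().filter { it.currencyId == "USD" }` reads every row in the table, while `getRange("USD")` reads only the matches.
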
ranged

Use the ranged keyword to create a query that provides a range of data. You need to provide the index and the number of key fields:

  • The index property must be a unique index. However, you can have what is effectively a non-unique index by adding a field that is not unique (such as QUANTITY) and a field that is unique (such as TRADE_ID) to create a compound unique index:
unique {
QUANTITY
TRADE_ID
}
info

To create the default query index, a Data Server always uses the internal RECORD_ID value of the database records. This is equivalent to indexing records by their creation date. Data Servers do not use table indices. To query records efficiently, you need to recreate indices at the Data Server level.

  • Use the numKeyFields property to specify the number of fields to use from an index. The fields are always selected in the order they are specified in the index.
  • Use from to specify the start of the data range. This is mandatory.
  • Use to to specify the end of the data range. This is optional, but highly recommended. When to is not specified, the from clause works in the same way as advanced where, described above. For these cases, we recommend using advanced where for better readability.
  • Optionally, you can use the refresh keyword to refresh the keys. This sets a periodic refresh of keys, as shown in examples below, (which include comments to ease understanding):
query("TRADE_RANGED_LAST_2_HOURS", TRADE) {
// the ranged key word makes this a ranged query
// the index and the number of key fields needs to be specified
ranged(index = Trade.ByTradeDateTimeAndType, numKeyFields = 1) {
// optionally refresh keys periodically, for example, when we are doing a
// range on dates
refresh {
// either every
every(2.hours)
// or at specific time
at(8.pm)
}
// required, starting key
from {
Trade.ByTradeDateTimeAndType(now().minusHours(2), "")
}
to {
Trade.ByTradeDateTimeAndType(now().plusHours(1), "")
}
}
}

Examples:

// all dollar trades:
query("TRADE_RANGED_TRADE_RANGE_USD", TRADE) {
where(Trade.ByCurrencyId("USD"), 1)
}

// all trades with quantity between 100 and 1,000
query("TRADE_RANGED_TRADE_RANGE_QTY", TRADE) {
ranged(Trade.ByQuantity, 1) {
from {
Trade.ByQuantity(100)
}
to {
Trade.ByQuantity(1000)
}
}
}

query("TRADE_RANGED_LAST_2_HOURS", TRADE) {
ranged(index = Trade.ByTradeDateTimeAndType, numKeyFields = 1) {
refresh {
every(15.minutes)
}
from {
Trade.ByTradeDateTimeAndType(now().minusHours(2), "")
}
to {
Trade.ByTradeDateTimeAndType(now().plusHours(1), "")
}
}
}

With refresh queries, rows that move out of the filter range will be removed from the cache, while rows that move into the filter will be added.

Examples where numKeyFields > 1

The range configuration returns a set of sorted records based on the index definition and the number of fields involved (specified by numKeyFields). Let's see a couple of examples of ranged queries with their respective results in CSV format. The table definition with associated data sorted by its primary key fields is shown below:

table(name = "GENESIS_PROCESS_MONITOR", id = 20) {
MONITOR_NAME
PROCESS_HOSTNAME
PROCESS_NAME

primaryKey(name = "GENESIS_PROCESS_MONITOR_BY_HOSTNAME", id = 1) {
PROCESS_HOSTNAME
PROCESS_NAME
MONITOR_NAME
}
}
PROCESS_HOSTNAME,PROCESS_NAME,MONITOR_NAME
localhost,process_a,monitor_a
localhost,process_a,monitor_b
localhost,process_b,monitor_a
localhost,process_b,monitor_b
remote_host,process_a,monitor_a
remote_host,process_a,monitor_b
remote_host,process_b,monitor_a
remote_host,process_b,monitor_b

Example 1: Ranged query:

query("GENESIS_PROCESS_MONITOR_NUM_KEY_FIELDS_2", GENESIS_PROCESS_MONITOR) {
ranged(GenesisProcessMonitor.ByHostname, 2) {
from {
GenesisProcessMonitor.ByHostname(
processHostname = "localhost",
processName = "process_a",
monitorName = "monitor_a" // monitorName values are ignored since numKeyFields is 2
)
}
to {
GenesisProcessMonitor.ByHostname(
processHostname = "localhost",
processName = "process_b",
monitorName = "monitor_a" // monitorName values are ignored since numKeyFields is 2
)
}
}
}

Result:

PROCESS_HOSTNAME,PROCESS_NAME,MONITOR_NAME
localhost,process_a,monitor_a
localhost,process_a,monitor_b
localhost,process_b,monitor_a
localhost,process_b,monitor_b

Example 2: Ranged query:

    query("GENESIS_PROCESS_MONITOR_NUM_KEY_FIELDS_3", GENESIS_PROCESS_MONITOR) {
ranged(GenesisProcessMonitor.ByHostname, 3) {
from {
GenesisProcessMonitor.ByHostname(
processHostname = "localhost",
processName = "process_a",
monitorName = "monitor_a"
)
}
to {
GenesisProcessMonitor.ByHostname(
processHostname = "remote_host",
processName = "process_a",
monitorName = "monitor_a"
)
}
}
}

Result:

PROCESS_HOSTNAME,PROCESS_NAME,MONITOR_NAME
localhost,process_a,monitor_a
localhost,process_a,monitor_b
localhost,process_b,monitor_a
localhost,process_b,monitor_b
remote_host,process_a,monitor_a
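
Both results above are consistent with comparing only the first numKeyFields fields of the index, with both bounds inclusive. The plain-Kotlin sketch below models that reading. The inclusive bounds are an assumption inferred from the results shown, and comparePrefix and ranged are hypothetical helpers rather than platform APIs.

```kotlin
// A key is the ordered list of index field values:
// PROCESS_HOSTNAME, PROCESS_NAME, MONITOR_NAME.
typealias Key = List<String>

// Compares two keys on their first numKeyFields fields only.
fun comparePrefix(a: Key, b: Key, numKeyFields: Int): Int {
    for (i in 0 until numKeyFields) {
        val c = a[i].compareTo(b[i])
        if (c != 0) return c
    }
    return 0
}

// Keeps rows whose key prefix lies between from and to (both inclusive, by assumption).
fun ranged(rows: List<Key>, from: Key, to: Key, numKeyFields: Int): List<Key> =
    rows.filter {
        comparePrefix(it, from, numKeyFields) >= 0 && comparePrefix(it, to, numKeyFields) <= 0
    }

// The eight sample rows from the table above, in primary key order.
val monitorRows: List<Key> = listOf("localhost", "remote_host").flatMap { host ->
    listOf("process_a", "process_b").flatMap { process ->
        listOf("monitor_a", "monitor_b").map { monitor -> listOf(host, process, monitor) }
    }
}
```

Calling `ranged` with numKeyFields = 2 and the bounds from Example 1 yields the four localhost rows; with numKeyFields = 3 and the bounds from Example 2 it yields those four rows plus remote_host,process_a,monitor_a.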

Global config

Outside of the query blocks, you can make configuration settings that apply to all queries defined. These control the overall behaviour of the Data Server. Some can optionally be overridden at the query level.

Here is an example of some configuration settings:

dataServer {
config {
lmdbAllocateSize = 512.MEGA_BYTE() // top level only setting
// Items below can be overridden in individual query definitions
compression = true
chunkLargeMessages = true
defaultStringSize = 40
batchingPeriod = 500
linearScan = true
excludedEmitters = listOf("PurgeTables")
enableTypeAwareCriteriaEvaluator = true
serializationType = SerializationType.KRYO // Available from version 7.0.0 onwards
serializationBufferMode = SerializationBufferMode.ON_HEAP // Available from version 7.0.0 onwards
directBufferSize = 8096 // Available from version 7.0.0 onwards
}
query("SIMPLE_QUERY", SIMPLE_TABLE) {
config {
// Items below only available in query level config
defaultCriteria = "SIMPLE_PRICE > 0"
backwardsJoins = false
disableAuthUpdates = false
}
}
}

Config item | Description | Default | Overridable in query
lmdbAllocateSize | This sets the size of the memory-mapped file where the in-memory cache stores the Data Server query rows. Value must be defined in MEGA_BYTE (512.MEGA_BYTE()) or GIGA_BYTE (2.GIGA_BYTE()) | 2.GIGA_BYTE() | No
serializationType | Serialization approach to convert query rows into bytes and vice versa. Two options: SerializationType.KRYO and SerializationType.FURY. KRYO uses the Kryo library. FURY uses the Fury library instead. Internal tests show that Fury can serialize and deserialize row objects 13-15 times more quickly than Kryo in our current Data Server implementation; this leads to faster LMDB read/write operations (up to twice as quick in some cases). This performance improvement has a great impact on the latency incurred when requesting rows from a Data Server query, whether it happens as part of the first subscription message (DATA_LOGON messages) or subsequent row request messages (MORE_ROWS messages). However, the serialized byte array size of a Fury object can be between 10 and 15 per cent larger than a Kryo object, so there is a small penalty to pay for using this option. | KRYO | No
serializationBufferMode | This option changes the buffer type used to read/write information from/to LMDB. Two options: SerializationBufferMode.ON_HEAP and SerializationBufferMode.OFF_HEAP. ON_HEAP uses a custom cache of Java HeapByteBuffer objects that exist within the JVM addressable space. OFF_HEAP uses DirectByteBuffer objects instead, so memory can be addressed directly to access LMDB buffers natively in order to write and read data. OFF_HEAP buffers permit zero-copy serialization/deserialization to LMDB; buffers of this type are generally more efficient as they allow Java to exchange bytes directly with the operating system for I/O operations. To use OFF_HEAP, you must specify the buffer size in advance (see directBufferSize setting); consider this carefully if you do not know the size of the serialized query row object in advance. | ON_HEAP | No
directBufferSize | Sets the buffer size in bytes used by the serializationBufferMode option when it is set to SerializationBufferMode.OFF_HEAP | 8096 bytes | No
compression | When true, it will compress the query row data before writing it to the in-memory cache | false | Yes
chunkLargeMessages | When true, it will split large updates into smaller ones. | false | Yes
defaultStringSize | This is the size to be used for string storage in the Data Server in-memory cache. Higher values lead to higher memory use; lower values lead to lower memory use, which can lead to string overflow. See the onStringOverflow setting for details of how the overflows are handled. | 40 | Yes
batchingPeriod | Delay in milliseconds to wait before sending new data to Data Server clients | 500ms | Yes
linearScan | Enables/disables linear scan behaviour in the query definition. If false, it will reject criteria expressions that don't hit defined indexes | true | Yes
excludedEmitters | Enables/disables update filtering for a list of process names. Any database updates that originate from one of these processes will be ignored. | Empty list | Yes
onStringOverflow | Controls how the system responds to a string overflow. A string overflow happens when the value of a String field in an index is larger than defaultStringSize, or the size set on the field. There are two options. With IGNORE_ROW, rows with string overflows are ignored; this can lead to data missing from the Data Server. With TRUNCATE_FIELD, indices with string overflows are truncated. The data with the overflow will still be returned in full, and will be searchable. However, if multiple rows are truncated to the same value, any subsequent rows will lead to duplicate index exceptions during the insert, so these rows will not be available to the Data Server. | TRUNCATE_FIELD | Yes
enableTypeAwareCriteriaEvaluator | This enables the type-aware criteria evaluator. The type-aware criteria evaluator can automatically convert criteria comparisons that don't match the original type of the Data Server field; these can still be useful to end users. For example, you might want a front-end client to perform a criteria search on a TRADE_DATE field like this: TRADE_DATE > '2015-03-01' && TRADE_DATE < '2015-03-02'. This search can be translated automatically to the right field types internally (even though TRADE_DATE is a field of type DateTime). The Genesis index search mechanism can also identify the appropriate search intervals in order to provide an optimized experience. The type-aware evaluator can transform strings to integers, and any other sensible and possible conversion (e.g. TRADE_ID == '1'). As a side note, this type-aware evaluator is also available in DbMon for operations like search and qsearch. By contrast, the traditional criteria evaluator needs the field types to match the query fields in the Data Server. So the same comparison using the default criteria evaluator for TRADE_DATE would be something like: TRADE_DATE > new DateTime(1425168000000) && TRADE_DATE < new DateTime(1425254400000). This approach is less intuitive and won't work with our automatic index selection mechanism. In this case, you should use our common date expressions (more at Advanced technical details) to handle date searches. | false | Yes
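
The epoch-millisecond values in the traditional-evaluator comparison correspond to midnight UTC on the two dates used in the type-aware comparison. A minimal plain-Kotlin sketch with java.time shows the conversion; utcMidnightMillis is a hypothetical helper, and interpreting the dates in UTC is an assumption.

```kotlin
import java.time.LocalDate
import java.time.ZoneOffset

// Hypothetical helper: epoch milliseconds at 00:00 UTC on an ISO-8601 date.
fun utcMidnightMillis(isoDate: String): Long =
    LocalDate.parse(isoDate)
        .atStartOfDay(ZoneOffset.UTC)
        .toInstant()
        .toEpochMilli()
```

`utcMidnightMillis("2015-03-01")` gives 1425168000000 and `utcMidnightMillis("2015-03-02")` gives 1425254400000, matching the values in the DateTime comparison.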

Backward joins

Each query in a Data Server creates what is effectively an open connection between each requesting user and the Data Server. After the initial send of all the data, the Data Server sends only modifications, deletions and insertions in real time. Unchanged fields are not sent, which avoids unnecessary traffic.

By default, only updates to the root table are sent. So if you need to see updates to fields from a joined table, then you must specifically declare a backwards join. For queries with backwards joins enabled, the primary table and all joined tables and views are actively monitored. Any changes to these tables and views are sent automatically.

Monitoring of backwards joins can come at a cost, as it can cause significant extra processing. Do not use backwards joins unless there is a need for the data in question to be updated in real time. For example, counterparty data, if changed, can wait until overnight. The rule of thumb is that you should only use backwards joins where the underlying data is being updated intra-day.

The backwards join flag is true by default. You must explicitly set backwardsJoins = false if you wish to turn backwards joins off for your query.
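
As a minimal sketch, turning the flag off for a single query might look like the fragment below. It assumes that backwardsJoins is set inside the query's config block; the ALL_TRADES query and TRADE_VIEW view are reused from the example configuration earlier on this page. Check the exact placement against your platform version.

```kotlin
dataServer {
    query("ALL_TRADES", TRADE_VIEW) {
        config {
            // Assumption: backwardsJoins lives in the query-level config block.
            // With the flag off, only changes to the root table are pushed.
            backwardsJoins = false
        }
    }
}
```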

Logging

Genesis provides a class called LOG that can be used to insert custom log messages. This class provides five methods: info, warn, error, trace and debug. Each method takes a string as input; this string is written to the logs.

Here is an example where you can insert an info message into the log file:

LOG.info("This is an info message")

The LOG instance is a Logger from SLF4J.

note

In order to see these messages, you must set the logging level accordingly. You can do this using the logLevel command.

Optimistic concurrency

Find details of how Data Servers work with optimistic concurrency.

Client API

Clients can connect and query via WebSocket or REST HTTP calls. WebSockets are required for real-time updates to be pushed.

tip

To use the API, clients must first authenticate to obtain a SESSION_AUTH_TOKEN by calling EVENT_LOGIN_AUTH.

DATA_LOGON

To initiate a stream from a Data Server, the client sends a DATA_LOGON message.

To control the flow of data and allow for filtering and ordering, the following options can be included in the DATA_LOGON message within the DETAILS block.

| Option | Default | Description |
| --- | --- | --- |
| MAX_ROWS | 250 | Maximum number of rows to be returned as part of the initial message, and as part of any additional MORE_ROWS messages; read more below |
| MAX_VIEW | 1000 | Maximum number of rows to track as part of a front-end view; read more below |
| MOVING_VIEW | true | Defines the behaviour of the front-end view when it receives new rows in real time. If MOVING_VIEW is set to true, and MAX_VIEW is reached, any new rows arriving from the query will start replacing the oldest rows in the view. This guarantees that only the most recent rows are shown by default; read more below |
| CRITERIA_MATCH | | The front end can send a common expression or a Groovy expression to perform filtering on the Data Server; these remain active for the life of the subscription. For example: Expr.dateIsBefore(TRADE_DATE,'20150518') or QUANTITY > 10000; read more below |
| FIELDS | | This parameter enables the front end to select a subset of fields from the query. Example: TRADE_ID QUANTITY PRICE INSTRUMENT_ID. If this option is not specified, all fields are returned |
| ORDER_BY | | This option can be used to select an index in the Data Server query that is being queried for sorting. By default, Data Server rows will be returned in the order they were created (from oldest database record to newest) |
| REVERSE | false | Reverses the order of data served by the ORDER_BY index. For example, if you are using the default index, the query will return rows from newest database records to oldest |

Example DATA_LOGON

Request

{
  "MESSAGE_TYPE": "DATA_LOGON",
  "SESSION_AUTH_TOKEN": "SIlvjfUpkfXtGxfiQk1DDvVRlojKtNIM",
  "SOURCE_REF": "123",
  "USER_NAME": "admin",
  "DETAILS": {
    "DATASOURCE_NAME": "ALL_TRADES",
    "CRITERIA_MATCH": "QUANTITY > 200",
    "MOVING_VIEW": true,
    "FIELDS": "TRADE_ID DIRECTION QUANTITY PRICE INSTRUMENT_NAME DATE",
    "MAX_ROWS": 10
  }
}

Response

{
  "MESSAGE_TYPE": "LOGON_ACK",
  "SEQUENCE_ID": 0,
  "SOURCE_REF": "123"
}

Initial Response

{
  "MESSAGE_TYPE": "QUERY_UPDATE",
  "ROWS_COUNT": 2,
  "ROW": [
    {
      "TRADE_ID": 1,
      "DIRECTION": "BUY",
      "QUANTITY": 500,
      "PRICE": 226.12,
      "INSTRUMENT_NAME": "Apple",
      "DATE": 1730937600000,
      "DETAILS": {
        "OPERATION": "INSERT",
        "ROW_REF": "7169357821048261343"
      }
    },
    {
      "TRADE_ID": 2,
      "DIRECTION": "SELL",
      "QUANTITY": 1000,
      "PRICE": 589.34,
      "INSTRUMENT_NAME": "Meta",
      "DATE": 1730937600000,
      "DETAILS": {
        "OPERATION": "INSERT",
        "ROW_REF": "7169357821048261344"
      }
    }
  ],
  "MORE_ROWS": true,
  "SOURCE_REF": "123",
  "SEQUENCE_ID": 1
}

Subsequent responses are pushed as data is updated

New row update

{
  "MESSAGE_TYPE": "QUERY_UPDATE",
  "ROW": [
    {
      "TRADE_ID": 3,
      "DIRECTION": "BUY",
      "QUANTITY": 1000,
      "PRICE": 795.04,
      "INSTRUMENT_NAME": "Netflix",
      "DATE": 1730937600000,
      "DETAILS": {
        "OPERATION": "INSERT",
        "ROW_REF": "7169357821048261345"
      }
    }
  ],
  "SOURCE_REF": "123",
  "SEQUENCE_ID": 2
}

Row modification update (price change for TRADE_ID 1)

{
  "MESSAGE_TYPE": "QUERY_UPDATE",
  "ROW": [
    {
      "PRICE": 227.01,
      "DETAILS": {
        "OPERATION": "MODIFY",
        "ROW_REF": "7169357821048261343"
      }
    }
  ],
  "SOURCE_REF": "123",
  "SEQUENCE_ID": 3
}
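
The messages above suggest how a bespoke client could keep its own copy of the data: rows are identified by ROW_REF, an INSERT carries a full row, and a MODIFY carries only the changed fields. The following is a minimal sketch of that idea in plain Kotlin, not the Genesis client implementation. RowUpdate and RowCache are hypothetical names, and the "DELETE" operation value is an assumption based on the delete messages mentioned in the MOVING_VIEW description.

```kotlin
// Hypothetical client-side model of one entry in the ROW array of a QUERY_UPDATE.
data class RowUpdate(
    val operation: String,          // DETAILS.OPERATION, for example "INSERT" or "MODIFY"
    val rowRef: String,             // DETAILS.ROW_REF
    val fields: Map<String, Any?>   // the fields carried by this update
)

// Hypothetical local copy of the subscription, keyed by ROW_REF.
class RowCache {
    private val rows = LinkedHashMap<String, MutableMap<String, Any?>>()

    fun applyUpdate(update: RowUpdate) {
        when (update.operation) {
            // INSERT carries the full row.
            "INSERT" -> rows[update.rowRef] = update.fields.toMutableMap()
            // MODIFY carries only the changed fields, so merge them into the stored row.
            "MODIFY" -> rows[update.rowRef]?.putAll(update.fields)
            // Assumption: deletions arrive with OPERATION set to "DELETE".
            "DELETE" -> rows.remove(update.rowRef)
        }
    }

    fun get(rowRef: String): Map<String, Any?>? = rows[rowRef]

    val size: Int get() = rows.size
}

fun main() {
    val cache = RowCache()
    cache.applyUpdate(
        RowUpdate(
            "INSERT", "7169357821048261343",
            mapOf("TRADE_ID" to 1, "PRICE" to 226.12, "INSTRUMENT_NAME" to "Apple")
        )
    )
    // Only PRICE is sent in the modification; the other fields are kept.
    cache.applyUpdate(RowUpdate("MODIFY", "7169357821048261343", mapOf("PRICE" to 227.01)))
    println(cache.get("7169357821048261343"))
}
```

Keying the cache on ROW_REF rather than on a business field such as TRADE_ID matters here, because a MODIFY message does not have to include the business key.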

Rows & view concepts

MAX_ROWS

MAX_ROWS determines how many rows will be sent when the front end requests data. It also determines how many rows are sent when the front end makes a MORE_ROWS request.

The rows are queried based on the REVERSE setting (which only applies to non-real-time updates). So it is important to note the following behaviour.

For example, when a DATA_LOGON is received with MAX_ROWS set to 500, and REVERSE set to true:

  • If no real-time updates occur on the server, the initial DATA_LOGON returns 500 rows in reverse order. If the front end then makes a MORE_ROWS request, it receives another 500 rows in reverse order. From here, the front end can make further MORE_ROWS requests until the MAX_VIEW setting is reached.

  • If real-time updates occur on the server, these are sent to the front end regardless of order, and they count towards MAX_VIEW.

MAX_VIEW and MOVING_VIEW

What happens when you reach MAX_VIEW depends on the MOVING_VIEW setting.

  • If MOVING_VIEW is set to true, the Data Server will start discarding the oldest rows (in terms of timestamp) and sending newer rows. So if MAX_VIEW is 2000, you already have 2000 rows in your view, and a new row is inserted, the oldest row in your view will be deleted (the Data Server sends a delete message) and the new row will be sent (the Data Server sends an insert message for that row).

  • If MOVING_VIEW is set to false, the Data Server will not send any updates for any new rows whatsoever. Only the rows that are currently in your view will receive updates. In either case, you can only have a total of MAX_VIEW rows in your grid.

This allows for easier implementation of infinite scrolling, whilst providing latest and greatest database updates. The front end must send a DATA_LOGON and then continue calling MORE_ROWS to fill the grid as the user scrolls.
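
The two MOVING_VIEW behaviours can be sketched as a small model in plain Kotlin. This is an illustration of the rules described above, not the Data Server's implementation: the View class is hypothetical, and arrival order stands in for the row timestamp.

```kotlin
// Hypothetical model of a front-end view capped at maxView rows.
class View(private val maxView: Int, private val movingView: Boolean) {
    init {
        require(maxView > 0) { "MAX_VIEW must be positive" }
    }

    private val rows = ArrayDeque<String>()   // oldest row first

    // Returns the messages that would be sent for a newly inserted row.
    fun onNewRow(rowRef: String): List<String> {
        if (rows.size < maxView) {
            rows.addLast(rowRef)
            return listOf("INSERT $rowRef")
        }
        // View is full and fixed: the new row is not sent at all.
        if (!movingView) return emptyList()
        // View is full and moving: the oldest row is deleted to make room.
        val oldest = rows.removeFirst()
        rows.addLast(rowRef)
        return listOf("DELETE $oldest", "INSERT $rowRef")
    }
}

fun main() {
    val moving = View(maxView = 2, movingView = true)
    moving.onNewRow("row-1")
    moving.onNewRow("row-2")
    println(moving.onNewRow("row-3"))   // the oldest row is replaced

    val fixed = View(maxView = 2, movingView = false)
    fixed.onNewRow("row-1")
    fixed.onNewRow("row-2")
    println(fixed.onNewRow("row-3"))    // nothing is sent
}
```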

Limiting the number of rows

Where the front end wants to limit the number of rows returned by the query, and only wants updates for those specific rows, there are two options.

For both options that we describe below, we assume that the front end wants to receive only five rows:

  • The DATA_LOGON will have MAX_ROWS = 5, MAX_VIEW = 5, and MOVING_VIEW = false. This ensures that the server only returns those rows and only sends updates for those rows; updates to other rows are ignored.
  • If you want to use pagination, the front end requests a limited number of rows; for this, the DATA_LOGON will have MAX_ROWS = 5 and VIEW_NUMBER = 1. The first five rows are sent in a pagination mode. A subsequent MORE_ROWS update with VIEW_NUMBER = 2 will delete all the current five rows in the grid and return the next page of rows (i.e. another five). The server only sends updates for the rows on display, and will respect REVERSE settings.
    On successful DATA_LOGON and on any subsequent MORE_ROWS messages, the server sends a field called ROWS_COUNT, which contains an estimated number of rows in the server. You can divide this number by your MAX_ROWS setting to know how many pages can be queried on the server.
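
The page calculation is a ceiling division, because a partly filled last page still counts as a page. A minimal sketch in Kotlin follows; pageCount is a hypothetical helper, and because ROWS_COUNT is an estimate, the result is an estimate too.

```kotlin
// Number of pages needed to show rowsCount rows at maxRows rows per page.
// Integer ceiling division: a partly filled last page still counts as a page.
fun pageCount(rowsCount: Int, maxRows: Int): Int {
    require(rowsCount >= 0) { "ROWS_COUNT must not be negative" }
    require(maxRows > 0) { "MAX_ROWS must be positive" }
    return (rowsCount + maxRows - 1) / maxRows
}

fun main() {
    // 23 rows at 5 rows per page: four full pages plus one page of 3 rows.
    println(pageCount(rowsCount = 23, maxRows = 5))  // prints 5
}
```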
info

Let's summarize all that.

  • Where the front end only wants to receive a subset of rows, or is using pagination, DATA_LOGON has MAX_ROWS = MAX_VIEW and MOVING_VIEW = false.
  • If you always want new updates to be sent to the front end automatically as soon as they occur in the server, do not set MAX_ROWS = MAX_VIEW or MOVING_VIEW = false in the DATA_LOGON. The value of REVERSE is ignored.

CRITERIA_MATCH

Client filtering is specified in the CRITERIA_MATCH option.

The filters can be specified using common expressions, Groovy expressions, or even a mix of the two. See more detail in Client capabilities - server communications.

MORE_ROWS

Used to request more rows on an active subscription.

TBD

MORE_COLUMNS

Used to request more columns on an active subscription.

Request

{
  "MESSAGE_TYPE": "MORE_COLUMNS",
  "SOURCE_REF": "124",
  "SESSION_AUTH_TOKEN": "SIlvjfUpkfXtGxfiQk1DDvVRlojKtNIM",
  "USER_NAME": "admin",
  "DETAILS": {
    "ADD_COLUMNS": "COUNTERPARTY_NAME"
  }
}

ACK Response

{
"MESSAGE_TYPE": "MORE_COLUMNS_ACK",
"SOURCE_REF": "124"
}

Response with the new column data

{
  "MESSAGE_TYPE": "QUERY_UPDATE",
  "ROW": [
    {
      "COUNTERPARTY_NAME": "Genesis",
      "DETAILS": {
        "OPERATION": "MODIFY",
        "ROW_REF": "7169357821048261343"
      }
    },
    {
      "COUNTERPARTY_NAME": "Genesis",
      "DETAILS": {
        "OPERATION": "MODIFY",
        "ROW_REF": "7169357821048261344"
      }
    },
    {
      "COUNTERPARTY_NAME": "Genesis",
      "DETAILS": {
        "OPERATION": "MODIFY",
        "ROW_REF": "7169357821048261345"
      }
    }
  ],
  "SOURCE_REF": "123",
  "SEQUENCE_ID": 5
}

DATA_LOGOFF

This ends a Data Server subscription.

Request

{
"MESSAGE_TYPE": "DATA_LOGOFF",
"SOURCE_REF": "125",
"USER_NAME": "admin"
}

Response

{
"MESSAGE_TYPE": "LOGOFF_ACK",
"SOURCE_REF": "125"
}

Metrics

info

Ensure you have enabled metrics in your environment to view them.

The Data Server exposes a number of metrics to help you monitor the service. These include the number of users connected, and various latency statistics.

Data Servers provide real-time views to users, so each connection to a query has a resource cost. This is not a problem under normal usage, where connections are managed by the Genesis framework. The Genesis client automatically releases these connections when they are no longer required. However, if a bespoke client is used, then these connections might not be closed properly.

A high user or connection count can indicate a problem here.

The latency metrics track how well a Data Server is handling requests.

| Metric | Explanation |
| --- | --- |
| user_count | The number of users connected to a query |
| connection_count | The number of connections to a query |
| data_logon.processing_latency | The latency for processing a DATA_LOGON request |
| enriched_lookup_latency | The latency for enriching data |
| lookup_latency | The latency for a look-up |
| message_publication_latency | The latency for publishing an update |

Runtime configuration

To include your *-dataserver.kts file definitions in a runtime process, make sure that the process definition meets these requirements:

  1. genesis-pal-dataserver is included in module
  2. global.genesis.dataserver.pal is included in package
  3. your dataserver.kts file(s) are defined in script
  4. pal is set in language

If you wish to run a dedicated process as a dataserver, the following gives an example full process definition:

<process name="POSITION_DATASERVER">
  <groupId>POSITION</groupId>
  <start>true</start>
  <options>-Xmx1024m -DXSD_VALIDATE=false</options>
  <module>genesis-pal-dataserver</module>
  <package>global.genesis.dataserver.pal</package>
  <script>position-dataserver.kts</script>
  <description>Displays real-time details</description>
  <language>pal</language>
</process>

See runtime configuration - processes for further details.

Performance

If you need better database response performance, consider adding a cache to the process. See runtime configuration - process cache for more details.

Testing

info

GenesisJunit is only available from version 8 of the Genesis Server Framework (GSF).

If you are testing against an earlier version of the framework, go to the legacy section.

Integration testing

This document looks at the basics of testing Data Servers.

We shall use a very simple example and work through the communication between our tests and the Data Server we are testing. This example relies on GenesisJunit, which is designed to make testing easy.

Preparation

There are some simple steps to follow in order to set up your test.

1. Create the test class

Create the test class using the code provided below:

@ExtendWith(GenesisJunit::class)
@ScriptFile("positions-app-tutorial-dataserver.kts")
@CsvData("seed-data.csv")
class DataServerTest {

// our tests go here ...
}

This test class does three things:

  • It enables GenesisJunit.
  • It identifies the Data Server script that we want to test, using the ScriptFile annotation.
  • It identifies the source data for our test, using the CsvData annotation.

There is more information about GenesisJunit and the various annotations in the section on Integration testing.

2. Load test data

Create a CSV file called seed-data.csv in the root of the resources folder for your tests. Add the data below, which contains three records, to this file.

#POSITION
POSITION_ID,INSTRUMENT_ID,QUANTITY,NOTIONAL,VALUE,PNL
000000000000001PSLO1,INSTRUMENT_TEST1,1,1111.11,1111.11,1111.11
000000000000001PSLO2,INSTRUMENT_TEST2,1,2222.22,2222.22,2222.22
000000000000001PSLO3,INSTRUMENT_TEST3,1,3333.33,3333.33,3333.33

3. Inject references

Before we can begin writing tests for our Data Server, we need to inject references to the database and the Data Server client:

@ExtendWith(GenesisJunit::class)
@ScriptFile("positions-app-tutorial-dataserver.kts")
@CsvData("seed-data.csv")
class DataServerTest {

@Inject
private lateinit var db: SyncEntityDb

@Inject
private lateinit var dataServerClient: DataServerQueryViewClient

// our test will go here ...
}

A first test

The test below is designed to make sure that a Data Server returns the right number of records.

@Test
fun testViewHas3Records() {
val view = dataServerClient.queryView<Position>(
datasourceName = "ALL_POSITIONS"
)

await untilCallTo { view } has { size == 3 }
}

As you can see here, to connect to our Data Server, you need to provide the datasource name. We have also provided the type or class that the query returns. This is optional; if no type is provided, the view will be of type GenesisSet.

For Java developers, we provide a fluent interface. In Kotlin, you can use named parameters. The test is the same regardless.

Once you connect, the client will provide a QueryView, which connects to the resource in the Data Server and receives updates.

At this point, we are just making sure we have received 3 records.

Create collections from QueryView

In addition to the size/getSize() property, there are also utility methods to represent the view as a collection. These methods take a snapshot, and are not updated.

@Test
fun buildCollections() {
val view = dataServerClient.queryView<Position>(
datasourceName = "ALL_POSITIONS"
)

val list = view.toList()
val byId = view.associateBy { it.positionId }
val byCurrency = view.groupBy { it.currency }
}

Dynamic authorization

For this test, we have amended our Data Server to enable authorization:

dataServer {
query("ALL_TRADES_AUTH", TRADE) {
permissioning {
auth("TRADE_VISIBILITY") {
authKey {
key(data.tradeId)
}
}
}
}
}

Once you add the auth block to the code, you need to amend the test.

To test dynamic authorization, add @EnableInMemoryTestAuthCache to your class or method. This makes InMemoryTestAuthCache available for injection in your test class.

@ExtendWith(GenesisJunit::class)
@EnableInMemoryTestAuthCache
@ScriptFile("hello-world-dataserver.kts")
@CsvData("SystemTest/simple-data-load.csv")
class DataServerTest {

@Inject
lateinit var client: DataServerQueryViewClient

@Inject
lateinit var authCache: InMemoryTestAuthCache

@Test
fun testGetTradesDynamicAuth() {
val view = client.queryView<Trade>(
datasourceName = "ALL_TRADES_AUTH",
userName = "JohnDoe"
)


authCache.authorise(
authMap = "TRADE_VISIBILITY",
entityCode = "00000000001TRSP0",
userName = "JohnDoe"
)

await untilCallTo { view } has { size == 1 }

authCache.authorise(
authMap = "TRADE_VISIBILITY",
entityCode = "00000000002TRSP0",
userName = "JohnDoe"
)

await untilCallTo { view } has { size == 2 }

authCache.revoke(
authMap = "TRADE_VISIBILITY",
entityCode = "00000000001TRSP0",
userName = "JohnDoe"
)

authCache.revoke(
authMap = "TRADE_VISIBILITY",
entityCode = "00000000002TRSP0",
userName = "JohnDoe"
)

await untilCallTo { view } has { size == 0 }
}
}

Await methods

Finally, QueryView has some methods to help with making assertions on the data. With the view itself being updated in the background, these methods will have a timeout.

@Test
fun testUsdPositions() {
val view = dataServerClient.queryView<Position>(
datasourceName = "USD_POSITIONS"
)

val list = db.getRange(Position.byCurrencyId("USD"))

view.awaitUntilEqual(list)
view.awaitUntilContains(list.first())
}

Integration testing (legacy)

info

This section covers testing your Data Server if you are using any version of the Genesis Server Framework before GSF v8.

The Genesis platform provides the AbstractGenesisTestSupport abstract class, which enables end-to-end testing of specific areas of your application.

In this case, we want to ensure that we have a database seeded with information. And we want to check that our Data Server configuration is used to create our Data Server. We also need to add the required packages and genesis home.

class DataServerTests : AbstractGenesisTestSupport<Reply<*>>(
GenesisTestConfig {
addPackageName("global.genesis.dataserver.pal")
genesisHome = "/GenesisHome/"
scriptFileName = "positions-app-tutorial-dataserver.kts"
initialDataFile = "seed-data.csv"
}
) {

private var ackReceived = false
private var initialData: GenesisSet = GenesisSet()
private var updateData: GenesisSet = GenesisSet()

@Before
fun before() {
ackReceived = false
initialData = GenesisSet()
updateData = GenesisSet()

messageClient.handler.addListener { set, _ ->
println(set)
if ("LOGON_ACK" == set.getString(MessageType.MESSAGE_TYPE)) {
ackReceived = true
}
if ("QUERY_UPDATE" == set.getString(MessageType.MESSAGE_TYPE)) {
if (initialData.isEmpty) {
initialData = set
} else {
updateData = set
}
}
}
}

For more information about AbstractGenesisTestSupport, see the Testing pages.

Let's load some test data into the seed-data.csv in the root of the resources folder for your tests.

#POSITION
POSITION_ID,INSTRUMENT_ID,QUANTITY,NOTIONAL,VALUE,PNL
000000000000001PSLO1,INSTRUMENT_TEST1,1,1111.11,1111.11,1111.11
000000000000001PSLO2,INSTRUMENT_TEST2,1,2222.22,2222.22,2222.22
000000000000001PSLO3,INSTRUMENT_TEST3,1,3333.33,3333.33,3333.33

We are now ready to begin writing tests for our Data Server.

First, we send a DATA_LOGON message to our Data Server; our Data Server should then respond with the current snapshot of data. Any changes will then be automatically sent to us as a consumer.

We then need to trigger a change on the database. Our Data Server will see this change and send the update to us.

@Test
fun `data server passes update`(): Unit = runBlocking {
messageClient.sendMessage(
GenesisSet.genesisSet {
MessageType.MESSAGE_TYPE with "DATA_LOGON"
MessageType.DETAILS with GenesisSet.genesisSet {
MessageType.DATASOURCE_NAME with "ALL_POSITIONS"
}
}
)
Awaitility.waitAtMost(30, TimeUnit.SECONDS).until(ackAndDataReceived())

// We pull out the initial data
val rows = initialData.getArray<GenesisSet>("ROW")!!.sortedBy { it?.getString("POSITION_ID") }
val firstRow = rows.first() ?: fail("Missing first row in initial data")
val firstRowRef = firstRow.getGenesisSet("DETAILS")!!.getString("ROW_REF")!!
assertEquals(3, rows.size)
assertEquals("000000000000001PSLO1", firstRow.getString("POSITION_ID")!!)
assertEquals(1111.11, firstRow.getDouble("VALUE")!!)

// We update the database to trigger our Data Server into action
entityDb.updateBy(Position.byId("000000000000001PSLO1")) {
value = 1234.56
}
Awaitility.await().until(ackDataAndUpdateReceived())

// We consume the update
val updateRows = updateData.getArray<GenesisSet>("ROW")!!
val updateRow = updateRows.first()!!

assertEquals(1, updateRows.size)
// Read the ROW_REF from the update itself, so the comparison below is meaningful
val updateRowRef = updateRow.getGenesisSet("DETAILS")?.getString("ROW_REF")!!
assertEquals(firstRowRef, updateRowRef)
assertEquals(1234.56, updateRow.getDouble("VALUE")!!)
}

/**
* Check ack and initial data received.
*
* @return true if ack and initial data received
*/
private fun ackAndDataReceived(): Callable<Boolean> {
return Callable { ackReceived && !initialData.isEmpty }
}

/**
* Check ack, initial data and update received.
*
* @return true if ack, initial data and update received
*/
private fun ackDataAndUpdateReceived(): Callable<Boolean> {
return Callable { ackReceived && !initialData.isEmpty && !updateData.isEmpty }
}