Streamer

Overview

Streamers provide reliable data streams to streamer clients based on a database table or view. These streams are reliable because they are driven by a unique, monotonically increasing sequence field in the underlying table or view.

The core streamer behaviour is the following:

  • Streamer receives a logon message from a streamer client, with or without a specific sequence number, for a specific streamer definition (see the sketch after this list).
  • The streamer will act differently depending on whether it received a sequence number or not.
    • If no sequence number has been received, the streamer will mark the streamer client connection as up to date.
    • If a sequence number has been received, the streamer will use it to read a batch of records (see batchSize) from the database using the defined table index. These records will be sent to the streamer client with an additional flag indicating whether there is more data available in the table or not.
      • If more data is available, the streamer client will request the next batch of data once the message is processed. This repeats the steps in the previous point, although no sequence number is needed this time, as the streamer knows where the next batch of records starts based on the previous one.
      • If no more data is available, the streamer marks the streamer client connection as up to date.
    • Once the connection is marked as up to date, the streamer starts streaming real-time updates for any newly inserted records belonging to the target table/view. Important: only insert updates are eligible for streamers, as we assume the underlying table represents a sequence of messages or record changes that works in an "append-only" fashion.
    • The data can be transformed and filtered before being passed on to the streamer client.
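
As a rough illustration of the logon step, below is a sketch of what a streamer client logon message might carry. It uses the genesisSet builder that appears later on this page; the field names STREAM_NAME and SEQUENCE_NUMBER are illustrative assumptions, not the actual wire protocol:

    // Hypothetical logon message requesting a replay; field names are assumptions
    val logonWithReplay = genesisSet {
        "STREAM_NAME" with "ORDER_OUT"     // the stream definition to subscribe to
        "SEQUENCE_NUMBER" with 42L         // optional: replay records from this sequence onwards
    }
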
warning

Streamers can only work as intended by using a numeric field in the underlying table. This field must represent a numeric sequence (perhaps defined as an auto increment field) and be both unique and monotonically increasing. This field must also be indexed.

Additionally, streamers may use xlator plugins that enhance the available streamer configuration with new configuration blocks and data transformations. See the FIX xlator documentation here for more information.

Example configuration

A sample streamer configuration can be seen below:

streams {
    stream("FIX_IN_X", FIX_IN.BY_RX_SEQUENCE) {
        batchSize = 500

        fields {
            FIX_IN.FIX_DATA
        }

        where { fixIn ->
            fixIn.connectionName == "X"
        }
    }
}

And the FIX_IN table is defined as seen below:

table(name = "FIX_IN", id = 7003) {
    field(name = "CONNECTION_NAME", type = STRING)
    field(name = "RX_SEQUENCE", type = LONG)
    field(name = "SENDER_ID", type = STRING)
    field(name = "TARGET_ID", type = STRING)
    field(name = "CL_ORD_ID", type = STRING)
    field(name = "ORIG_CL_ORD_ID", type = STRING)
    field(name = "INTERNAL_TARGET", type = STRING)
    field(name = "SIZE", type = INT)
    field(name = "FIX_DATA", type = STRING)
    primaryKey("RX_SEQUENCE")
    indices {
        unique("CONNECTION_NAME", "RX_SEQUENCE")
    }
}

In this case we are defining a stream named "FIX_IN_X", using the FIX_IN table and the BY_RX_SEQUENCE index. The BY_RX_SEQUENCE index contains a single field named RX_SEQUENCE.

The batchSize configuration has been set to 500, so the streamer will only read 500 records at a time when replaying records.

The fields configuration has been defined to only extract the FIX_DATA field from the FIX_IN table, so no other fields will be provided to any streamer clients connecting to this streamer. In this case, FIX_DATA contains a string representation of the whole FIX message.

The where configuration filters all data to ensure we only send records whose CONNECTION_NAME equals "X".

The sample workflow diagram below shows the interactions between a hypothetical streamer client and the streamer defined in our example:

Configuration options

stream

You can use the stream configuration to declare a reliable stream.

The simplest Streamer definition is:

streams {
    stream("ORDER_OUT", ORDER_OUT.BY_TIMESTAMP)
}

This example creates a stream called ORDER_OUT, based on the ORDER_OUT table (or view). The data will be streamed, ordered by timestamp.

stream contains additional configuration items explained below.

batchSize

batchSize determines how many records will be read from the database in each query when the streamer is replaying data after a successful streamer client connection.

The default value is 100.

Example usage below:

streams {
    stream("ORDER_OUT", ORDER_OUT.BY_TIMESTAMP) {
        batchSize = 500
    }
}

logoffTimeout

logoffTimeout determines how often the streamer reviews its current streamer client connections to check whether they are still alive. If a connection is no longer alive, it is closed. The value is measured in seconds.

The default value is 5000.

Example usage below:

streams {
    stream("ORDER_OUT", ORDER_OUT.BY_TIMESTAMP) {
        logoffTimeout = 60
    }
}

maxLogons

maxLogons determines how many streamer client connections can be established to this particular streamer. This ensures no more connections than absolutely necessary are allowed.

The default value is 1.

Example usage below:

streams {
    stream("ORDER_OUT", ORDER_OUT.BY_TIMESTAMP) {
        maxLogons = 2
    }
}

terminateOnError

terminateOnError determines the behaviour of the streamer when an exception is thrown while processing a record. For example, in the sample code below we apply a transformation to the outgoing GenesisSet message that will throw an exception:

streams {
    stream("ORDER_OUT", ORDER_OUT.BY_TIMESTAMP) {
        toGenesisSet { ordersOut ->
            throw RuntimeException()
        }
    }
}

By default terminateOnError is set to true, which means the streamer process will crash as soon as the first record hits the toGenesisSet transformation. If set to false, the exception will be logged, but the streamer will continue processing rows.
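
For completeness, here is a sketch of the non-terminating variant, with the property set explicitly; the exception thrown in toGenesisSet is now logged for each affected row instead of crashing the process:

streams {
    stream("ORDER_OUT", ORDER_OUT.BY_TIMESTAMP) {
        terminateOnError = false  // exceptions are logged and the streamer keeps processing rows
        toGenesisSet { ordersOut ->
            throw RuntimeException()
        }
    }
}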

where

The where tag enables the stream to be filtered. It is available in two versions: one that has the streamed row as a parameter, and one that also has the logon message.

Here, we only stream orders with a quantity greater than 1,000.

streams {
    stream("ORDER_OUT", ORDER_OUT.BY_TIMESTAMP) {
        where { ordersOut ->
            ordersOut.quantity > 1_000
        }
    }
}

In this example, we only stream orders with a quantity greater than 1,000 and where the logon message has provided a secret key.

streams {
    stream("ORDER_OUT", ORDER_OUT.BY_TIMESTAMP) {
        where { ordersOut, logonMessage ->
            ordersOut.quantity > 1_000 && logonMessage.getString("KEY") == "SECRET"
        }
    }
}

fields

The fields tag enables you to transform the message output in a similar way to views, data server and req rep definitions. For example, here we output three fields:

streams {
    stream("ORDER_OUT", ORDER_OUT.BY_TIMESTAMP) {
        fields {
            ORDER_OUT.CLIENT_ID
            ORDER_OUT.QUANTITY withPrefix "ORDER"
            ORDER_OUT.CLIENT_ID withAlias "CLIENT"
        }
    }
}

You can override the name of a field using various operators; this is necessary when a field name clashes with a field name from another table.

  • withAlias <NAME> gives the field an alternative NAME
  • withPrefix adds a prefix to the field name

withFormat <FORMAT_MASK> can also be used to send DATE, DATETIME and numerical fields as a String in a particular format, using a format mask.
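
A short sketch of withFormat is shown below; it assumes a hypothetical TRADE_DATE field of type DATE on the ORDER_OUT table and uses a conventional date mask:

streams {
    stream("ORDER_OUT", ORDER_OUT.BY_TIMESTAMP) {
        fields {
            // TRADE_DATE is a hypothetical field used for illustration
            ORDER_OUT.TRADE_DATE withFormat "yyyy-MM-dd"  // sent as a formatted String
        }
    }
}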

toGenesisSet

The toGenesisSet tag enables you to create a custom GenesisSet from the table/view entity before it is automatically converted and sent to the streamer client.

Example below:

streams {
    stream("ORDER_OUT", ORDER_OUT.BY_TIMESTAMP) {
        toGenesisSet { ordersOut ->
            genesisSet {
                "ORDER_QUANTITY" with ordersOut.quantity
                "ORDER" with ordersOut.orderId
            }
        }
    }
}

toEntity

The toEntity tag allows you to create a custom DbEntity from the table/view entity before it is automatically converted and sent to the streamer client as a GenesisSet.

Example below:

streams {
    stream("ORDER_OUT", ORDER_OUT.BY_TIMESTAMP) {
        toEntity { ordersOut ->
            Order {
                quantity = ordersOut.quantity
                orderId = ordersOut.orderId
            }
        }
    }
}

Custom Log messages

Genesis provides a class called LOG that can be used to insert custom log messages. This class provides five methods to be used accordingly: info, warn, error, trace and debug. To use these methods, you need to provide a string input that will be written to the logs.

Here is an example where you can insert an info message into the log file:

LOG.info("This is an info message")

The LOG instance is a Logger from SLF4J.
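
Because LOG is an SLF4J Logger, you can also use parameterized messages instead of string concatenation; the variables below are purely illustrative:

LOG.warn("Connection {} dropped after {} retries", connectionName, retryCount)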

note

In order to see these messages, you must set the logging level accordingly. You can do this using the logLevel command.

Metrics

info

Ensure you have enabled metrics in your environment to view them.

The metrics for the Streamer measure how long it takes to replay a single message batch when working in recovery mode:

Metric                       Explanation
replay_processing_latency    The latency for processing a replay batch

Runtime configuration

To include your *-streamer.kts file definitions in a runtime process, ensure that the process definition:

  1. includes genesis-pal-streamer in module
  2. includes global.genesis.streamer.pal in package
  3. defines your streamer.kts file(s) in script
  4. sets language to pal

If you wish to run a dedicated process for a streamer, the following is an example of a full process definition:

<process name="POSITION_STREAMER">
    <groupId>POSITION</groupId>
    <description>Streams trades to external FIX gateway</description>
    <start>true</start>
    <options>-Xmx256m -DRedirectStreamsToLog=true -DXSD_VALIDATE=false -XX:MaxHeapFreeRatio=70 -XX:MinHeapFreeRatio=30 -XX:+UseG1GC -XX:+UseStringDeduplication -XX:OnOutOfMemoryError="handleOutOfMemoryError.sh %p"</options>
    <module>genesis-pal-streamer</module>
    <package>global.genesis.streamer.pal</package>
    <primaryOnly>true</primaryOnly>
    <script>position-streamer.kts</script>
    <loggingLevel>INFO,DATADUMP_ON</loggingLevel>
    <language>pal</language>
</process>

See runtime configuration - processes for further details.

Performance

If better database response performance is required, you may also consider adding a cache to the process. See runtime configuration - process cache for more details.