Streamer client
Overview
Streamer clients link data streams from streamers to a target event handler based on their configuration. This linkage takes into account whether the data has been processed by the event handler in order to provide recoverability in case of failure.
The core streamer client behaviour is the following:
- The streamer client sends a logon message to a streamer, with or without a specific sequence number, for a specific streamer definition.
- The streamer client then starts receiving messages from the streamer, either as part of a recovery message workflow or as real-time messages.
- If the message received contains a flag named MORE_DATA_AVAILABLE set to true, the streamer client sends a message to the streamer requesting the next batch of data once it has finished processing the message.
  - The previous step is repeated as long as MORE_DATA_AVAILABLE is set to true in the incoming message from the streamer.
- Once MORE_DATA_AVAILABLE is received as false, the streamer client stops requesting new batches of data and just processes real-time messages as they come.
  - These messages can be transformed and filtered before being passed on to the event handler.
- Streamer clients also have a backpressure mechanism to avoid overloading the event handler with too many messages.
  - For each message sent to an event handler, an internal counter is increased by one. This counter represents the currently unacknowledged messages.
  - The expectation is that each message receives an acknowledgement from the event handler. For each acknowledgement, the internal counter is decreased by one.
  - The eventHandlerBuffer configuration item determines the maximum number of unacknowledged messages allowed for a streamer client definition.
  - If the eventHandlerBuffer number is reached, the streamer client assumes the event handler is overloaded and stops sending messages until the number decreases. Any messages received from the streamer are queued in the meantime.
- The streamer client data recovery mechanism is handled by an internal file cache as well as a database record stored in the PROCESS_REF table.
  - On a failure event, the streamer client reads from the file cache first (and from the table if the cache is not available) to determine the oldest unacknowledged message sequence number.
  - If there are no unacknowledged messages, it picks the latest acknowledged sequence number.
  - It then uses this sequence number to request data from the streamer. This ensures that any records after the last processed sequence are streamed to the event handler.
  - More information about sequence numbers and how they are used can be found on the streamer page.
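The backpressure rules above can be modelled in a few lines of plain Kotlin. The sketch below is a hypothetical illustration, not the Genesis implementation: BackpressureGate, onStreamerMessage and onAck are invented names, and acknowledgements are assumed to arrive as simple callbacks. It keeps the in-flight counter, queues messages once eventHandlerBuffer is reached, and drains the queue as acknowledgements come back.

```kotlin
// Hypothetical model of the streamer client backpressure mechanism.
// Class and member names are illustrative only; this is not the Genesis API.
class BackpressureGate<T>(
    private val eventHandlerBuffer: Int,          // max unacknowledged ("in-flight") messages
    private val sendToEventHandler: (T) -> Unit,  // stands in for the real send to the event handler
) {
    // Messages received from the streamer while the buffer is full.
    private val queue = ArrayDeque<T>()

    // Internal counter of currently unacknowledged messages.
    var unacknowledged = 0
        private set

    val queuedCount: Int
        get() = queue.size

    // Called for every message received from the streamer.
    fun onStreamerMessage(message: T) {
        // Send straight away if there is room, otherwise hold it back.
        if (unacknowledged < eventHandlerBuffer) send(message) else queue.addLast(message)
    }

    // Called for every acknowledgement received from the event handler.
    fun onAck() {
        check(unacknowledged > 0) { "acknowledgement received with no message in flight" }
        unacknowledged--
        // Drain queued messages while the buffer has room again.
        while (unacknowledged < eventHandlerBuffer && queue.isNotEmpty()) {
            send(queue.removeFirst())
        }
    }

    private fun send(message: T) {
        unacknowledged++
        sendToEventHandler(message)
    }
}
```

For example, with a buffer of 2, a third message is queued until the first acknowledgement arrives; it is then sent and the counter returns to 2.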
Streamer client processes should run in primaryOnly mode to avoid sending a duplicated message stream to an event handler.
Streamer client processes may replay messages that have previously been sent to an event handler in the event of a failure. Event handler implementations must take this into account and build duplicate message protection accordingly.
Additionally, streamer clients may use xlator plugins that enhance the available streamer configuration with new configuration blocks and data transformations. See the FIX xlator documentation for more information.
Example configuration
A sample streamer client configuration can be seen below:
streamerClients {
streamerClient(clientName = "ORDER_OUT") {
eventHandlerBuffer = 500
dataSource(processName = "POSITION_STREAMER", sourceName = "ORDER_OUT")
onMessage {
send(targetProcess = "POSITION_EVENT_HANDLER", messageType = "EVENT_ORDER_OUT")
}
}
streamerClient(clientName = "FILTERED_QUOTE", selectOn = QUOTE.SYMBOL) {
dataSource(processName = "POSITION_STREAMER", sourceName = "QUOTE_STREAM")
onMessage(where = "VODL") {
sendFormatted(targetProcess = "POSITION_EVENT_HANDLER", messageType = "EVENT_VODL_QUOTE") {
QUOTE.SYMBOL
QUOTE.PRICE withAlias "BID_PRICE"
}
}
onMessage(where = "MSFT") {
sendFormatted(targetProcess = "POSITION_EVENT_HANDLER", messageType = "EVENT_MSFT_QUOTE") {
QUOTE.SYMBOL
QUOTE.SECURITY_EXCHANGE withAlias "EXCHANGE"
QUOTE.PRICE withAlias "BID_PRICE"
}
}
}
}
And the QUOTE table is defined as seen below:
table(name = "QUOTE", id = 7005) {
field(name = "SYMBOL", type = STRING)
field(name = "SECURITY_EXCHANGE", type = STRING)
field(name = "PRICE", type = DOUBLE)
field(name = "QUOTE_DATETIME", type = DATETIME)
primaryKey("SYMBOL")
}
In the example we see two different streamer client definitions, one named "ORDER_OUT" and another one named "FILTERED_QUOTE".
The "ORDER_OUT" definition is fairly simple:
- The dataSource configuration has two parameters, processName and sourceName, with the values "POSITION_STREAMER" for the former and "ORDER_OUT" for the latter. These correspond to the streamer process name and the stream name defined within the streamer we are connecting to.
- The eventHandlerBuffer is set to 500, so a maximum of 500 events can be in flight at any time, as explained in the overview section.
- The onMessage block defines the simplest send configuration, which redirects the data received from the streamer to an event handler without any transformation or filtering. In this case, the targetProcess parameter refers to the event handler process name (POSITION_EVENT_HANDLER) and the messageType parameter refers to the specific event handler endpoint the message should be routed to.
In the case of the "FILTERED_QUOTE" definition, there are a few more configuration settings.
- The streamerClient has been defined with a name, but also with a selectOn field. This allows the streamer client to apply specific logic based on one of the message fields.
  - In this case, the selectOn parameter uses the SYMBOL field of the QUOTE table, as the incoming message from the streamer is equivalent to the QUOTE table entity definition.
- The dataSource configuration is similar to "ORDER_OUT", but in this case it targets a different streamer definition: "QUOTE_STREAM".
- The onMessage blocks have a where parameter this time, based on the selectOn field: "SYMBOL".
  - In this configuration, there are two onMessage definitions, which define different behaviour depending on whether the incoming "SYMBOL" value from a "QUOTE_STREAM" record is "MSFT" or "VODL".
  - In the case of "VODL", sendFormatted targets the "POSITION_EVENT_HANDLER" process and the "EVENT_VODL_QUOTE" event handler. The message is formatted to send only the values of the SYMBOL and PRICE fields. Note that field names can also be changed: here, the PRICE field name is changed to BID_PRICE using the withAlias configuration.
  - In the case of "MSFT", sendFormatted targets "POSITION_EVENT_HANDLER" again, but this time a different event handler: "EVENT_MSFT_QUOTE". The message is formatted like the previous one, but it also includes the SECURITY_EXCHANGE field, renamed to EXCHANGE.
A sample workflow for the first streamer client definition can be seen below:
Configuration options
streamerClient
You can use the streamerClient configuration to declare a reliable stream.
The simplest streamer client definition is:
streamerClients {
streamerClient(clientName = "ORDER_OUT") {
dataSource(processName = "POSITION_STREAMER", sourceName = "ORDER_OUT")
onMessage {
send(targetProcess = "POSITION_EVENT_HANDLER", messageType = "EVENT_ORDER_OUT")
}
}
}
The clientName parameter is used to uniquely identify a streamerClient definition.
In more detail, there are three types of streamer client definitions, including the one seen above:
- GenesisSet streamer client. It uses the raw GenesisSet message received from the streamer for transformation and configuration purposes.
// builds a GenesisSet based streamer client
streamerClient(clientName = "{name}") { ... }
- Table or View entity streamer client. It uses an existing DbEntity for type-safe configuration. This is most useful when the streamer is streaming DbEntity objects.
// builds a type safe QUOTE streamer client
streamerClient(clientName = "{name}", source = QUOTE) { ... }
- And finally, selective streamer clients. These streamer client definitions combine a selectOn parameter with different onMessage blocks, so that messages are handled differently depending on the selected field value. The example below handles VODL quotes one way and MSFT quotes another:
streamerClient(clientName = "CLIENT", selectOn = QUOTE.SYMBOL) {
onMessage(where = "VODL") { ... }
onMessage(where = "MSFT") { ... }
}
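To make the selectOn/where pairing concrete, the following plain Kotlin sketch routes each message to the handler registered for its selected field value. It is a hypothetical model only: Quote, SelectiveRouter and dispatch are illustrative names, and a real streamer client is configured through the DSL shown above rather than through a class like this.

```kotlin
// Hypothetical stand-in for a QUOTE record; not a generated Genesis entity.
data class Quote(val symbol: String, val price: Double)

// Simplified model of a selective streamer client: selectOn extracts the routing
// value, and each onMessage(where = ...) registers the handler for one value.
class SelectiveRouter<M, K>(private val selectOn: (M) -> K) {
    private val handlers = mutableMapOf<K, (M) -> Unit>()

    fun onMessage(where: K, handler: (M) -> Unit) {
        handlers[where] = handler
    }

    // Returns false when no handler matches. How the real streamer client treats
    // unmatched values is not described here; this sketch simply skips them.
    fun dispatch(message: M): Boolean {
        val handler = handlers[selectOn(message)] ?: return false
        handler(message)
        return true
    }
}
```

Registering handlers for "VODL" and "MSFT" with selectOn = { it.symbol } mirrors the FILTERED_QUOTE example: each quote reaches only the block whose where value matches its symbol.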
In the case of GenesisSet streamer clients, the field selection syntax can be one of the following:
// use the Fields object:
streamerClient(clientName = "CLIENT", selectionField = Fields.SYMBOL) { ... }
// specify a field and type
streamerClient(clientName = "CLIENT", selectionField = "SYMBOL", type = INTEGER) { ... }
// if no type is specified it will default to STRING
streamerClient(clientName = "CLIENT", selectionField = "SYMBOL") { ... }
dataSource
The dataSource setting specifies which streamer process, and which stream within that process, should be used as part of the streamerClient definition.
The parameters are processName and sourceName, which identify the streamer process name and the stream name respectively.
Example usage below:
streamerClients {
streamerClient(clientName = "ORDER_OUT") {
dataSource(processName = "POSITION_STREAMER", sourceName = "ORDER_OUT")
onMessage {
send(targetProcess = "POSITION_EVENT_HANDLER", messageType = "EVENT_ORDER_OUT")
}
}
}
onMessage
The onMessage configuration defines what the streamer client does with each streamer message. It has three operations: where, send and sendFormatted.
- When using non-selective streamerClient definitions, onMessage may optionally have a subscriptionKey parameter to identify each onMessage block. Different onMessage blocks may also have different transformations and where configurations. In those cases, the subscriptionKey is important: a single streamer client definition may connect to multiple processes, or to multiple event handler endpoints with different formatting logic, so subscriptionKey provides an easy way to identify each onMessage stream. By default, subscriptionKey is equal to the clientName parameter of the streamerClient definition.
- In the case of selective streamerClient definitions, a where parameter must be provided, and optionally a subscriptionKey parameter as well. subscriptionKey is necessary for the same reason explained in the previous point; by default, it is equal to the where parameter value.
Some examples of non-selective and selective onMessage blocks can be seen below:
streamerClients {
streamerClient(clientName = "ORDER_OUT") {
eventHandlerBuffer = 500
dataSource(processName = "POSITION_STREAMER", sourceName = "ORDER_OUT")
onMessage(subscriptionKey = "POSITION") {
send(targetProcess = "POSITION_EVENT_HANDLER", messageType = "EVENT_ORDER_OUT")
}
onMessage(subscriptionKey = "OMS") {
send(targetProcess = "OMS_EVENT_HANDLER", messageType = "EVENT_ORDER_OUT")
}
}
streamerClient(clientName = "FILTERED_QUOTE", selectOn = QUOTE.SYMBOL) {
dataSource(processName = "POSITION_STREAMER", sourceName = "QUOTE_STREAM")
onMessage(where = "VODL") {
sendFormatted(targetProcess = "POSITION_EVENT_HANDLER", messageType = "EVENT_VODL_QUOTE") {
QUOTE.SYMBOL
QUOTE.PRICE withAlias "BID_PRICE"
}
}
onMessage(where = "MSFT", subscriptionKey = "OMS") {
sendFormatted(targetProcess = "OMS_EVENT_HANDLER", messageType = "EVENT_MSFT_QUOTE") {
QUOTE.SYMBOL
QUOTE.SECURITY_EXCHANGE withAlias "EXCHANGE"
QUOTE.PRICE withAlias "BID_PRICE"
}
}
}
}
where
The where configuration ensures that only certain messages are sent to the event handler, based on a predicate. This operation has one parameter, corresponding to the streamer client message type. This can be:
- a table or view entity
- a GenesisSet
As this operation is a predicate, it must return a Boolean value.
Example below for a DbEntity approach using the QUOTE table:
where { quote ->
quote.price > 0.0
}
Another example, this time using GenesisSet:
where { quote ->
quote.getDouble("PRICE", 0.0) > 0.0
}
send and sendFormatted
The send configuration directs, and optionally formats, the outgoing message.
It requires two parameters: targetProcess and messageType.
Each onMessage block must have at least one send block.
For example:
send(targetProcess = "POSITION_EVENT_HANDLER", messageType = "EVENT_QUOTE_INSERT")
This will send the full content of the streamer message on to the target.
In addition, for entity streamers, you can format the message in the same way as you would define the output of a view, data server, or request reply.
Use sendFormatted in this scenario:
sendFormatted("POSITION_EVENT_HANDLER", "EVENT_QUOTE_INSERT") {
QUOTE.SYMBOL
QUOTE.PRICE withAlias "BID_PRICE"
}
You can override the name of a field using various operators. This is necessary when a field name is the same as a field name in another table.
- withAlias <NAME> gives the field an alternative NAME
- withPrefix adds a prefix to the field name
- withFormat <FORMAT_MASK> can also be used to override DATE, DATETIME and numerical fields so that they are sent as a String in a particular format, using format masks.
  - DATE/DATETIME format patterns should adhere to the Joda DateTime format
  - numerical format patterns should adhere to the Java DecimalFormat
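The numeric masks follow java.text.DecimalFormat, so a quick way to check what a mask produces is to apply it directly with the JDK. This sketch pins Locale.US symbols so that the result does not depend on the JVM default locale; which locale the streamer client itself uses is not stated here, so treat that as an assumption. formatWithMask is a hypothetical helper name.

```kotlin
import java.text.DecimalFormat
import java.text.DecimalFormatSymbols
import java.util.Locale

// Applies a java.text.DecimalFormat mask to a number, to preview what a numeric
// format mask produces. Locale.US is an assumption made for reproducible output.
fun formatWithMask(value: Double, mask: String): String =
    DecimalFormat(mask, DecimalFormatSymbols(Locale.US)).format(value)
```

With this helper, the mask "#,##0.00" turns 1234.5 into "1,234.50", and "00000" pads 42.0 to "00042".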
Finally, you can craft the message from scratch.
This example uses just the streamer message as a parameter, which may be a GenesisSet or a DbEntity, as explained in the where section:
send("POSITION_EVENT_HANDLER", "EVENT_QUOTE_INSERT") { quote ->
genesisSet {
"SYMBOL" with quote.symbol
"BID_PRICE" with quote.price
}
}
The example below uses the streamer message as a parameter (in this case a DbEntity object for QUOTE) and an optional GenesisSet parameter that acts as the output object.
The GenesisSet object is initially empty; this form is just a convenient way of writing the same kind of code as in the previous example without having to create a new GenesisSet object.
send("QUOTE_HANDLER", "QUOTE_EVENT") { quote, set ->
set.setString("SYMBOL", quote.symbol)
}
When using this form of send, you need to name the lambda parameters explicitly (quote -> or quote, set ->). The implicit Kotlin it parameter does not work in this case.
isReplayable
isReplayable is a boolean flag that determines whether the stream should keep track of sequence numbers for recoverability purposes on process start (see the overview for more information on recoverability).
The default value is false.
Example usage below:
streamerClients {
streamerClient(clientName = "ORDER_OUT") {
isReplayable = true
dataSource(processName = "POSITION_STREAMER", sourceName = "ORDER_OUT")
onMessage {
send(targetProcess = "POSITION_EVENT_HANDLER", messageType = "EVENT_ORDER_OUT")
}
}
}
eventHandlerBuffer
eventHandlerBuffer specifies how many unacknowledged messages can be "in-flight" before the streamer client stops sending. Once this number is reached, the streamer client stops sending messages until the event handler acknowledges some of the outstanding ones.
The default value is 50.
Example usage below:
streamerClients {
streamerClient(clientName = "ORDER_OUT") {
eventHandlerBuffer = 50
dataSource(processName = "POSITION_STREAMER", sourceName = "ORDER_OUT")
onMessage {
send(targetProcess = "POSITION_EVENT_HANDLER", messageType = "EVENT_ORDER_OUT")
}
}
}
sentWarningRange
sentWarningRange specifies a time range, in seconds, to wait before marking the streamer client as WARNING or ERROR once eventHandlerBuffer has been reached.
It compares the last time a message was sent to the event handler against the current time to calculate the elapsed time:
- If there are no unacknowledged messages, mark the process as HEALTHY.
- If the elapsed time is less than the lower bound of the range, mark the process as HEALTHY.
- If the elapsed time is between the lower and upper bounds of the range, mark the process as WARNING.
- If the elapsed time is above the upper bound of the range, mark the process as ERROR.
The default value is 5 seconds for the lower bound of the range and 60 seconds for the upper bound.
Example usage below:
streamerClients {
streamerClient(clientName = "ORDER_OUT") {
sentWarningRange = 5L..60L
dataSource(processName = "POSITION_STREAMER", sourceName = "ORDER_OUT")
onMessage {
send(targetProcess = "POSITION_EVENT_HANDLER", messageType = "EVENT_ORDER_OUT")
}
}
}
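The four sentWarningRange rules can be expressed as a small function. This is a hypothetical sketch: sentHealth and Health are illustrative names rather than Genesis API, and treating the upper bound itself as WARNING (rather than ERROR) is an assumption, since the rules do not say how the exact boundary is handled.

```kotlin
// Hypothetical health states mirroring HEALTHY / WARNING / ERROR.
enum class Health { HEALTHY, WARNING, ERROR }

// elapsedSeconds = current time minus the time the last message was sent.
// The default range matches the documented 5..60 second default.
fun sentHealth(
    unacknowledged: Int,
    elapsedSeconds: Long,
    sentWarningRange: LongRange = 5L..60L,
): Health = when {
    unacknowledged == 0 -> Health.HEALTHY                      // nothing in flight
    elapsedSeconds < sentWarningRange.first -> Health.HEALTHY  // below the lower bound
    elapsedSeconds <= sentWarningRange.last -> Health.WARNING  // within the range (upper bound inclusive, assumed)
    else -> Health.ERROR                                       // above the upper bound
}
```

With the default range and some messages still unacknowledged, 30 seconds since the last send yields WARNING and 61 seconds yields ERROR.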
receiveWarningRange
receiveWarningRange is similar to the previous setting, but it applies even when the eventHandlerBuffer maximum size has not been reached. This can capture unusual scenarios in which a handful of messages are never acknowledged, while the data stream otherwise seems to work as expected.
It compares the last time a message was received from the event handler against the current time to calculate the elapsed time:
- First, the checks in sentWarningRange apply. Additionally, if the elapsed time since the last message was sent is less than the lower bound of sentWarningRange, the process is always marked as HEALTHY.
- If the previous point doesn't apply, the same logic used in sentWarningRange applies here:
  - If the elapsed time is less than the lower bound of the range, mark the process as HEALTHY.
  - If the elapsed time is between the lower and upper bounds of the range, mark the process as WARNING.
  - If the elapsed time is above the upper bound of the range, mark the process as ERROR.
The default value is 5 seconds for the lower bound of the range and 60 seconds for the upper bound.
Example usage below:
streamerClients {
streamerClient(clientName = "ORDER_OUT") {
receiveWarningRange = 5L..60L
dataSource(processName = "POSITION_STREAMER", sourceName = "ORDER_OUT")
onMessage {
send(targetProcess = "POSITION_EVENT_HANDLER", messageType = "EVENT_ORDER_OUT")
}
}
}
updateFrequency
updateFrequency is the time interval, in milliseconds, used to perform the checks for receiveWarningRange and sentWarningRange.
The default value is 2000 milliseconds.
Example usage below:
streamerClients {
streamerClient(clientName = "ORDER_OUT") {
updateFrequency = 2000
dataSource(processName = "POSITION_STREAMER", sourceName = "ORDER_OUT")
onMessage {
send(targetProcess = "POSITION_EVENT_HANDLER", messageType = "EVENT_ORDER_OUT")
}
}
}
cacheEntries
This setting must be used with extreme caution. If the default value is changed, the corresponding cache file (found inside $GENESIS_HOME/runtime/cache) for the relevant streamer client definition needs to be deleted before the streamer client process is restarted. This setting is marked as deprecated in the code to highlight the importance of this fact.
cacheEntries sets how many entries are allocated in the streamer client cache file.
Each streamer client subscription creates a ChronicleMap instance with capacity for 2000 entries by default.
This is used as the cache for events that have been received from the streamer and sent to the event handler.
The map uses the unique numeric sequence value provided by the streamer as the key, and a status (SENT or COMPLETE) as the value.
Once an event has been sent to the event handler, its sequence reference is marked as SENT. When the streamer client receives a response, it is marked as COMPLETE.
Every two seconds, a job runs to update the PROCESS_REF database table with the data stored in the ChronicleMap instance (if there is any). The value stored in the database is either:
- the oldest SENT value (if available), or
- if no SENT values are stored, the latest COMPLETE value
The idea is that, whenever necessary, events can always be replayed from the streamer starting from the oldest SENT value (there was no response for that event, so we can't guarantee it was processed successfully). If no SENT values are available, then it is safe to replay from the latest COMPLETE event. As part of this job, entries marked as COMPLETE are also removed from the ChronicleMap.
If the ChronicleMap file is not present, the database value is used.
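The SENT/COMPLETE bookkeeping and the choice of replay point can be sketched as follows. This is a hypothetical model: a java.util.TreeMap stands in for the ChronicleMap, the names ReplayCache, flush and startSequence are invented, and falling back to the database value when the cache is present but empty is an assumption beyond what is described above.

```kotlin
import java.util.TreeMap

enum class CacheStatus { SENT, COMPLETE }

// Hypothetical model of the streamer client cache: sequence number -> status,
// kept sorted so "oldest" and "latest" are simple scans in key order.
class ReplayCache {
    private val bySequence = TreeMap<Long, CacheStatus>()

    fun markSent(sequence: Long) { bySequence[sequence] = CacheStatus.SENT }
    fun markComplete(sequence: Long) { bySequence[sequence] = CacheStatus.COMPLETE }

    // Oldest SENT sequence if any, otherwise the latest COMPLETE; null if empty.
    fun replayFrom(): Long? =
        bySequence.entries.firstOrNull { it.value == CacheStatus.SENT }?.key
            ?: bySequence.entries.lastOrNull { it.value == CacheStatus.COMPLETE }?.key

    // Mirrors the periodic job: compute the value to persist, then drop COMPLETE entries.
    fun flush(): Long? {
        val toPersist = replayFrom()
        bySequence.values.removeAll { it == CacheStatus.COMPLETE }
        return toPersist
    }
}

// On restart: prefer the cache; fall back to the persisted database value
// when the cache is missing (or, in this sketch, has nothing to offer).
fun startSequence(cache: ReplayCache?, databaseValue: Long?): Long? =
    cache?.replayFrom() ?: databaseValue
```

flush mirrors the two-second job: it computes the value that would be written to PROCESS_REF (oldest SENT, else latest COMPLETE) and then removes the COMPLETE entries, leaving only unacknowledged ones.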
ChronicleMap instances need to be pre-allocated with a fixed set of parameters. If the streamer client tries to store more than 2000 entries before the cleaning/flushing job runs (that is, within two seconds), it exceeds the maximum capacity. This can happen when the stream is overloaded. In this scenario, cacheEntries should be changed to accommodate higher volumes.
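As a rough sizing check derived from the description above (not an official formula), every event sent adds one entry and entries are only removed when the job runs, so just before each run the map holds every entry created since the previous run plus any still marked SENT:

$$
\text{cacheEntries} \ge r \cdot t_{\text{flush}} + s, \qquad t_{\text{flush}} = 2\ \text{s}
$$

Here r is the sustained event rate in events per second, t_flush is the two-second job interval, and s is the number of entries still marked SENT from earlier intervals. With the default of 2000 entries and s = 0, this allows r ≤ 2000 / 2 = 1000 events per second.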
Example usage below:
streamerClients {
streamerClient(clientName = "ORDER_OUT") {
cacheEntries = 2000
dataSource(processName = "POSITION_STREAMER", sourceName = "ORDER_OUT")
onMessage {
send(targetProcess = "POSITION_EVENT_HANDLER", messageType = "EVENT_ORDER_OUT")
}
}
}
Custom Log messages
Genesis provides a class called LOG that can be used to insert custom log messages. It provides five methods, one for each log level: info, warn, error, trace and debug. To use these methods, pass the string that should be written to the log as the input.
Here is an example where you can insert an info message into the log file:
LOG.info("This is an info message")
The LOG instance is a Logger from SLF4J.
To see these messages, you must set the logging level accordingly. You can do this using the logLevel command.
Metrics
Ensure you have enabled metrics in your environment to view them.
The metrics for the streamer client measure how long it takes for each streamer client message to be processed by the target event handlers, as well as the total number of unacknowledged messages.
These metrics allow monitoring tools to understand if some specific event handlers are overwhelmed, as well as how quickly they can deal with each streamer client message.
If the unacknowledged message size stays at a high number, the possible causes are:
- The event handler is unable to keep up with the message flow.
- The event handler is not sending an acknowledgement in response to the messages for any reason.
Metric | Explanation |
---|---|
unreplied_messages_size | The number of outstanding messages for each streamer client definition |
outbound_processing_latency | The event handler processing latency for each streamer client definition |
Runtime configuration
To include your *-streamer-client.kts file definitions in a runtime process, ensure that the process definition meets the following requirements:
- genesis-pal-streamerclient is included in module
- global.genesis.streamerclient.pal is included in package
- your streamer-client.kts file(s) are defined in script
- pal is set in language
If you wish to run a dedicated process for a streamer client, the following is an example of a full process definition:
<process name="POSITION_STREAMER">
<groupId>POSITION</groupId>
<description>Streams trades to external FIX gateway</description>
<start>true</start>
<options>-Xmx256m -DRedirectStreamsToLog=true -DXSD_VALIDATE=false -XX:MaxHeapFreeRatio=70 -XX:MinHeapFreeRatio=30 -XX:+UseG1GC -XX:+UseStringDeduplication -XX:OnOutOfMemoryError="handleOutOfMemoryError.sh %p"</options>
<module>genesis-pal-streamerclient</module>
<package>global.genesis.streamerclient.pal</package>
<primaryOnly>true</primaryOnly>
<script>position-streamer-client.kts</script>
<loggingLevel>INFO,DATADUMP_ON</loggingLevel>
<language>pal</language>
</process>
See runtime configuration - processes for further details.
Performance
If better database response performance is required, you may also consider adding a cache to the process. See runtime configuration - process cache for more details.