FIX - Data Pipelines

The FIX component provides Data Pipeline implementations, a source and a sink, that you can use in any data pipeline you have set up.

Set-up

To access the GPAL extension functions described in this section, add the following dependency to the app module of your project:

implementation("global.genesis:fix-pipelines:$fixVersion")

fixSource

To source FIX messages for your pipeline, include the following packages in the definition of the process that runs your pipeline script file:

global.genesis.listener
global.genesis.pipeline

Instantiate fixSource with the ids of the sessions that you want to connect to; you can supply any number of session ids. The source emits objects of type InboundFixMessage, each of which contains both the Message object and the session id String that identifies where the message came from.

pipelines {
    pipeline("TEST_FIX_SOURCE") {
        source(fixSource("FIX.4.0:CPTY1->CPTY2", "FIX.4.0:CPTY3->CPTY4"))
            .filter { it.message.header.getString(35) == "D" }
            .sink(TestFixSink())
    }
}
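Because each inbound item pairs a message with the session it arrived on, a filter can test both. The following is a minimal, self-contained sketch of that idea using stand-in types rather than the real classes: StubMessage and StubInboundFixMessage are hypothetical, and the property name sessionId is an assumption (only message appears in the example above).

```kotlin
// Stand-in types for illustration only; the real classes come from the
// FIX component and the underlying FIX library.
data class StubMessage(val header: Map<Int, String>)

// Hypothetical shape: `message` mirrors the example above,
// while `sessionId` is an assumed property name.
data class StubInboundFixMessage(val message: StubMessage, val sessionId: String)

// FIX tag 35 is MsgType; the value "D" denotes a New Order Single.
const val MSG_TYPE_TAG = 35

// Keep only New Order Single messages that arrived on the given session.
fun newOrdersFrom(
    session: String,
    inbound: List<StubInboundFixMessage>,
): List<StubInboundFixMessage> =
    inbound.filter {
        it.sessionId == session && it.message.header[MSG_TYPE_TAG] == "D"
    }
```

In a real pipeline, the same predicate would sit inside the .filter { } step, with the actual property names taken from InboundFixMessage.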

fixSink

To send FIX messages to a session from your pipeline, include the following packages in the definition of the process that runs your pipeline script file:

global.genesis.publisher
global.genesis.pipeline

The fixSink accepts a single OutboundFixMessage or a flow of them. Each OutboundFixMessage contains the Message object and an optional list of session ids to send the message to.

If no list or an empty list is provided, the message is sent to the default session id configured in your QuickFIX settings.

pipelines {
    pipeline("TEST_FIX_SINK") {
        source(Source {
            flowOf(
                OutboundFixMessage(Message(...), listOf("FIX.4.0:CPTY1->CPTY2", "FIX.4.0:CPTY3->CPTY4")),
                OutboundFixMessage(Message(...))
            )
        })
            .sink(fixSink())
    }
}
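The fallback rule for session ids described above can be sketched in isolation. This is not the component's implementation, only an illustration of the documented behaviour; StubOutboundFixMessage, its sessionIds property, and targetSessions are hypothetical names introduced for the sketch.

```kotlin
// Stand-in for illustration; the real OutboundFixMessage carries a FIX Message
// rather than a String payload.
data class StubOutboundFixMessage(
    val payload: String,
    val sessionIds: List<String>? = null, // optional list of target sessions
)

// Resolve where a message would go: the explicit list when it is non-empty,
// otherwise the single default session.
fun targetSessions(
    msg: StubOutboundFixMessage,
    defaultSessionId: String,
): List<String> =
    msg.sessionIds?.takeIf { it.isNotEmpty() } ?: listOf(defaultSessionId)
```

Under this sketch, the first message in the example above would resolve to its two listed sessions, and the second, which supplies no list, would resolve to the default session.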