Reconciliation - Integrating with Data Pipelines
The RECONCILIATION_MANAGER process can integrate directly with the platform's Data Pipelines component, allowing it to consume and reconcile data from any source, both internal and external to the application.
At runtime, the manager looks for a specific script file, `reconciliation-pipelines.kts`. You can override the default (empty) file by adding your own version to the `src/main/genesis/scripts` directory in your project.
To make the reconciliation pipeline elements available for use in the IDE, you also need to add the following dependency to your main application module:
implementation("global.genesis:reconciliation-app:${properties["reconciliationVersion"]}")
Once this is done, you are ready to integrate Data Pipelines into the Reconciliation Manager.
Elements
Once you have added the component and the relevant dependencies, the following additional elements are available as part of the data pipelines ecosystem:
reconSink
To send data to the recon engine, the pipeline must be terminated with the recon sink component. The input type expected by the recon sink is `NormalisedRecord`, which takes a single constructor argument: a `Map` with `String` keys and `Object` values. The engine expects this map to conform to the data dictionary defined for the specific reconciliation.
The Java/Kotlin object types expected for each data dictionary field type are as follows:

| Data Dictionary Type | Java Type | Kotlin Type |
|---|---|---|
| STRING | java.lang.String | kotlin.String |
| INT | java.lang.Integer | kotlin.Int |
| LONG | java.lang.Long | kotlin.Long |
| DOUBLE | java.lang.Double | kotlin.Double |
| RAW | byte[] | kotlin.ByteArray |
| BOOLEAN | java.lang.Boolean | kotlin.Boolean |
| BIGDECIMAL | java.math.BigDecimal | java.math.BigDecimal |
| DATE | org.joda.time.LocalDate | org.joda.time.LocalDate |
| DATETIME | org.joda.time.DateTime | org.joda.time.DateTime |
| NANO_TIMESTAMP | java.lang.Long | kotlin.Long |
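As an illustration of these mappings, here is a minimal sketch of constructing a `NormalisedRecord` by hand. The field names and values are hypothetical and must match the data dictionary defined for your reconciliation:

```kotlin
import org.joda.time.LocalDate

// Hypothetical field names; the keys and value types must match the
// data dictionary defined for the reconciliation (see the table above).
val record = NormalisedRecord(
    mapOf(
        "TRADE_ID" to "T-0001",                    // STRING  -> kotlin.String
        "QUANTITY" to 100L,                        // LONG    -> kotlin.Long
        "PRICE" to java.math.BigDecimal("101.25"), // BIGDECIMAL -> java.math.BigDecimal
        "TRADE_DATE" to LocalDate.now()            // DATE    -> org.joda.time.LocalDate
    )
)
```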
Here is an example of a simple pipeline integration reading data from the Genesis Application database:
pipeline("DB_TRADE_TO_RECON") {
source(dbBulkQuery<Trade>())
.map(tableEntityNormaliser())
.sink(reconSink())
}
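If you need more control than the convenience normalisers described below provide, the same result can be achieved with an ordinary map step that builds the `NormalisedRecord` explicitly. This is only a sketch, assuming the `map` operator accepts a lambda and that the `Trade` entity exposes the fields used here:

```kotlin
pipeline("DB_TRADE_TO_RECON_MANUAL") {
    source(dbBulkQuery<Trade>())
        // Hypothetical field names; keys and value types must match the
        // reconciliation's data dictionary
        .map { trade ->
            NormalisedRecord(
                mapOf(
                    "TRADE_ID" to trade.tradeId,
                    "QUANTITY" to trade.quantity,
                    "PRICE" to trade.price
                )
            )
        }
        .sink(reconSink())
}
```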
tableEntityNormaliser
The table entity normaliser is a convenience operator that converts a table entity read from the Genesis Application database to the normalised format required by the Reconciliation Manager:
pipeline("DB_TRADE_TO_RECON") {
source(dbBulkQuery<Trade>())
.map(tableEntityNormaliser())
.sink(reconSink())
}
viewEntityNormaliser
The view entity normaliser is a convenience operator that converts a view entity read from the Genesis Application database to the normalised format required by the Reconciliation Manager:
pipeline("DB_TRADE_TO_RECON") {
source(dbBulkQuery<TradeView>())
.map(viewEntityNormaliser())
.sink(reconSink())
}
Using Pipelines in a Reconciliation
Once the pipelines are integrated using the `reconciliation-pipelines.kts` file and terminated with the `reconSink`, they will be available for selection from the configuration screen in the UI.
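For illustration, a complete `reconciliation-pipelines.kts` might define one pipeline per side of a reconciliation, each terminating in `reconSink()`. The pipeline names and entity types below are hypothetical:

```kotlin
// Each pipeline terminated with reconSink() becomes selectable
// on the reconciliation configuration screen in the UI
pipeline("INTERNAL_TRADES_TO_RECON") {
    source(dbBulkQuery<Trade>())
        .map(tableEntityNormaliser())
        .sink(reconSink())
}

pipeline("TRADE_VIEW_TO_RECON") {
    source(dbBulkQuery<TradeView>())
        .map(viewEntityNormaliser())
        .sink(reconSink())
}
```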