Reconciliation - Integrating with Data Pipelines

The RECONCILIATION_MANAGER process integrates directly with the platform's Data Pipelines component, so it can consume and reconcile data from any source, whether internal or external to the application.

At runtime, the manager looks for a script file named reconciliation-pipelines.kts. To override the default (empty) file, add your own version to the src/main/genesis/scripts directory of your project.

To make the reconciliation pipeline elements available in the IDE, you also need to add the following dependency to your main application module:

implementation("global.genesis:reconciliation-app:${properties["reconciliationVersion"]}")

Once this is done, you are ready to integrate Pipelines into the Reconciliation Manager.

Elements

Once you have added the component and the relevant dependencies, the following additional elements are available in your data pipelines:

reconSink

To send data to the recon engine, the pipeline must end with the recon sink component. The recon sink expects input of type NormalisedRecord, whose constructor takes a single argument: a Map with String keys and Object values. The engine expects this map to conform to the data dictionary defined for the specific reconciliation.

The Java and Kotlin object types expected for each data dictionary field type are as follows:

Data Dictionary Type   Java Type                  Kotlin Type
STRING                 java.lang.String           kotlin.String
INT                    java.lang.Integer          kotlin.Int
LONG                   java.lang.Long             kotlin.Long
DOUBLE                 java.lang.Double           kotlin.Double
RAW                    byte[]                     kotlin.ByteArray
BOOLEAN                java.lang.Boolean          kotlin.Boolean
BIGDECIMAL             java.math.BigDecimal       java.math.BigDecimal
DATE                   org.joda.time.LocalDate    org.joda.time.LocalDate
DATETIME               org.joda.time.DateTime     org.joda.time.DateTime
NANO_TIMESTAMP         java.lang.Long             kotlin.Long
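To make the type mapping concrete, here is a minimal, self-contained sketch of a record map checked against a data dictionary. The NormalisedRecord class below is a hypothetical stand-in that only mirrors the single-map constructor described above, not the platform class itself. The dictionary, the field names and the invalidFields helper are likewise assumptions made for illustration. DATE and DATETIME are left out so the sketch needs no Joda-Time dependency.

```kotlin
import java.math.BigDecimal

// Hypothetical stand-in for the platform's NormalisedRecord (single Map argument).
class NormalisedRecord(val values: Map<String, Any>)

// Hypothetical data dictionary: field name -> data dictionary type.
val tradeDictionary: Map<String, String> = mapOf(
    "TRADE_ID" to "STRING",
    "QUANTITY" to "INT",
    "PRICE" to "BIGDECIMAL",
    "IS_CANCELLED" to "BOOLEAN",
    "TRADE_TIMESTAMP" to "NANO_TIMESTAMP",
)

// Checks a value against the Kotlin type listed for a data dictionary type.
fun matchesType(dictionaryType: String, value: Any?): Boolean = when (dictionaryType) {
    "STRING" -> value is String
    "INT" -> value is Int
    "LONG", "NANO_TIMESTAMP" -> value is Long
    "DOUBLE" -> value is Double
    "RAW" -> value is ByteArray
    "BOOLEAN" -> value is Boolean
    "BIGDECIMAL" -> value is BigDecimal
    else -> false // DATE and DATETIME (Joda-Time) are not covered by this sketch
}

// Returns the names of dictionary fields that are missing or hold the wrong type.
fun invalidFields(record: NormalisedRecord, dictionary: Map<String, String>): List<String> =
    dictionary.filter { (name, type) -> !matchesType(type, record.values[name]) }.keys.toList()

// Builds a record whose values use the Kotlin types from the table.
fun sampleRecord(): NormalisedRecord = NormalisedRecord(
    mapOf(
        "TRADE_ID" to "T-1001",
        "QUANTITY" to 250,
        "PRICE" to BigDecimal("101.25"),
        "IS_CANCELLED" to false,
        "TRADE_TIMESTAMP" to 1_700_000_000_000_000_000L,
    )
)

fun main() {
    println(invalidFields(sampleRecord(), tradeDictionary))
}
```

A check like this is optional. It is shown only to illustrate that, for example, a QUANTITY supplied as a Long would not match a dictionary field declared as INT.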

Here is an example of a simple pipeline integration reading data from the Genesis Application database:

pipeline("DB_TRADE_TO_RECON") {
    source(dbBulkQuery<Trade>())
        .map(tableEntityNormaliser())
        .sink(reconSink())
}
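If you need to control the mapping yourself, for example when the data is not a table or view entity, the map step could construct the NormalisedRecord directly. The following is a sketch only. It assumes that the pipelines DSL also accepts a lambda in map, and that Trade exposes tradeId, quantity and price properties. The pipeline name, the property names and the map keys are hypothetical, and the keys must match your own data dictionary.

```kotlin
pipeline("DB_TRADE_TO_RECON_MANUAL") {
    source(dbBulkQuery<Trade>())
        // Assumption: map accepts a lambda; the Trade properties below are hypothetical.
        .map { trade ->
            NormalisedRecord(
                mapOf(
                    "TRADE_ID" to trade.tradeId,
                    "QUANTITY" to trade.quantity,
                    "PRICE" to trade.price,
                )
            )
        }
        .sink(reconSink())
}
```

Each value placed in the map should use the Java or Kotlin type that corresponds to the field's data dictionary type.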

tableEntityNormaliser

The table entity normaliser is a convenience operator that converts a table entity read from the Genesis Application Database into the normalised format required by the Reconciliation Manager:

pipeline("DB_TRADE_TO_RECON") {
    source(dbBulkQuery<Trade>())
        .map(tableEntityNormaliser())
        .sink(reconSink())
}

viewEntityNormaliser

The view entity normaliser is a convenience operator that converts a view entity read from the Genesis Application Database into the normalised format required by the Reconciliation Manager:

pipeline("DB_TRADE_TO_RECON") {
    source(dbBulkQuery<TradeView>())
        .map(viewEntityNormaliser())
        .sink(reconSink())
}

Using Pipelines in a Reconciliation

Once the pipelines are integrated using the reconciliation-pipelines.kts file and the reconSink, they will be available for selection from the configuration screen in the UI.