ALM app: ingesting external data
There are several ways of loading data into a Genesis application.
On this page, we'll go through different methods of ingesting data into the database.
Loading example data
First, let's look at loading some example static data into our application.
There is a simple command - `Genesis: SendIt` - that loads data into the database.
To start with, we shall load three static data files: `BOOK`, `CLIENT`, and `ENTITY`.
You can find these three files in the `SampleData` folder of the ALM repo.
Once you have copied the content, press SHIFT and select `BOOK.csv`, `CLIENT.csv` and `ENTITY.csv`, then right-click and select `Genesis: SendIt` to add the data.
By default, `SendIt` simply adds data. Note that:
- it will not clear any existing records
- it will not overwrite any record that is already in the database

However, there is a lot more you can do with this command. See the SendIt specification for full details.
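(If you are working on the server command line rather than in the IDE, the same load can typically be performed with the `SendIt` utility directly, for example `SendIt -t BOOK -f BOOK.csv`; the exact flags and options are covered in the SendIt specification.)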
Configuring a data pipeline
The Genesis Create set-up includes a data pipeline. You can now try to use this to input some data.
The pipeline definition
The code for the pipeline is in `ALM/server/ALM-app/src/main/genesis/scripts/ALM-pipelines.kts`.
The pipeline has been set up so that when the application is running, a folder called `loadData` is created in `ALM/server/ALM-app/src/main/genesis/data`.
- The pipeline listens to this folder for any CSV file with the prefix `CDs`.
- When it sees a file, it processes it and adds any records to the `CD_TRADE` table.
- Once processed, the files move into a sub-directory called `.done`, with a timestamp added to the filename.
Try it out
- Download the file `CDs.csv` from the `SampleData/data_pipeline_example_file` folder of the ALM repo.
- Paste this file into the `loadData` folder in your app.
That's it! The data pipeline will then process the file automatically.
Loading data via REST API
For this exercise, we have set up a REST server that provides Loan Trade data.
Define the receiving message
To call the REST server, we first need to define the receiving message.
- Create a new file called `Messages.kt` at `ALM\server\ALM-app\src\main\kotlin`.
- Paste the following into the file:
import com.fasterxml.jackson.annotation.JsonIgnoreProperties
import com.fasterxml.jackson.annotation.JsonProperty
import global.genesis.gen.dao.LoanTrade
import org.joda.time.DateTime
import org.joda.time.DateTimeZone
@JsonIgnoreProperties(ignoreUnknown = true)
data class LoanMessage(
@JsonProperty("LOAN_ID")
val loanId: String,
@JsonProperty("CLIENT_NAME")
val clientName: String,
@JsonProperty("FACILITY_NAME")
val facilityName: String,
@JsonProperty("FACILITY_AMOUNT")
val facilityAmount: Double,
@JsonProperty("FACILITY_CCY")
val facilityCurrency: String,
@JsonProperty("DRAWDOWN_DATE")
val drawdownDate: Long,
@JsonProperty("DRAWDOWN_AMOUNT")
val drawdownAmount: Double,
@JsonProperty("DRAWDOWN_CURRENCY")
val drawdownCurrency: String,
@JsonProperty("PAYMENT_DATE")
val paymentDate: Long,
@JsonProperty("PAYMENT_CURRENCY")
val paymentCurrency: String,
@JsonProperty("PAYMENT_AMOUNT")
val paymentAmount: Double
)
data class AllLoansResponse(
@JsonProperty("ROWS_COUNT")
val rowsCount: Int,
@JsonProperty("MESSAGE_TYPE")
val messageType: String = "EVENT_LOGIN_AUTH",
@JsonProperty("ROW")
val row: List<LoanMessage>,
@JsonProperty("MORE_ROWS")
val moreRows: Boolean,
@JsonProperty("SOURCE_REF")
val sourceRef: String,
@JsonProperty("SEQUENCE_ID")
val sequenceId: Int
)
data class DataLogonDetails(
@JsonProperty("MAX_ROWS")
val maxRows: Int,
@JsonProperty("MAX_VIEW")
val maxView: Int,
)
data class DataLogonRequest(
@JsonProperty("DETAILS")
val details: DataLogonDetails
)
@JsonIgnoreProperties(ignoreUnknown = true)
data class LoginResponse(
@JsonProperty("SESSION_AUTH_TOKEN")
val sessionAuthToken: String,
)
data class Details(
@JsonProperty("USER_NAME")
val username: String = "JaneDee",
@JsonProperty("PASSWORD")
val password: String = "beONneON*74"
)
data class LoginRequest(
@JsonProperty("SOURCE_REF")
val sourceRef: String = "login-1",
@JsonProperty("MESSAGE_TYPE")
val messageType: String = "EVENT_LOGIN_AUTH",
@JsonProperty("DETAILS")
val details: Details = Details()
)
fun LoanMessage.asLoanTrade() = LoanTrade(
clientName = clientName,
drawdownAmount = drawdownAmount,
drawdownCurrency = drawdownCurrency,
drawdownDate = unixTimestampToDateTime(drawdownDate),
facilityAmount = facilityAmount,
facilityCurrency = facilityCurrency,
facilityName = facilityName,
loanId = loanId,
paymentAmount = paymentAmount,
paymentCurrency = paymentCurrency,
paymentDate = unixTimestampToDateTime(paymentDate)
)
private fun unixTimestampToDateTime(timestamp: Long) =
DateTime(timestamp * 1000L, DateTimeZone.UTC)
data object Message
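Note that `unixTimestampToDateTime` assumes the timestamps in the message (DRAWDOWN_DATE and PAYMENT_DATE) arrive as Unix timestamps in seconds, so it multiplies by 1000 to build a millisecond-based Joda `DateTime` in UTC. As a quick illustration of what the mapping does, here is a minimal sketch that is not part of the app itself; the field values are invented purely for illustration, and it assumes the generated `LoanTrade` entity from this project is on the classpath:
// Illustrative values only; real data comes from the REST server
val example = LoanMessage(
    loanId = "L-0001",
    clientName = "Example Client",
    facilityName = "Example Facility",
    facilityAmount = 1_000_000.0,
    facilityCurrency = "USD",
    drawdownDate = 1_700_000_000L, // Unix seconds
    drawdownAmount = 250_000.0,
    drawdownCurrency = "USD",
    paymentDate = 1_702_500_000L, // Unix seconds
    paymentCurrency = "USD",
    paymentAmount = 255_000.0
)
// asLoanTrade() maps the message onto the generated LoanTrade entity,
// converting the second-based timestamps into UTC DateTime values
val trade = example.asLoanTrade()
println(trade.drawdownDate)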
Add an event trigger
Now we can add an event to trigger a request to the REST API.
- Open the file `ALM\server\ALM-app\src\main\genesis\scripts\ALM-eventhandler.kts` and add the following imports to the very top of the file:
import global.genesis.gen.dao.enums.ALM.fx_trade.TradeStatus
import global.genesis.httpclient.GenesisHttpClient
import global.genesis.httpclient.request.HttpMethod
import global.genesis.httpclient.request.HttpRequest
import global.genesis.message.core.HttpStatusCode
import io.ktor.client.HttpClient
import io.ktor.client.plugins.contentnegotiation.ContentNegotiation
import io.ktor.http.ContentType
import io.ktor.serialization.jackson.jackson
import io.ktor.util.reflect.TypeInfo
- Add the following event to the end of the `eventHandler {}` block:
eventHandler<Message>("LOAN_TRADE_REST_API", transactional = true) {
// Instantiate the client outside onCommit so that it is not recreated on every event
val baseUrl = "https://playground.demo.genesis.global/gwf"
val authUrl = "$baseUrl/event-login-auth"
val loansUrl = "$baseUrl/ALL_LOAN"
val ktorClient = HttpClient {
install(ContentNegotiation) {
jackson(ContentType.Application.Json)
}
}
val client = GenesisHttpClient(ktorClient)
onCommit {
// Login to the external application
val response = client.submitRequest<LoginRequest, LoginResponse>(
HttpRequest(
url = authUrl,
method = HttpMethod.POST,
body = LoginRequest(),
headers = mapOf("Content-Type" to "application/json")
),
responseTypeInfo = TypeInfo(type = LoginResponse::class, reifiedType = LoginResponse::class.java),
)
val statusCode = response.statusCode
if (statusCode != HttpStatusCode.Ok)
return@onCommit nack("Received $statusCode from $authUrl")
val responseAuthToken = response.data.sessionAuthToken
LOG.info("Retrieved session auth token")
// Send a data logon to the external dataserver and retrieve the current loan data
val loanMessages = client.dataLogon(loansUrl, responseAuthToken).data.row
// Map and insert the retrieved loan messages
loanMessages.map { loanMessage ->
loanMessage.asLoanTrade()
}.forEach { loanTrade ->
LOG.info("Loan Trade: {}", loanTrade)
entityDb.upsert(loanTrade)
}
// Close the dataserver subscription (important!)
val closeResponse = client.delete<String> {
url = loansUrl
headers(
"Content-Type" to "application/json",
"SESSION_AUTH_TOKEN" to responseAuthToken,
"SOURCE_REF" to "data-logon-1", // must match source ref of data logon
)
}
LOG.info("Received status {} from close subscription request", closeResponse.statusCode)
ack()
}
}
The use of `upsert` ensures that existing loan trades are updated and any new ones are inserted. If a trade is deleted at the front end, it will reappear the next time the cron job runs.
- After the end of the `eventHandler {}` section, add the following function, which sends the data logon request to the external dataserver using the authentication token:
suspend fun GenesisHttpClient.dataLogon(url: String, authToken: String) = submitRequest<DataLogonRequest, AllLoansResponse>(
HttpRequest(
url = url,
method = HttpMethod.POST,
body = DataLogonRequest(
// Max rows will determine the number of records retrieved
DataLogonDetails(
maxRows = 10,
maxView = 100
)
),
headers = mapOf(
"Content-Type" to "application/json",
"SESSION_AUTH_TOKEN" to authToken,
"SOURCE_REF" to "data-logon-1",
"USER_NAME" to "JaneDee",
)
),
responseTypeInfo = TypeInfo(type = AllLoansResponse::class, reifiedType = AllLoansResponse::class.java),
)
Setting up an evaluator
Now that we've added this event, we need a way of calling it on a schedule.
We can do this by adding and defining an EVALUATOR process.
- Open `ALM\server\ALM-app\src\main\genesis\cfg\ALM-processes.xml` and insert the following code:
<process name="ALM_EVALUATOR">
<start>true</start>
<groupId>MYAPP</groupId>
<options>-Xmx512m -DXSD_VALIDATE=false</options>
<module>genesis-evaluator</module>
<primaryOnly>true</primaryOnly>
<package>global.genesis.eventhandler,global.genesis.evaluator</package>
<description>Dynamic/time rules engine</description>
<loggingLevel>DEBUG,DATADUMP_ON</loggingLevel>
</process>
- Open `ALM\server\ALM-app\src\main\genesis\cfg\ALM-service-definitions.xml` and add:
<service host="localhost" name="ALM_EVALUATOR" port="9703"/>
This gives us a new service that the Cron Scheduler can call to trigger the REST API call as needed.
To define the Cron Scheduler:
- In the folder `ALM\server\ALM-app\src\main\genesis\data`, create a new CSV file called `CRON_RULE.csv`.
- Insert the following lines:
CRON_EXPRESSION,DESCRIPTION,TIME_ZONE,RULE_STATUS,NAME,USER_NAME,PROCESS_NAME,MESSAGE_TYPE
"0 0/10 * * * ?","Loan Import Rule","Europe/London","ENABLED","Loan Rule","admin","ALM_COMPACT_PROCESS","EVENT_LOAN_TRADE_REST_API"
As defined here, this rule runs every 10 minutes, but you can set the cron schedule as needed.
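The expression uses standard Quartz cron syntax, so, for example, `0 0/30 * * * ?` would run every 30 minutes and `0 0 9 ? * MON-FRI` would run once every weekday at 09:00.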
- Right-click on the file in the Project window and select `Genesis: SendIt` to insert this record into the database.
Kafka integration
You can also set up a connection to a Kafka source, to consume messages from it.
For this exercise, we have set up a Kafka producer which we can connect to for some `FX_RATE` data.
Add implementations and properties
To start, we'll need to add some implementations to our application.
- Open the file `server/ALM-app/build.gradle.kts`, then add the following implementations:
implementation("global.genesis:kafka-genesis:${properties["platformIntegrationVersion"]}")
implementation("software.amazon.msk:aws-msk-iam-auth:2.2.0")
- Then in the file `server/gradle.properties`, add the following property:
platformIntegrationVersion=8.4.0
- Also add this to the classpath of the ALM_PIPELINE process. Open the file `server/ALM-app/src/main/genesis/cfg/ALM-processes.xml`, then add the following to the ALM_PIPELINE process:
<classpath>ALM-app*,aws-msk-iam-auth*</classpath>
Set up source properties
We need to set up some properties for our Kafka source.
- Open the file `server/ALM-app/src/main/genesis/cfg/ALM-system-definition.kts`.
- At the top of the file, add the following import:
import java.net.InetAddress
- Go to the global section and add the following:
// the following have been set up as global definitions to be used as part of the pipelines script, these can be host specific as well
item("BOOTSTRAP_SERVER", "boot-qjjhmpj3.c2.kafka-serverless.eu-west-2.amazonaws.com:9098")
item("CONSUMER_GROUP_ID", InetAddress.getLocalHost().getHostName())
// see topic creation comment in docker-compose.yml
item("KAFKA_TOPIC", "fx-rate")
// for running kafka locally we have set all security configurations to be PLAINTEXT, ensure that you use the appropriate security config for your application
item("KAFKA_SECURITY_CONFIG", "SASL_SSL")
Creating a new pipeline for Kafka
- Open the file `server/ALM-app/src/main/genesis/scripts/ALM-pipelines.kts`.
- We need to add some imports and initialise some objects. Add the code below to the beginning of the file:
import global.genesis.pipeline.file.CsvRow
import global.genesis.CdTradeLoadCdTradeCsvDataMapper
import global.genesis.pipeline.api.db.DbOperation
import kotlinx.coroutines.flow.flow
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.common.serialization.StringDeserializer
val source = kafkaSource<String, String> {
// now you can define the kafka specific configurations, these can either be hardcoded into this script or configurable via system definitions as shown below
bootstrapServers = systemDefinition.getItem("BOOTSTRAP_SERVER").toString()
groupId = systemDefinition.getItem("CONSUMER_GROUP_ID").toString()
// here we are using the out of box deserialisers for simplicity but as mentioned in the README you can create your own as per your requirement
// you can provide any deserialiser as per your requirement as long as it is of type Deserializer<T> where T is the type of your key / value respectively specified when initialising kafkaSource above
keyDeserializer = StringDeserializer()
// here you can use PriceReceivedDeserialiser() for the value and the ConsumerRecord returned would have key String, value PriceReceived
valueDeserializer = StringDeserializer()
// ensure that this kafka topic has been created, in this example we do so when initialising the docker container
topic = systemDefinition.getItem("KAFKA_TOPIC").toString()
securityProtocol = systemDefinition.getItem("KAFKA_SECURITY_CONFIG").toString()
// if you would like to provide any additional kafka consumer config you can do this like so
additionalConfig = mapOf(
"sasl.mechanism" to "AWS_MSK_IAM",
"sasl.jaas.config" to "software.amazon.msk.auth.iam.IAMLoginModule required awsDebugCreds=false awsMaxRetries=\"10\" awsMaxBackOffTimeMs=\"500\";",
"sasl.client.callback.handler.class" to "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
)
}
// initialise the operator that takes the output of the kafka source (ConsumerRecords<String, String>) and returns a flow of each ConsumerRecord object in that batch
val splitOperator: SplitOperator<ConsumerRecords<String, String>, ConsumerRecord<String, String>> = SplitOperator { consumerRecords ->
flow {
// here we iterate over every ConsumerRecord
consumerRecords.forEach {
// here we emit it to the resulting flow
emit(it)
}
}
}
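The value deserialiser above simply returns the raw string. If you want typed values instead (as the `PriceReceivedDeserialiser()` comment suggests), you can supply your own implementation of Kafka's `Deserializer<T>` interface. The sketch below is illustrative only; `FxRateMessage` and the comma-separated payload format are assumptions, not the format used by the demo producer:
import org.apache.kafka.common.serialization.Deserializer

// hypothetical payload type, for illustration only
data class FxRateMessage(
    val rate: Double,
    val targetCurrency: String,
    val sourceCurrency: String
)

class FxRateMessageDeserializer : Deserializer<FxRateMessage> {
    // assumes each record value is a simple comma-separated string, e.g. "0.85,EUR,USD"
    override fun deserialize(topic: String, data: ByteArray): FxRateMessage {
        val (rate, target, source) = String(data, Charsets.UTF_8).split(",")
        return FxRateMessage(rate.trim().toDouble(), target.trim(), source.trim())
    }
}
You would then declare the source as `kafkaSource<String, FxRateMessage>` and set `valueDeserializer = FxRateMessageDeserializer()`.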
- You can now add the new pipeline itself in the `pipelines {}` section. Use the code below:
pipeline("KAFKA_TO_DB_PIPELINE") {
// sourcing from the kafka source as defined above
source(source)
// split operator to split up batch of ConsumerRecords as defined above
.split(splitOperator).split { input ->
flow<FxRate> {
val values = input.value().split("\",\"")
// processing to separate each FX rate
for (fxRates in values) {
val fxRate = fxRates.split(",")
val rate = fxRate[0].substring(fxRate[0].indexOf("\"") + 1).trim()
val targetCurrency = fxRate[1].substringBefore("\\")
val sourceCurrency = fxRate[2].substringBefore("\\").substringBefore("\"")
emit(FxRate(rate.toDouble(), targetCurrency, sourceCurrency))
}
}
}
.map {
// in order to use the database sink we must provide it a DbOperation to perform - in this case it's an upsert on each FxRate object provided by the above operation
DbOperation.Upsert(it)
}
// here we are using a simple database sink to perform the above operation - txDbSink is a transactional database sink
.sink(txDbSink())
}
You can view a final version of the code for the ALM app, including all the modifications outlined in this guide, in the ALM app repository.