
ALM app: ingesting external data

There are several different ways of loading data into a Genesis application.

On this page, we'll go through different methods of ingesting data into the database.

Loading example data

First, let's look at loading some example static data into our application.

There is a simple command - Genesis: SendIt - that loads data into the database.

To start with, we shall load three static data files: BOOK, CLIENT, and ENTITY.

You can find these three files in the SampleData folder of the ALM repo.

Once you have copied these files into your project, hold SHIFT and select BOOK.csv, CLIENT.csv and ENTITY.csv, then right-click and select Genesis: SendIt to add the data.

tip

By default, SendIt simply adds data. Note that:

  • it will not clear any existing records
  • it will not overwrite any record that is already in the database

However, there is a lot more you can do with this command. See the SendIt specification for full details.

Configuring a data pipeline

The Genesis Create set-up includes a data pipeline. You can now try to use this to input some data.

The pipeline definition

The code for the pipeline is in ALM/server/ALM-app/src/main/genesis/scripts/ALM-pipelines.kts.

The pipeline has been set up so that when the application is running, a folder called loadData is created in ALM/server/ALM-app/src/main/genesis/data.

  1. The pipeline listens to this folder for any CSV file with the prefix CDs.

  2. When it sees a file, it processes it and adds any records to the CD_TRADE table.

  3. Once processed, the files move into a sub-directory called .done, with a timestamp added to the filename.

Try it out

  1. Download the file CDs.csv from the SampleData/data_pipeline_example_file folder of the ALM repo.

  2. Paste this file into the loadData folder in your app.

That's it! The data pipeline will then process the file automatically.

Loading data via a REST API

For this exercise, we have set up a REST server that provides Loan Trade data.

Define the receiving message

To call the REST server, we first need to define the receiving message.

  1. Create a new file called Messages.kt at ALM\server\ALM-app\src\main\kotlin.

  2. Paste the following into the file:

import com.fasterxml.jackson.annotation.JsonIgnoreProperties
import com.fasterxml.jackson.annotation.JsonProperty
import global.genesis.gen.dao.LoanTrade
import org.joda.time.DateTime
import org.joda.time.DateTimeZone

@JsonIgnoreProperties(ignoreUnknown = true)
data class LoanMessage(
    @JsonProperty("LOAN_ID")
    val loanId: String,
    @JsonProperty("CLIENT_NAME")
    val clientName: String,
    @JsonProperty("FACILITY_NAME")
    val facilityName: String,
    @JsonProperty("FACILITY_AMOUNT")
    val facilityAmount: Double,
    @JsonProperty("FACILITY_CCY")
    val facilityCurrency: String,
    @JsonProperty("DRAWDOWN_DATE")
    val drawdownDate: Long,
    @JsonProperty("DRAWDOWN_AMOUNT")
    val drawdownAmount: Double,
    @JsonProperty("DRAWDOWN_CURRENCY")
    val drawdownCurrency: String,
    @JsonProperty("PAYMENT_DATE")
    val paymentDate: Long,
    @JsonProperty("PAYMENT_CURRENCY")
    val paymentCurrency: String,
    @JsonProperty("PAYMENT_AMOUNT")
    val paymentAmount: Double
)

data class AllLoansResponse(
    @JsonProperty("ROWS_COUNT")
    val rowsCount: Int,
    @JsonProperty("MESSAGE_TYPE")
    val messageType: String = "EVENT_LOGIN_AUTH",
    @JsonProperty("ROW")
    val row: List<LoanMessage>,
    @JsonProperty("MORE_ROWS")
    val moreRows: Boolean,
    @JsonProperty("SOURCE_REF")
    val sourceRef: String,
    @JsonProperty("SEQUENCE_ID")
    val sequenceId: Int
)

data class DataLogonDetails(
    @JsonProperty("MAX_ROWS")
    val maxRows: Int,
    @JsonProperty("MAX_VIEW")
    val maxView: Int,
)

data class DataLogonRequest(
    @JsonProperty("DETAILS")
    val details: DataLogonDetails
)

@JsonIgnoreProperties(ignoreUnknown = true)
data class LoginResponse(
    @JsonProperty("SESSION_AUTH_TOKEN")
    val sessionAuthToken: String,
)

data class Details(
    @JsonProperty("USER_NAME")
    val username: String = "JaneDee",
    @JsonProperty("PASSWORD")
    val password: String = "beONneON*74"
)

data class LoginRequest(
    @JsonProperty("SOURCE_REF")
    val sourceRef: String = "login-1",
    @JsonProperty("MESSAGE_TYPE")
    val messageType: String = "EVENT_LOGIN_AUTH",
    @JsonProperty("DETAILS")
    val details: Details = Details()
)

fun LoanMessage.asLoanTrade() = LoanTrade(
    clientName = clientName,
    drawdownAmount = drawdownAmount,
    drawdownCurrency = drawdownCurrency,
    drawdownDate = unixTimestampToDateTime(drawdownDate),
    facilityAmount = facilityAmount,
    facilityCurrency = facilityCurrency,
    facilityName = facilityName,
    loanId = loanId,
    paymentAmount = paymentAmount,
    paymentCurrency = paymentCurrency,
    paymentDate = unixTimestampToDateTime(paymentDate)
)

private fun unixTimestampToDateTime(timestamp: Long) =
    DateTime(timestamp * 1000L, DateTimeZone.UTC)

data object Message
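
Two details worth noting in this file. The DRAWDOWN_DATE and PAYMENT_DATE values arrive as plain Long fields, and unixTimestampToDateTime treats them as Unix timestamps in seconds, multiplying by 1,000 because Joda-Time's DateTime constructor expects milliseconds. For example (illustrative value only):

// 1,700,000,000 seconds since the epoch -> 2023-11-14T22:13:20.000Z (UTC)
DateTime(1_700_000_000L * 1000L, DateTimeZone.UTC)

The empty data object Message at the end is simply the payload type for the event we define in the next step, which takes no input of its own.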

Add an event trigger

Now we can add an event to trigger a request to the REST API.

  1. Open the file ALM\server\ALM-app\src\main\genesis\scripts\ALM-eventhandler.kts and add the following imports to the very top of the file:
import global.genesis.gen.dao.enums.ALM.fx_trade.TradeStatus
import global.genesis.httpclient.GenesisHttpClient
import global.genesis.httpclient.request.HttpMethod
import global.genesis.httpclient.request.HttpRequest
import global.genesis.message.core.HttpStatusCode
import io.ktor.client.HttpClient
import io.ktor.client.plugins.contentnegotiation.ContentNegotiation
import io.ktor.http.ContentType
import io.ktor.serialization.jackson.jackson
import io.ktor.util.reflect.TypeInfo
  2. Add the following event to the end of the eventHandler{}:
    eventHandler<Message>("LOAN_TRADE_REST_API", transactional = true) {
// Instantiate the client outside the onCommit to avoid repeating
val baseUrl = "https://playground.demo.genesis.global/gwf"
val authUrl = "$baseUrl/event-login-auth"
val loansUrl = "$baseUrl/ALL_LOAN"

val ktorClient = HttpClient {
install(ContentNegotiation) {
jackson(ContentType.Application.Json)
}
}
val client = GenesisHttpClient(ktorClient)

onCommit {
// Login to the external application
val response = client.submitRequest<LoginRequest, LoginResponse>(
HttpRequest(
url = authUrl,
method = HttpMethod.POST,
body = LoginRequest(),
headers = mapOf("Content-Type" to "application/json")
),
responseTypeInfo = TypeInfo(type = LoginResponse::class, reifiedType = LoginResponse::class.java),
)

val statusCode = response.statusCode
if (statusCode != HttpStatusCode.Ok)
return@onCommit nack("Received $statusCode from $authUrl")

val responseAuthToken = response.data.sessionAuthToken
LOG.info("Retrieved session auth token")

// Send a data logon to the external dataserver and retrieve the current loan data
val loanMessages = client.dataLogon(loansUrl, responseAuthToken).data.row

// Map and insert the retrieved loan messages
loanMessages.map { loanMessage ->
loanMessage.asLoanTrade()
}.forEach { loanTrade ->
LOG.info("Loan Trade: {}", loanTrade)
entityDb.upsert(loanTrade)
}

// Close the dataserver subscription (important!)
val closeResponse = client.delete<String> {
url = loansUrl
headers(
"Content-Type" to "application/json",
"SESSION_AUTH_TOKEN" to responseAuthToken,
"SOURCE_REF" to "data-logon-1", // must match source ref of data logon
)
}

LOG.info("Received status {} from close subscription request", closeResponse.statusCode)

ack()
}

}

The use of upsert in the code ensures that existing loan trades are updated and new ones are inserted. If loan trades are deleted at the front end, they will reappear the next time the cron job runs.
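
If the feed returns a large number of rows, you might prefer to batch the writes rather than upserting one record at a time. A minimal sketch, assuming your version of the entity db exposes upsertAll (check the database API for your platform version):

// Hypothetical alternative to the map/forEach above: a single batched upsert
entityDb.upsertAll(loanMessages.map { it.asLoanTrade() })

Note that you would lose the per-record log line shown in the loop above.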

  3. After the end of the eventHandler{} section, add the following function, which performs the data logon against the external data server using the authentication token retrieved by the event above:
suspend fun GenesisHttpClient.dataLogon(url: String, authToken: String) =
    submitRequest<DataLogonRequest, AllLoansResponse>(
        HttpRequest(
            url = url,
            method = HttpMethod.POST,
            body = DataLogonRequest(
                // Max rows will determine the number of records retrieved
                DataLogonDetails(
                    maxRows = 10,
                    maxView = 100
                )
            ),
            headers = mapOf(
                "Content-Type" to "application/json",
                "SESSION_AUTH_TOKEN" to authToken,
                "SOURCE_REF" to "data-logon-1",
                "USER_NAME" to "JaneDee",
            )
        ),
        responseTypeInfo = TypeInfo(type = AllLoansResponse::class, reifiedType = AllLoansResponse::class.java),
    )
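
For reference, a successful data logon reply deserialises into the AllLoansResponse defined in Messages.kt, and each element of row is what asLoanTrade() turns into a LoanTrade record. With purely illustrative values (this is not real data from the playground server), the parsed object looks something like this:

AllLoansResponse(
    rowsCount = 1,
    row = listOf(
        LoanMessage(
            loanId = "L-0001",               // illustrative values only
            clientName = "Example Client",
            facilityName = "Example Facility",
            facilityAmount = 1_000_000.0,
            facilityCurrency = "USD",
            drawdownDate = 1_700_000_000L,   // epoch seconds
            drawdownAmount = 250_000.0,
            drawdownCurrency = "USD",
            paymentDate = 1_702_500_000L,
            paymentCurrency = "USD",
            paymentAmount = 255_000.0
        )
    ),
    moreRows = false,
    sourceRef = "data-logon-1",
    sequenceId = 1
)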

Setting up an evaluator

Now that we've added this event, we need a way of triggering it on a schedule.

We can do this by adding and defining an EVALUATOR process.

  1. Open ALM\server\ALM-app\src\main\genesis\cfg\ALM-processes.xml and insert the following code:
<process name="ALM_EVALUATOR">
    <start>true</start>
    <groupId>MYAPP</groupId>
    <options>-Xmx512m -DXSD_VALIDATE=false</options>
    <module>genesis-evaluator</module>
    <primaryOnly>true</primaryOnly>
    <package>global.genesis.eventhandler,global.genesis.evaluator</package>
    <description>Dynamic/time rules engine</description>
    <loggingLevel>DEBUG,DATADUMP_ON</loggingLevel>
</process>
  2. Open ALM\server\ALM-app\src\main\genesis\cfg\ALM-service-definitions.xml and add:
  <service host="localhost" name="ALM_EVALUATOR" port="9703"/>

This gives us a new service that the Cron Scheduler can use to trigger the REST API call as needed. (The event we defined as LOAN_TRADE_REST_API is exposed under the message type EVENT_LOAN_TRADE_REST_API, which is what the cron rule below refers to.)

To define the Cron Scheduler:

  1. In the folder ALM\server\ALM-app\src\main\genesis\data, create a new CSV file called CRON_RULE.csv.

  2. Insert the following content:

CRON_EXPRESSION,DESCRIPTION,TIME_ZONE,RULE_STATUS,NAME,USER_NAME,PROCESS_NAME,MESSAGE_TYPE
"0 0/10 * * * ?","Loan Import Rule","Europe/London","ENABLED","Loan Rule","admin","ALM_COMPACT_PROCESS","EVENT_LOAN_TRADE_REST_API"

The expression 0 0/10 * * * ? is in Quartz cron format (seconds, minutes, hours, day-of-month, month, day-of-week), so by default this rule runs every 10 minutes. You are welcome to adjust the schedule as needed.

  3. Right-click the file in the Project window and select Genesis: SendIt to insert this record into the database.

Kafka integration

You can also set up a connection to a Kafka source, to consume messages from it.

For this exercise, we have set up a Kafka producer which we can connect to for some FX_RATE data.

Add implementations and properties

To start, we need to add some dependencies to our application.

  1. Open the file server/ALM-app/build.gradle.kts, then add the following dependencies:
    implementation("global.genesis:kafka-genesis:${properties["platformIntegrationVersion"]}")
implementation("software.amazon.msk:aws-msk-iam-auth:2.2.0")
  2. In the file server/gradle.properties, add the following property:
platformIntegrationVersion=8.4.0
  3. Also add this to the classpath of the ALM_PIPELINE process. Open the file server/ALM-app/src/main/genesis/cfg/ALM-processes.xml, then add the following to the ALM_PIPELINE process:
    <classpath>ALM-app*,aws-msk-iam-auth*</classpath>

Set up source properties

We need to set up some properties for our Kafka source.

  1. Open the file server/ALM-app/src/main/genesis/cfg/ALM-system-definition.kts.

  2. At the top of the file, add the following import:

import java.net.InetAddress
  3. Go to the global section and add the following:
    // the following have been set up as global definitions so that they can be used in the pipelines script; they could also be made host-specific
    item("BOOTSTRAP_SERVER", "boot-qjjhmpj3.c2.kafka-serverless.eu-west-2.amazonaws.com:9098")
    item("CONSUMER_GROUP_ID", InetAddress.getLocalHost().getHostName())
    item("KAFKA_TOPIC", "fx-rate")
    // this exercise connects to AWS MSK, so SASL_SSL is used; for a local Kafka broker you could use PLAINTEXT, but always use the appropriate security config for your application
    item("KAFKA_SECURITY_CONFIG", "SASL_SSL")

Creating a new pipeline for Kafka

  1. Open the file server/ALM-app/src/main/genesis/scripts/ALM-pipelines.kts.

  2. We need to add some imports and initialise the Kafka source and a split operator. Add the code below to the beginning of the file:

import global.genesis.pipeline.file.CsvRow
import global.genesis.CdTradeLoadCdTradeCsvDataMapper
// FxRate is the generated DAO for the FX_RATE table, used by the new pipeline below
import global.genesis.gen.dao.FxRate
import global.genesis.pipeline.api.db.DbOperation
import kotlinx.coroutines.flow.flow
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.common.serialization.StringDeserializer

val source = kafkaSource<String, String> {
    // Kafka-specific configuration: values can be hardcoded in this script or, as here, read from system definitions
    bootstrapServers = systemDefinition.getItem("BOOTSTRAP_SERVER").toString()
    groupId = systemDefinition.getItem("CONSUMER_GROUP_ID").toString()
    // we use the out-of-the-box String deserialisers for simplicity; you can create your own as per your requirements
    // any deserialiser can be provided as long as it is of type Deserializer<T>, where T is the key/value type specified when initialising kafkaSource above
    keyDeserializer = StringDeserializer()
    // for example, you could use PriceReceivedDeserialiser() for the value (see the sketch just after this code) and the ConsumerRecord returned would have key String, value PriceReceived
    valueDeserializer = StringDeserializer()
    // ensure that this Kafka topic exists; for this exercise it has already been created on the broker we connect to
    topic = systemDefinition.getItem("KAFKA_TOPIC").toString()
    securityProtocol = systemDefinition.getItem("KAFKA_SECURITY_CONFIG").toString()
    // any additional Kafka consumer config can be supplied like so
    additionalConfig = mapOf(
        "sasl.mechanism" to "AWS_MSK_IAM",
        "sasl.jaas.config" to "software.amazon.msk.auth.iam.IAMLoginModule required awsDebugCreds=false awsMaxRetries=\"10\" awsMaxBackOffTimeMs=\"500\";",
        "sasl.client.callback.handler.class" to "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    )
}

// initialise the operator that takes the output of the kafka source (ConsumerRecords<String, String>) and returns a flow of each ConsumerRecord object in that batch
val splitOperator: SplitOperator<ConsumerRecords<String, String>, ConsumerRecord<String, String>> = SplitOperator { consumerRecords ->
    flow {
        // iterate over every ConsumerRecord in the batch
        consumerRecords.forEach {
            // and emit it to the resulting flow
            emit(it)
        }
    }
}
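
The source above sticks with the stock StringDeserializer for both key and value. If your topic carried structured payloads, you could plug in any implementation of Kafka's Deserializer<T> interface instead. Here is a minimal sketch, assuming a JSON payload and a hypothetical PriceReceived type (neither is part of this exercise's fx-rate feed, which sends plain strings):

import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import org.apache.kafka.common.serialization.Deserializer

// Hypothetical payload type, for illustration only
data class PriceReceived(val symbol: String, val price: Double)

class PriceReceivedDeserialiser : Deserializer<PriceReceived> {
    private val mapper = jacksonObjectMapper()

    // Kafka calls this for every record; here we parse the raw bytes as JSON
    override fun deserialize(topic: String, data: ByteArray): PriceReceived =
        mapper.readValue(data)
}

With that in place, setting valueDeserializer = PriceReceivedDeserialiser() would give you ConsumerRecord<String, PriceReceived> objects downstream, as the comment in the source block notes.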
  3. Now add the new pipeline itself in the pipelines{} section. Use the code below:
  pipeline("KAFKA_TO_DB_PIPELINE") {
// sourcing from the kafka source as defined above
source(source)
// split operator to split up batch of ConsumerRecords as defined above
.split(splitOperator).split { input ->
flow<FxRate>{
val values = input.value().split("\",\"")
// processing to seperate each Fx Rate.
for (fxRates in values) {
val fxRate = fxRates.split(",")
val rate = fxRate[0].substring(fxRate[0].indexOf("\"") + 1).trim()
val targetCurrency = fxRate[1].substringBefore("\\")
val sourceCurrency = fxRate[2].substringBefore("\\").substringBefore("\"")
emit(FxRate(rate.toDouble(), targetCurrency, sourceCurrency))
}
}
}
.map {
// in order to use the database sink we must provide it a DbOperation to perform - in this case it's an upsert on each FxRate object provided by the above operation
DbOperation.Upsert(it)
}
// here we are using a simple database sink to perform the above operation - txDbSink is a transactional database sink
.sink(txDbSink())
}

Checking your work

You can view a final version of the code for the ALM app, including all the modifications outlined in this guide, in the ALM app repository.