Skip to main content

Core business logic (Event Handler)

Overview

The Genesis Application Platform has a real-time event-driven architecture. The Event Handler is a microservice responsible for providing these events, which can apply business logic and affect the application's data set. You can configure events, including optional and mandatory input fields, which the client can trigger via APIs.

Nearly every table defined in your application's data model in an app will need events to affect the data. The standard events for most tables are Insert, Modify and Delete (but not every table will need all these events); depending on your application workflow, you could need many more.

You have to specify the business logic to process each of these events. You do this in the *-eventhandler.kts files.

All the events that you specify are available via REST automatically, including Open API.

Example configuration


eventHandler {

// You can inject other kotlin or java libraries for use in event handlers
val pricingLibrary = inject<PricingLibrary>()

eventHandler<Counterparty>("COUNTERPARTY_INSERT", transactional = true) {
permissioning {
permissionCodes = listOf("CounterpartyUpdate")
}
onCommit { event ->
val details = event.details
val insertedRow = entityDb.insert(details)
ack(listOf(mapOf(
"COUNTERPARTY_ID" to insertedRow.record.counterpartyId,
)))
}
}
eventHandler<Counterparty>("COUNTERPARTY_MODIFY", transactional = true) {
permissioning {
permissionCodes = listOf("CounterpartyUpdate")
}
onCommit { event ->
val details = event.details
entityDb.modify(details)
ack()
}
}
eventHandler<Counterparty.ById>("COUNTERPARTY_DELETE", transactional = true) {
permissioning {
permissionCodes = listOf("CounterpartyUpdate")
}
onCommit { event ->
val details = event.details
entityDb.delete(details)
ack()
}
}

eventHandler<Trade>("TRADE_INSERT", transactional = true) {
permissioning {
permissionCodes = listOf("TradeInsert")
//Check user has permission to trade the submitted counterparty
auth(mapName = "ENTITY_VISIBILITY") {
authKey {
key(data.counterpartyId)
}
}
}

onValidate {
//Where the eventHandler type is a DAO object, details map to a conforming trade dao object
val trade = event.details

//Verify related entities on this trade exist in the DB
verify {
entityDb hasEntry Instrument.ById(trade.instrumentId)
entityDb hasEntry Counterparty.ById(trade.counterpartyId)
}
val price = trade.price
//Validate the price is not negative
require(price >= 0.0) { "Price cannot be negative" }

//Warn if the price is far from market price utilizing pricing library function
if(pricingLibrary.priceFarFromCurrentMarketPrice(trade.price, trade.direction)) {
return@onValidate warningNack("Price out of bounds from market price")
}

ack()
}

onCommit { event ->
val trade = event.details
val result = entityDb.insert(trade)
ack(listOf(mapOf(
"TRADE_ID" to result.record.tradeId,
)))
}
}
}

Configuration options

All eventHandler blocks should be placed under a single outer eventHandler block per the example configuration above.

It is important to understand the concepts of validation vs commit, and ack vs nack.

There are two main blocks to an eventHandler, onValidate and onCommit. The following images show the execution workflow where validate is set by the client to true vs false, and how nack (unsuccessful event) terminates the event unsuccessfully and will stop further execution of the event:

ack

An ack needs to be supplied at the end of any onValidate and onCommit blocks; It signifies that the event was successful.

eventHandler<Trade>(name = "TRADE_INSERT") {
onValidate { event ->
...
ack()
}
onCommit { event ->
...
ack()
}
}

If VALIDATE is set to true on the inbound event message, and an ack is reached in the onValidate block, a MESSAGE_TYPE of EVENT_ACK is returned to the client and the event will stop execution.

A MESSAGE_TYPE of EVENT_ACK will be returned to the client.

A successful event can also send data back to the client as part of an ack. A very common use case is to return the ID(s) of newly inserted records so that the client is aware and can handle these appropriately; where the client is a UI, the IDs can be displayed to the user; if the client is a system, it may want to store the ID in its own records.

  eventHandler<Trade>("TRADE_INSERT") {
onCommit { event ->
val trade = event.details
val result = entityDb.insert(trade)
ack(listOf(mapOf("TRADE_ID" to result.record.tradeId)))
}
}

nack

A nack statement signifies that an event failed. This is typically sent during onValidate and sometimes during onEvent.

It can take a string argument, which is the error message passed back to the client.

Where triggering within an onValidate block, prefix the nack with return@onValidate to terminate validation at this point.

To trigger a nack within an onCommit block, prefix the nack with return @onCommit.

eventHandler<Trade>(name = "TRADE_INSERT") {
onValidate { event ->
val price = event.details.price
if (price < 0.0) {
return@onValidate nack("Price cannot be negative")
}
...
}
onCommit { event ->
val marketPrice = pricingLibrary.getCurrentBid(event.details.instrumentId)
if (marketPrice == null) {
return@onCommit nack("Unable to determine market price for instrument ${event.details.instrumentId}")
}
...
}
}

A nack can also take an object of type Throwable, in which case the message of the Throwable will be sent back to the client.

Where a nack is triggered, MESSAGE_TYPE returned to the client is an EVENT_NACK.

eventHandler

eventHandler defines a new event.

In the parenthesis you can define:

  • name : the name of the event. name will automatically be prefixed with EVENT_. so name = "TRADE_INSERT" will be targeted by the client as EVENT_TRADE_INSERT. Where left unspecified the name will default to the eventHandler Type class name.
  • transactional : takes a boolean. Set to true where you want your eventHandler to comply with ACID. Any exception returned will result in a complete rollback of any database writes. While an exception will trigger a rollback, the transaction will commit if a nack or ack is returned. This setting also enables auto-auditing for writes to any audited tables.
  eventHandler<Trade>(name = "TRADE_INSERT", transactional = true) {
onCommit { event ->
val trade = event.details
entityDb.insert(trade)
ack()
}
}

The <Type> (aka Generic) parameter is used to set the event metadata. This metadata defines the event inputs the client can send in, and a certain amount of pre-validation will be performed on any inbound event messages based on this metadata (unless schemaValidation is turned off).

You can use your table entity DAO as the <Type>. In this case, the event will inherit the table's fields, their types and their nullability (which determines whether the field is optional or mandatory).

eventHandler<Trade>(name = "TRADE_INSERT") {
...
onCommit { event ->
val trade = event.details
entityDb.insert(trade)
ack()
}
}

Otherwise, you can create a kotlin data class to define your metadata and use it.

You can also use an index for an event type, this is very common for DELETE events, where the client simply needs to send the ID of the record to delete:

eventHandler<Trade.ById>(name = "TRADE_DELETE") {
...
onCommit { event ->
val trade = event.details
entityDb.delete(trade)
ack()
}
}

schemaValidation

schemaValidation takes a boolean. Where set to false, it disables JSON Schema validation enforced for type-safe messages for this event.

schemaValidation defaults to true when undefined.

  eventHandler<Trade>(name = "TRADE_INSERT", transactional = true) {
schemaValidation = false
...
}

excludeMetadataFields

excludeMetadataFields takes a set of field names, which are defined as part of the event meta (e.g Trade in our example below), to exclude them from event metadata.

  eventHandler<Trade>(name = "TRADE_INSERT", transactional = true) {
excludeMetadataFields = setOf("DATE")
...
}

In this example, DATE defined on Trade will not be part of the meta for this event.

By default, excludeMetadataFields includes two fields, RECORD_ID and TIMESTAMP. These fields are part of every database object; their values are always set by the database layer, so the client typically does not send them.

warning

Typically when using a table entity as the eventHandler <Type>, you don't need to use excludeMetadataFields. If you do, be sure to keep the 2 default fields RECORD_ID and TIMESTAMP listed in your set.

An exception is when you are using optimistic concurrency mode. In this case, it is essential to include the TIMESTAMP field in the event meta. This happens automatically where excludeMetadataFields has not been set.

overrideMetadataFields

overrideMetadataFields is used to override the metadata field properties of the eventHandler type, this isn't to change the type of a field, more to override fields being optional and default values.

  eventHandler<Trade>(name = "TRADE_INSERT", transactional = true) {
overrideMetadataFields = listOf(
OverrideMetaField(name = "QUANTITY", optional = true, default = 1)
)
}

By default this is an empty set.

warning

Where setting mandatory fields to be optional, you must have a default supplied as part of the event Type meta.

permissioning

The permissioning block is used to implement access control measures on the data being returned to a user.

tip

With the exception of the inner auth block which relies on inbound data, the permissioning block and its contents can also be placed in the outer block and apply to all query/requestServer/eventHandler blocks named in the file.

permissionCodes

permissionCodes takes a list of permission codes. The client user accessing the query must have access to at least one of the permission codes in the list to be able to access any query data.

    permissioning {
permissionCodes = listOf("TradeView", "TradeUpdate")
}
customPermissions

The customPermissions block takes boolean logic. If this function returns true, the user will be able to access the resource; otherwise, the request will be rejected.
It is useful, for example, in the case you have an external API to check against. In this example an entitlementUtils object wraps an external API we can call to check the user's name has access.

  customPermissions { message ->
entitlementUtils.userIsEntitled(message.userName)
}

The customPermissions block also has access to entityDb so that you can query any data in the database. You can add complex logic in here as needed, and have access to the full inbound message.

  customPermissions { message ->
val userAttributes = entityDb.get(UserAttributes.byUserName(message.userName))
userAttributes?.accessType == AccessType.ALL
}

Where utilizing customPermissions you should also consider configuring a customLoginAck so that the front end of the application can also permission its components in a similar way.

auth

auth is used to restrict rows (queries) or events with a particular entity based on a user's entity access, also known as row-level permissions.

permissioning {
auth(mapName = "ENTITY_VISIBILITY") {
authKey {
key(data.counterpartyId)
}
}

Details on restricting query entity access here

Auth definitions can be grouped with “and” or “or” operators.

  • You could have two simple permission maps, for example: one by counterparty and another one for forbidden symbols. If the user wants to see a specific row, they need to have both permissions at once.
  • You could have two permission maps: one for buyer and one for seller. A user would be allowed to see a row if they have a seller or buyer profile, but users without one of the those profiles would be denied access.

This example shows an AND grouping:

permissioning {
auth(mapName = "ENTITY_VISIBILITY") {
authKey {
key(data.counterpartyId)
}
} and auth(mapName = "SYMBOL_RESTRICTED") {
authKey {
key(data.symbol)
}
}
}

This example shows OR grouping:

permissioning {
auth(mapName = "ENTITY_VISIBILITY") {
authKey {
key(data.buyerId)
}
} or auth(mapName = "ENTITY_VISIBILITY") {
authKey {
key(data.sellerId)
}
}
}
userHasRight

userHasRight is a helpful function which can be called within the auth block, or anywhere else in client facing server components which have a user accessing them, to determine if a user has a given right.

if (!userHasRight(userName, "TradeViewFull")) { ... }

requiresPendingApproval

This is used where particular system events require a second system user to approve them before they take effect in the application.

In the example below, you can optionally set the approval message. This is the message that an authorized approver will see during their review.

The function must return boolean logic, where it evaluates to false the event will be subject to approval.

  eventHandler<Trade>("TRADE_INSERT") {
requiresPendingApproval { event ->
event.approvalMessage = "Please approve my new trade"
event.userName != "system.user"
}
...
}

In this example, only a user with username "system.user" is able to trigger this event without it first being approved.

Note that the client may also send APPROVAL_MESSAGE and, where specified, approvalMessage will take precedence over it.

See approval workflow for more details.

onValidate

The onValidate block is used for validation. The Event Handler workflow allows clients to set boolean the flag VALIDATE. When set to true, this runs the code in the onValidate block and returns before executing the onCommit block, which commits the action (e.g. write to database, or perhaps generate a file or some other permanent action). For this reason, the entityDb handle used in this block is for read operations only.

As well as returning an ack (success) or a nack, the onValidate block allows users to send soft warnings back as part of the validation. The client can ignore these warnings, for example if the client is a user interface.

onValidate is helpful for systems where you may want to add "fat-finger checks", a common term in financial applications which essentially means we're checking if the user may have made a mistake in their input. The check is to see if they have set a quantity or price that is unusually large or small, and could contain mistakes which lead to financial loss. Such warnings can be sent back to the client in a warningNack.

onValidate blocks should always return an ack as the final statement.

onValidate blocks are optional and will automatically return an ack where not defined.

verify

Within the onvalidate block, a verify block is helpful for common relational checks. For example, to check that referenced ID-based entities already exist in the database.

In this example we verify that the instrument and counterparty IDs that were supplied on the event exist in the database.

  onValidate {
...
verify {
entityDb hasEntry Instrument.ById(event.details.instrumentId)
entityDb hasEntry Counterparty.ById(event.details.counterpartyId)
}
...
}

If the boolean logic fails, validation of the event has failed, and a nack is returned.

require

require is a kotlin construct often used in onValidate blocks, because where the defined boolean logic statement fails, it automatically returns a nack with the given error message.

In the example below, if a price is not "more than or equal to 0.0" the message "Price cannot be negative" is returned.

  eventHandler<Trade>(name = "TRADE_INSERT") {
onValidate { event ->
val price = event.details.price
require(price >= 0.0) { "Price cannot be negative" }
...
}
...
}
warningNack

A warningNack is only applicable to the onValidate block, and is used to warn the client. These warnings are sometimes referred to as "soft errors".

warningNack, like nack, accepts a String parameter, which is the message given back to the client. For example, warningNack("Price differs more than 10% from the current market price.").

A warningNack may also take an object of type Throwable. In this case, the message of the Throwable will be sent back to the client.

    onValidate {
...
if(pricingLibrary.priceFarFromCurrentMarketPrice(trade.price, trade.direction)) {
return@onValidate warningNack("Price out of bounds from market price")
}
...
}

Clients using the API are expected to set IGNORE_WARNINGS to true on the inbound message where they wish to ignore warnings. For example, if a warning is presented to a user, they can be presented with an option to ignore, or to edit and re-submit for validation.

System clients can just set IGNORE_WARNINGS to true on all messages to ensure that:

  • warnings don't stop system integration messages persisting
  • complex logic to determine whether to ignore specific warnings is not required
approvableAck

An approvableAck should be returned in an onValidate block where requiresApproval has been set on the event.

There are useful properties you can set as part of the approvableAck definition. They are all optional and are detailed below:

  • entityDetails is a list of ApprovalEntityDetails with their corresponding entityTable, entityId and approvalType properties (see previous paragraph). By default, this list is empty.
  • approvalMessage contains the text that is sent back to the client, assuming the event is successfully submitted for approval. The default is "Your request was successful and has been submitted for approval".
  • additionalDetails can provide context information that is only available from a server-side perspective. This information complements the APPROVAL_MESSAGE content provided by the front end.
  • approvalType is used to state the action that happens when this event is approved: NEW for insertions, UPDATE for amends, REMOVE for removals. If undefined, this defaults to UNKNOWN. Most events will be simple, but of course some could affect multiple entities in different ways, which is why the entityDetails parameter can contain many entities, each with its own approvalType.
  eventHandler<Trade>("TRADE_MODIFY") {
requiresPendingApproval { event ->
event.approvalMessage = "Potential cost to firm modifying trades which have already been reported." //Available to the user who is approving
event.userName != "system.user"
}
onValidate { event ->
val trade = event.details
approvableAck(
entityDetails = listOf(
// One or many entities can be affected with a single event, so we can provide the whole list here
ApprovalEntityDetails(
entityTable = "TRADE",
entityId = event.details.trade.toString(),
approvalType = ApprovalType.UPDATE
)
),
approvalMessage = "Trade modification for ${event.details.tradeId} has been sent for approval.", //Sent to the user who submitted the event
approvalType = ApprovalType.UPDATE,
additionalDetails = "Potential cost to firm modifying trades which have already been reported." //Further message available to the user who is approving
)
}
...
}

See approval workflow for more details.

onCommit

The onCommit block runs after successful validation in the onValidate block. The code here typically affects the data in the system (for example, writing to the database, generating a file, triggering some other event). This block may also perform further validations not possible in the previous block (e.g. relying on database updates in an event which writes multiple records). To reach the onCommit block, VALIDATE must have been set to false.

The entityDb handle in this block can be used to read and write.

As with onValidate the block must finish with an ack. If the event is set as transactional and the ack is not reached, any database writes will be rolled back.

  eventHandler<Trade>("TRADE_INSERT") {
onCommit { event ->
val trade = event.details
val result = entityDb.insert(trade)
ack(listOf(mapOf("TRADE_ID" to result.record.tradeId)))
}
}

contextEventHandler

contextEventHandler is similar to an eventHandler, but is used where you want to pass context from the onValidate block to the onCommit block. For example, on a modification event: if you first look up the original entity record and want to pass to onCommit so that it doesn't need to be re-read from the database.

  contextEventHandler<Trade, Trade>(name = "TRADE_MODIFY") {
onValidate { event ->
val tradeDetails = event.details

val originalTrade = entityDb.get(Trade.ById(event.details.tradeId))
require(originalEmailDist != null) {
"Unknown Trade ID received ${event.details.csdCode}"
}
...
validationAck(validationContext = originalTrade)
}
onCommit { event, trade ->
...
entityDb.modify(trade)
ack()
}
}
  1. The definition takes two <Type> (aka Generic) parameters instead of one. The first is the context object, which will be passed to the onCommit block. The second is the same as a regular eventHandler: the event metadata.
  2. The onValidate block needs to return a validationAck instead of a regular ack.
  3. The onCommit block has two lambda parameters, the standard event + a second, which is a reference to the context object returned in the validationAck.

validationAck

The same concept as an ack[] but must return an object matching the type of the first <Type> parameter in the contextEventHandler definition.

  contextEventHandler<Trade, Trade>(name = "TRADE_MODIFY") {
onValidate { event ->
val originalTrade = entityDb.get(Trade.ById(event.details.tradeId))
...
validationAck(validationContext = originalTrade)
}
}

Logging

Genesis provides a class called LOG that can be used to insert custom log messages. This class provides you with 5 methods to be used accordingly: info, warn, error,trade,debug. To use these methods, you need to provide, as an input, a string that will be inserted into the Logs.

Here is an example where you can insert an info message into the log file:

LOG.info("This is an info message")

The LOG instance is a Logger from SLF4J.

note

In order to see these messages, you must set the logging level accordingly. You can do this using the logLevel command.

Optimistic concurrency

Optimistic Concurrency helps prevent users from updating or deleting a stale version of a record. Our page on Optimistic Concurrency provides details around how this works for event handlers.

Client API

tip

To use the API, clients must first authenticate to obtain a SESSION_AUTH_TOKEN by calling EVENT_LOGIN_AUTH.

tip

This API is also accessible via Open API.

EVENT_<EVENT_HANDLER_NAME>

To trigger an Event Handler, the client sends a MESSAGE_TYPE of EVENT_<event name>.

There are several options that can be specified in the message header:

OptionDefaultDescription
VALIDATEfalseWhere true, events will return after the onValidate block ack and not trigger the onCommit block
IGNORE_WARNINGSfalseWhere true, any warningNack in the code will be ignored and not cause the event to fail validation
REQUIRES_APPROVALfalseA client can force the approval workflow for the event by setting this to true
APPROVAL_MESSAGEWhere approval workflow is triggered, this is the approval message the approver will see. This may be overridden in the event 'requiresPendingApproval`, which takes precedence
REASONWhere the event is set as transactional and the tables written by the event are enabled for audit this String will be set in the audit record's AUDIT_EVENT_TEXT

Request

{
"SOURCE_REF": "345",
"SESSION_AUTH_TOKEN": "snsFDc7AdjG8Khl95uUdUBm5UO1uqIfq",
"MESSAGE_TYPE": "EVENT_TRADE_INSERT",
"DETAILS": {
"COUNTERPARTY_ID": 1,
"DATE": 1731542400000,
"DIRECTION": "BUY",
"INSTRUMENT_ID": 2,
"QUANTITY": 1000,
"TRADE_PRICE": 1.23
}
}

Response

{
"MESSAGE_TYPE": "EVENT_ACK",
"SOURCE_REF": "345",
"GENERATED": [
{
"TRADE_ID": 5
}
],
"METADATA": {
"IS_EMPTY": true,
"ALL": {}
}
}

Approval workflow

The Genesis Application Platform has an in-built pending approval mechanism that can be used with Event Handlers. This is useful where particular events require a second user to approve them in order to take effect. Genesis Pending Approvals works with the concepts of “delayed” events and "four-eyes check".

Events are essentially queued until the time they are approved. Once approved, they are re-triggered to take affect.

Require approval on an event

To enable the pending approval workflow for an Event Handler implementation, either:

  • Override the requiresPendingApproval method with an appropriate function in the custom Event Handler definitions.

or

  • Configure the requiresPendingApproval block in a GPAL Event Handler.

Both of these options involve implementing a function with the event message as an input and a boolean value as a return value.

Here is an example of a GPAL Event Handler definition where event.userName is used to gain access to the user who triggered the event; if the user name is not system.user, it is directed to an approval procedure:

eventHandler {
eventHandler<Company>("COMPANY_INSERT") {
// Override requiresPendingApproval here to enable the "pending approval" flow.
// In this implementation, any user that is not "system.user" needs to go through the approval mechanism.
// The last line just needs to evaluate to a boolean; if false it does not require approval, if true it does
requiresPendingApproval { event ->
event.userName != "system.user"
}
onCommit { event ->
val company = event.details
// custom code block..
ack()
}
}
}

or in a custom Event Handler definition:

package global.genesis.position.samples.events.async

import global.genesis.commons.annotation.Module
import global.genesis.eventhandler.typed.async.AsyncContextValidatingEventHandler
import global.genesis.gen.dao.Company
import global.genesis.message.core.event.Event
import global.genesis.message.core.event.EventReply
import global.genesis.message.core.event.ValidationResult

@Module
class TestCompanyHandlerAsyncContext : AsyncContextValidatingEventHandler<Company, EventReply, String> {
override suspend fun onValidate(message: Event<Company>): ValidationResult<EventReply, String> {
val company = message.details
// custom code block..
val companyName = company.companyName
return validationResult(ack(), companyName)
}

override suspend fun onCommit(message: Event<Company>, context: String?): EventReply {
if(context != null){
// Do something with the context
}
val company = message.details
// custom code block..
return ack()
}
}

Pending approval workflow

Events going through a pending approval workflow are validated as usual (i.e. the onValidate method is run). If the validation is successful, the “delayed” event is stored in the APPROVAL table in JSON format.

Assuming the event is inserting, updating or deleting a target database record, it is possible to have multiple APPROVAL records associated with a single database entity. So, you should use the event onValidate method to check for pre-existing approvals against the entities related to the event if you need to ensure there is only one pending approval per record.

The APPROVAL record is keyed on an auto-generated APPROVAL_ID and does not have a direct link to the original record(s). You have to create one or many links by adding “approval entity” details to the payload returned within an approvableAck inside the onValidate method. These details include the entityTable (e.g COUNTERPARTY), entityKey (e.g. COUNTERPARTY_ID), as well as an optional approvalType to describe what operation is happening on the entity itself (e.g. NEW, UPDATE or REMOVE).

This approach enables you to decide how to identify the original record (e.g. creating a compound key in the case of multi-field keys). When the approval entity details are provided, the platform creates one or more records in the APPROVAL_ENTITY table; it populates it (them) with the details provided and the APPROVAL_ID of the APPROVAL record. There is also an APPROVAL_ENTITY_COUNTER, which is populated by the GENESIS_AUTH_CONSOLIDATOR process by default; this can be handy when you need to know how many approvals are pending for a given entity.

There are other useful properties you can set as part of the approvableAck definition. They are all optional and are detailed below:

  • entityDetails is a list of ApprovalEntityDetails with their corresponding entityTable, entityId and approvalType properties (see previous paragraph). By default, this list is empty.
  • approvalMessage contains the text that is sent back to the client, assuming the event is successfully submitted for approval. The default is "Your request was successful and has been submitted for approval".
  • additionalDetails can provide context information that is only available from a server-side perspective. This information complements the APPROVAL_MESSAGE content provided by the front end.
  • approvalType is used to state the action that happens when this event is approved: NEW for insertions, UPDATE for amends, REMOVE for removals. If undefined, this defaults to UNKNOWN. Most events will be simple, but of course some could affect multiple entities in different ways, which is why the entityDetails parameter can contain many entities, each with its own approvalType.

One further property, approvableAck, can be used in both custom eventHandler definitions and GPAL Event Handlers. Below is an example of approvableAck in action for a GPAL eventHandler onValidate block.

eventHandler {
eventHandler<Company>("COMPANY_AMEND") {
// Override requiresPendingApproval here to enable the "pending approval" flow.
// In this implementation, any user that is not "system.user" needs to go through requires going through the approval mechanism.
// The last line just needs to evaluate to a boolean; if false it does not require approval, if true it does
requiresPendingApproval { event ->
event.userName != "system.user"
}
onValidate { event ->
val company = event.details
// custom validation code block..
return approvableAck(
entityDetails = listOf(
// One or many entities can be affected with a single event, so we can provide the whole list here
ApprovalEntityDetails(
entityTable = "COMPANY",
entityId = event.details.companyId.toString(),
approvalType = ApprovalType.UPDATE
)
),
approvalMessage = "Company update for ${event.details.companyId} has been sent for approval.",
approvalType = ApprovalType.UPDATE,
additionalDetails = "Sensitive update, tread carefully"
)
}
onCommit { event ->
val company = event.details
// custom code block..
ack()
}
}
}

The platform provides two Data Server queries that contain Pending approval information: ALL_APPROVAL_ALERTS and ALL_APPROVAL_ALERTS_AUDITS

Example APPROVAL DB record

-------------------------------------------------------------------------------------------
TIMESTAMP 2023-02-27 15:33:42.364(n:0,s:1019) NANO_TIMESTAMP
ACTIONED_BY JaneDee STRING
APPROVAL_ID fdef7802-6bd1-4c51-a232-6a4bc2325598A... STRING
APPROVAL_KEY fac1be9f-1653-4ecf-9050-d13cc2d2cdb4A... STRING
APPROVAL_MESSAGE Cancelled STRING
APPROVAL_REQUESTED_AT 2023-02-27 15:33:38.450 +0000 DATETIME
APPROVAL_STATUS CANCELLED ENUM[PENDING APPROVED CANCELLED REJECTED_BY_USER REJECTED_BY_SERVICE]
APPROVAL_TYPE REMOVE ENUM[NEW UPDATE REMOVE UNKNOWN]
DESTINATION COUNTERPARTY_EVENT_HANDLER STRING
EVENT_DETAILS ISSUER_ID = 3 STRING
EVENT_MESSAGE {"DETAILS":{"COUNTERPARTY_ID":3},"MES... STRING
MESSAGE_TYPE EVENT_COUNTERPARTY_DELETE STRING
USER_NAME JaneDee STRING
-------------------------------------------------------------------------------------------

Example APPROVAL_ENTITY record

-------------------------------------------------------------------------------------------
TIMESTAMP 2023-02-27 15:33:38.459(n:0,s:1004) NANO_TIMESTAMP
APPROVAL_ID fdef7802-6bd1-4c51-a232-6a4bc2325598A... STRING
APPROVAL_TYPE REMOVE ENUM[NEW UPDATE REMOVE UNKNOWN]
ENTITY_ID 3 STRING
ENTITY_TABLE COUNTERPARTY STRING
-------------------------------------------------------------------------------------------

Pending approval events

Once in the APPROVAL table, the pending event can be cancelled, rejected or accepted by sending the following event messages to GENESIS_CLUSTER:

  • EVENT_PENDING_APPROVAL_ACCEPT
  • EVENT_PENDING_APPROVAL_CANCEL
  • EVENT_PENDING_APPROVAL_REJECT

All messages require a valid APPROVAL_ID and APPROVAL_MESSAGE in their metadata.

Allowed approvers

The platform ensures that users cannot approve or reject their own events, but they can cancel them. To complement this, users that have not created a specific pending approval event can only accept or reject, not cancel.

Additional levels of control (e.g. based on user groups) can be added at three points:

  • to the front end
  • to the event onValidate method
  • specified in the server-side configuration

To configure the allowed approvers using server-side configuration:

  1. Create a new GPAL approval file; its name must end in -approval.kts (e.g. test-approval.kts).

  2. Add the new file's name to the GENESIS_CLUSTER process definition's <script></script> element found in your application's cfg/genesis-processes.xml (values should be comma-separated). If this file is not in your application, read here how to find and override the file.

See the sample file below:

import global.genesis.session.RightSummaryCache

val rightSummaryCache = inject<RightSummaryCache>()

pendingApproval {
insert {
true
}

accept {
val userAttributes = entityDb.get(UserAttributes.byName(userName))
userAttributes?.accessType == AccessType.INTERNAL
}

cancel {
true
}

reject {
rightSummaryCache.userHasRight(userName, "REJECT_PENDING_APPROVAL")
}
}

You can replace the "true" return values with Kotlin code in each of the relevant blocks. Alternatively, you don't need to define them at all, as they return "true" by default.

The platform makes the following objects accessible to the insert block:

  • insertMessage - an instance of the PendingApprovalInsert class, which is used to populate the APPROVAL table if successful. The content of this class consists of several properties:
    • approvalMessage - contains the original approval message text sent by the user who initiated the action.
    • messageType - represents the original EVENT name (e.g. EVENT_TRADE_INSERT).
    • destination - is the process name this event was originally targeting (e.g. POSITION_EVENT_HANDLER).
    • eventMessage - contains the JSON object representing the original message payload.
    • approvalType - equivalent to the property with the same name provided as part of approvableAck (see pending approval workflow).
    • additionalDetails - equivalent to the property with the same name provided as part of approvableAck (see pending approval workflow).
    • generated - equivalent to the property named entityDetails provided as part of approvableAck (see pending approval workflow).
  • userName - a string property containing the user name who triggered the event.
  • messageType - a shortcut property accessor for the messageType value stored inside insertMessage.
  • eventMessage - a shortcut property accessor for the eventMessage value stored inside insertMessage.

The following objects are accessible within the accept, cancel and reject blocks:

  • userName - a string property containing the user name who triggered the pending approval event (e.g. accept, reject or cancel).
  • pendingApproval - the pending approval record stored in the database. The type of this property is the "Approval" database entity (see table entities).
  • approvalMessage - an instance of the ApprovalMessage class, which represents the payload of the message sent to EVENT_PENDING_APPROVAL_ACCEPT, EVENT_PENDING_APPROVAL_CANCEL and EVENT_PENDING_APPROVAL_REJECT. It contains two properties:
    • approvalMessage - the message text sent by the user who initiated this pending approval action
    • approvalId - contains the APPROVAL_ID used to identify the APPROVAL record we are handling as part of this action
  • messageType - a shortcut property accessor for the messageType value stored inside pendingApproval.
  • eventMessage - a shortcut property accessor for the eventMessage value stored inside pendingApproval.

The following properties are automatically available for the whole scope of the -approval.kts file:

val systemDefinition: SystemDefinitionService
val rxDb: RxDb
val entityDb: AsyncEntityDb
val evaluatorPool: EvaluatorPool
val networkConfiguration: NetworkConfiguration
val serviceDetailProvider: ServiceDetailProvider
val serviceDiscovery: ServiceDiscovery
val injector: Injector

As shown in the previous code example, you can perform database lookups to retrieve additional information and return true only if the necessary rights or attributes are in place. For example, if your system has the concept of internal and external users, and you only want to allow internal users to accept pending events, then you could check your custom user "ACCESS_TYPE" field as follows:

pendingApproval {
accept {
val userAttributes = entityDb.get(UserAttributes.byName(userName))
userAttributes?.accessType == AccessType.INTERNAL
}
}
Advanced allowed approvers

You might have noticed that the original type-safe event message types are lost inside the -approval.kts file, as the content of eventMessage inside APPROVAL table (and also inside PendingApprovalInsert) is a serialized JSON string. You can deserialize the original type-safe objects using the selectPredicate method combined with multiple onEvent predicates. These methods are available in all the pendingApproval code blocks: insert, accept, cancel and reject.

  • selectPredicate is a function that accepts an indeterminate number of functions returning a boolean value, as well as a mandatory default function to handle messages that do not fall into any defined category. The default function provides a GenesisSet object with the contents of the original message payload.
  • onEvent works very similarly to any other GPAL Event Handler definition. It enables you to treat the incoming message in the same way as you would have done within the original Event Handler; however, each function must return a boolean expression.

Please see the example below for custom logic using a table called "RESTRICTED_SYMBOL" to prevent restricted symbols from being added to the system, as well as checking user right codes:

import global.genesis.session.RightSummaryCache

val rightSummaryCache = inject<RightSummaryCache>()

pendingApproval {
accept {
selectPredicate(
onEvent<TradeInsert>("TRADE_INSERT") { event ->
val tradeInsert = event.details
val stockInRestrictedList = entityDb.get(RestrictedSymbol.bySymbol(tradeInsert.symbol))
// Deny any operation on restricted symbols.
if (stockInRestrictedList != null) {
false
} else {
rightSummaryCache.userHasRight(userName, "TRADE_INSERT")
}
},
onEvent<TradeAmend>("TRADE_AMEND") { event ->
val tradeAmend = event.details
val stockInRestrictedList = entityDb.get(RestrictedSymbol.bySymbol(tradeAmend.symbol))
// Deny any operation on restricted symbols.
if (stockInRestrictedList != null) {
false
} else {
rightSummaryCache.userHasRight(userName, "TRADE_AMEND")
}
},
onEvent<TradeDelete>("TRADE_DELETE") { event ->
val tradeDelete = event.details
val stockInRestrictedList = entityDb.get(RestrictedSymbol.bySymbol(tradeDelete.symbol))
// Deny any operation on restricted symbols.
if (stockInRestrictedList != null) {
false
} else {
rightSummaryCache.userHasRight(userName, "TRADE_DELETE")
}
},
// If the message can't be deserialized we will use a default fallback to genesisSet.
default = { genesisSet ->
true
}
)
}
}

System rejects

An approval process deliberately delays events. So it is possible that by the time a pending approval action is approved, the underlying data has changed enough to cause the delayed event to fail when executed. When this happens, the pending approval record will be marked as REJECTED_BY_SYSTEM in its APPROVAL_STATUS field.

The platform provides an event designed to help you to reject events directly from the back end without human intervention. "EVENT_PENDING_APPROVAL_SYSTEM_REJECT" is an event that can be used to satisfy any specific requirements that fall outside the functionality of the pending approval system. The event is only accessible by back-end services. It takes two parameters:

  • approvalMessage is the text of the message to be sent when the message is rejected.
  • approvalKey is a unique identifier for each pending approval record. This identifier is never exposed to the front end, so only back-end services have access to it. That makes it impossible to trigger this event unless you have access to the database system. You need to obtain the approvalKey programmatically so that you can supply it as a parameter value.

Let us look at an example. Consider a solution that needs to submit trades to an external system every day at midnight for confirmation purposes. Once the trades have been submitted, their content cannot be changed anymore, unless a separate amendment process is started outside the Genesis application. The system has a pending approval system that enables users to amend the content of each "Trade" database record to provide extra security.

Once a "Trade" record has been submitted at midnight, you need to prevent any pending approval events from amending the content of the trade records in the system, as this could cause inconsistencies between the external system and the Genesis system. To prevent this, you could run a job that automatically rejects all pending approval records at midnight every day.

Here is an example GPAL script that could be run every day at midnight to reject all the pending records.

import global.genesis.clustersupport.service.ServiceDiscovery
import global.genesis.message.core.event.ApprovalSystemRejectMessage
import global.genesis.message.core.event.Event
import global.genesis.pal.shared.inject
import kotlin.system.exitProcess

val serviceDiscovery: ServiceDiscovery = injector.inject()
val client = serviceDiscovery.resolveClientByResource("EVENT_PENDING_APPROVAL_SYSTEM_REJECT")
if (client == null || !client.isConnected) {
println("Unable to find service exposing EVENT_PENDING_APPROVAL_SYSTEM_REJECT")
exitProcess(1)
} else {
suspendable {
entityDb.getBulk<Approval>()
.filter { it.approvalStatus == ApprovalStatus.PENDING }
.collect { approval ->
val reply: EventReply? = client.suspendRequest(
Event(
messageType = "EVENT_PENDING_APPROVAL_SYSTEM_REJECT",
userName = "SYSTEM",
details = ApprovalSystemRejectMessage(
approvalKey = approval.approvalKey,
approvalMessage = "Rejected by system"
)
)
)
when (reply) {
is EventReply.EventAck ->
println("Successfully rejected APPROVAL_ID: ${approval.approvalId}")
is EventReply.EventNack ->
println("Failed to rejected APPROVAL_ID: ${approval.approvalId}: $reply")
else ->
println("Unexpected response from pending approval system: $reply")
}
}
}
}

Metrics

info

Ensure you have enabled metrics in your environment to view them.

The Event Handler latency metrics show how long it takes for a specific eventHandler in the Event Handler to process a message.

MetricExplanation
processing_latencyThe latency for processing events (kts event handler)
latencyThe latency for processing events (kotlin/java event handler)

Runtime configuration

To include your *-eventhandler.kts file definitions in a runtime process, you need to set the process definition:

  1. Ensure genesis-pal-eventhandler is included in module
  2. Ensure global.genesis.eventhandler.pal is included in package
  3. Ensure your eventhandler.kts file(s) are defined in script
  4. Ensure pal is set in language

If you wish to run a dedicated process for an Event Handler, here is an example full process definition:

  <process name="POSITION_EVENT_HANDLER">
<groupId>POSITION</groupId>
<start>true</start>
<options>-Xmx256m -DRedirectStreamsToLog=true -DXSD_VALIDATE=false</options>
<module>genesis-pal-eventhandler</module>
<package>global.genesis.eventhandler.pal</package>
<script>position-eventhandler.kts</script>
<description>Handles events</description>
<classpath>position-messages*,position-eventhandler*</classpath>
<language>pal</language>
</process>

See runtime configuration - processes for further details.

Performance

If better database response performance is required, you may also consider added a cache to the process. See runtime configuration - process cache for more details.

Testing

info

GenesisJunit is available from version 8 of the Genesis Server Framework (GSF).

If you are testing against a previous version of the framework, go to the legacy section.

Integration testing

This section covers the basics of testing Event Handlers. We shall use a very simple example, and work through the communication between our test and the Event Handler.

This includes how to test dynamic authorization.

This testing relies on GenesisJunit, which is designed to make testing easy.

In this example, we shall test the following Event Handler:

data class Hello(
val name: String,
)

eventHandler {
eventHandler<Hello>("HELLO_WORLD") {
onCommit {
ack()
}
}
}

Creating the test class

First, use the code below to create the test class:

@ExtendWith(GenesisJunit::class)
@ScriptFile("hello-world-eventhandler.kts")
class EventHandlerTest {

// our tests go here ...
}

The code above does two things:

  • It enables GenesisJunit.
  • It specifies the Event Handler script that we want to test, using the ScriptFile annotation.

There is more information about GenesisJunit and the various annotations in the section on Integration testing.

Injecting an Event Handler client

Use the code below to inject an Event Handler client:

@ExtendWith(GenesisJunit::class)
@ScriptFile("hello-world-eventhandler.kts")
class EventHandlerTest {

@Inject
lateinit var client: EventClientSync

// our test will go here ...
}

A first test

This test makes sure that the Event Handler returns an ack.

@Test
fun testHelloWorld() {
val reply = client.sendEvent(
details = Hello("PETER"),
messageType = "EVENT_HELLO_WORLD"
)

assert(reply is EventReply.EventAck)
}

As you can see here, to send an event to our event handler, we need to provide the details class. The message type is optional, but we have to set it here, as we set a custom message name on our Event Handler.

We can also provide the user name:

@Test
fun testHelloWorldWithUser() {
val reply = client.sendEvent(
details = Hello("PETER"),
messageType = "EVENT_HELLO_WORLD",
userName = "PETER"
)

assert(reply is EventReply.EventAck)
}

Different clients

There are three versions of the Event Handler client available during testing:

  • EventClientSync - this is the synchronous client, all calls are blocking
  • EventClientAsync - this is the coroutine client, where calls suspend
  • EventClientRx - this is the RxJava client, which wraps responses in a Single

In most instances, EventClientSync will suffice. Use the other clients if your tests use other asynchronous operations.

Dynamic authorization

To test dynamic authorization, add the @EnableInMemoryTestAuthCache to your class or method. This makes InMemoryTestAuthCache available for injection into your test class.

Amend the Event Handler to enable authorization:

data class Hello(
val name: String,
)

eventHandler {
eventHandler<Hello>("HELLO_WORLD_AUTH") {

permissioning {
auth("NAMES") {
authKey {
key(data.name)
}
}
}

onCommit {
ack()
}
}
}

The first thing you need to do, is to enable the in-memory test auth cache using the @EnableInMemoryTestAuthCache annotation.

You also need to inject InMemoryTestAuthCache into your test class.

@ExtendWith(GenesisJunit::class)
@ScriptFile("hello-world-eventhandler.kts")
class EventHandlerTest {

@Inject
lateinit var client: EventClientSync

@Inject
lateinit var authCache: InMemoryTestAuthCache

@Test
fun testIsAuthorised() {
authCache.authorise(
authMap = "NAMES",
entityCode = "PETER",
userName = "PETER"
)

val reply = client.sendEvent(
details = Hello(name = "PETER"),
userName = "PETER",
messageType = "EVENT_HELLO_WORLD_AUTH"
)

assert(reply is EventReply.EventAck) { reply }
}

@Test
fun testIsNotAuthorised() {
authCache.revoke(
authMap = "NAMES",
entityCode = "PETER",
userName = "PETER"
)

val reply = client.sendEvent(
details = Hello(name = "PETER"),
userName = "PETER",
messageType = "EVENT_HELLO_WORLD_AUTH"
)

assert(reply is EventReply.EventNack)
}
}

The revoke calls are not needed in this case; they have been included here to show the syntax. These calls can be used to revoke previous authorizations in your tests.

Conclusion

At this point you have tested a very simple Event Handler. You haven't even had to use the database! However, you have covered the basics of communication between tests and Event Handlers:

  • sending details to our event
  • overwriting the default message type
  • setting the user on the event
  • handling dynamic authorization

For more details about testing with Genesis, take a look at our integration test documentation.

Integration testing (legacy)

info

This section covers testing your Event Handler if you are using any version of the Genesis Server Framework before GSF v8.

The Genesis Platform provides the AbstractGenesisTestSupport abstract class that enables end-to-end testing of specific areas of your application. In this case, we want to ensure that we have a database, seeded with information, and that our Event Handler configuration is used to create our Event Handler. We also need to add the required packages, genesis home and separately set the "IS_SCRIPT" System Definition property to true (This is required as part of the Event Handler initialization).

class EventHandlerTest : AbstractGenesisTestSupport<GenesisSet>(
GenesisTestConfig {
addPackageName("global.genesis.eventhandler.pal")
genesisHome = "<genesis-home>"
parser = { it }
scriptFileName = "<app-name>-eventhandler.kts"
initialDataFile = "seed-data.csv"
}
) {
override fun systemDefinition(): Map<String, Any> = mapOf("IS_SCRIPT" to "true")
}

For more information about AbstractGenesisTestSupport, see the Testing pages.

Once you have added your config above, you can start writing tests against our Event Handler.

Writing tests

Let's write some tests for this simple Event Handler, defined below

    eventHandler<Trade>(name = "TRADE_INSERT") {
onCommit { event ->
val trade = event.details
val result = entityDb.insert(trade)
ack(listOf(mapOf("TRADE_ID" to result.record.tradeId)))
}
}
Simple test

Below is an example of a simple test.

    @Test
fun `test insert trade`(): Unit = runBlocking {
val message = Event(
details = Trade {
tradeId = "1"
counterpartyId = "CP1"
instrumentId = "I2"
side = "BUY"
price = 1.123
quantity = 1000
},
messageType = "EVENT_TRADE_INSERT"
)

val result: EventReply? = messageClient.suspendRequest(message)
result.assertedCast<EventReply.EventAck>()

val trade = entityDb.get(Trade.ById("1"))
assertNotNull(trade)
}

First, create your Event object, setting the event details and specifying the intended Event Handler for the message "EVENT_TRADE_INSERT".

We then send a message to our Event Handler using messageClient.suspendRequest(message). The result is first verified to be an EventAck. Then check that the inserted trade can be retrieved from the database.

Remember to add the runBlocking coroutine scope to the test, as the Genesis platform uses Kotlin coroutines.

Error response test

You may also want to test a negative case, where you expect to receive an error as a response.

You need to modify the previous example Event Handler and add an onValidate block:

eventHandler<Trade>(name = "TRADE_INSERT") {
onCommit { event ->
val trade = event.details
val result = entityDb.insert(trade)
ack(listOf(mapOf("TRADE_ID" to result.record.tradeId)))
}
onValidate { event ->
val message = event.details
verify {
entityDb hasEntry Counterparty.ById(message.counterpartyId)
entityDb hasEntry Instrument.ById(message.instrumentId)
}
ack()
}
}

In the example below, we expect the response to be of type EventNack, which has a property error containing a list of errors.

    @Test
fun `test invalid instrument`(): Unit = runBlocking {
val message = Event(
details = Trade {
tradeId = "1"
counterpartyId = "CP1"
instrumentId = "DOESNOTEXIST"
side = "BUY"
price = 1.213
quantity = 100
},
messageType = "EVENT_TRADE_INSERT"
)

val result: EventReply? = messageClient.suspendRequest(message)
val eventNack: EventReply.EventNack = result.assertedCast()

assertThat(eventNack.error).containsExactly(
StandardError(
"INTERNAL_ERROR",
"INSTRUMENT ById(instrumentId=DOESNOTEXIST) not found in database"
)
)
}
Testing with authorization

To test that the Event Handler authorization works correctly, you need to do some setting up.

First, make sure that your authorization set-up is designed to behave as follows:

  • A user who enters a trade must have an entry in the "ENTITY_VISIBILITY" auth map; the entity code for this user must match the counterpartyId of the trade.
  • The user must have an entry in the "RIGHT_SUMMARY" table with "RIGHT_CODE" as "TRADER".

Second, you need to modify the previous example Event Handler so that only authorized users can insert trades.

eventHandler<Trade>(name = "TRADE_INSERT") {
permissioning {
permissionCodes = listOf("TRADER")
auth(mapName = "ENTITY_VISIBILITY") {
authKey {
key(data.counterpartyId)
}
}
}
onValidate { event ->
val message = event.details
verify {
entityDb hasEntry Counterparty.ById(message.counterpartyId)
entityDb hasEntry Instrument.ById(message.instrumentId)
}
ack()
}
onCommit { event ->
val trade = event.details
val result = entityDb.insert(trade)
ack(listOf(mapOf("TRADE_ID" to result.record.tradeId)))
}
}

Third, you need to specify the auth cache override in the GenesisTestConfig:

class EventHandlerTest : AbstractGenesisTestSupport<GenesisSet>(
GenesisTestConfig {
addPackageName("global.genesis.eventhandler.pal")
genesisHome = "/GenesisHome/"
parser = { it }
scriptFileName = "your-application-eventhandler.kts"
initialDataFile = "seed-data.csv"
addAuthCacheOverride("ENTITY_VISIBILITY")
}
) {
...
}

Fourth, in your test set-up, authorize one user to be able to insert trades and another who is not.

    @Before
fun setUp() {
authorise("ENTITY_VISIBILITY", "CP1", "TraderUser")

val trader = DbRecord.dbRecord("RIGHT_SUMMARY") {
"USER_NAME" with "TraderUser"
"RIGHT_CODE" with "TRADER"
}
val support = DbRecord.dbRecord("RIGHT_SUMMARY") {
"USER_NAME" with "SupportUser"
"RIGHT_CODE" with "SUPPORT"
}
rxDb.insert(trader).blockingGet()
rxDb.insert(support).blockingGet()
}

For more information on authorization, see the authorization docs.

Below is a test that verifies only Traders can enter trades:

    @Test
fun `test trade inserted by trader`(): Unit = runBlocking {
val message = Event(
details = Trade {
tradeId = "1"
counterpartyId = "CP1"
instrumentId = "I2"
side = "BUY"
price = 5.0
quantity = 1
},
messageType = "EVENT_TRADE_INSERT",
userName = "TraderUser"
)

val result: EventReply? = messageClient.suspendRequest(message)
result.assertedCast<EventReply.EventAck>()

val trade = entityDb.get(Trade.ById("1"))
assertNotNull(trade)
}

Following that, here is a test to verify that a trade cannot be entered if the user is not a Trader:

    @Test
fun `test trade cannot be inserted if not trader`(): Unit = runBlocking {
val message = Event(
details = Trade {
tradeId = "1"
counterpartyId = "CP1"
instrumentId = "I2"
side = "BUY"
price = 5.0
quantity = 1
},
messageType = "EVENT_TRADE_INSERT",
userName = "SupportUser"
)

val result: EventReply? = messageClient.suspendRequest(message)
val eventNack = result.assertedCast<EventReply.EventNack>()

assertThat(eventNack.error).containsExactly(
StandardError(
"NOT_AUTHORISED",
"User SupportUser lacks sufficient permissions"
)
)
}

Manual testing

An API client, such as Postman or Insomnia is useful way of testing components. As a client, it is effectively a front end seeking information from the server.

The API client enables you to create calls to the resources in your server - Data Servers, Request Servers and Event Handlers. Then you can just click to run a call and see what response you get.

Before you can make any calls on these resources, you will have to permission yourself by obtaining a SESSION_AUTH_TOKEN. The details of how to do this are on our separate Testing page.

Once you have the SESSION_AUTH_TOKEN, keep a copy that you can paste into each request as you make your test call.

In the example below, we are using Insomnia as the client API. We are going to test the EVENT_COUNTERPARTY_INSERT Event Handler by adding a new counterparty.

url and Body

In front of the url, set the call to POST.

The url consists of:

  • the address or hostname of the server
  • if necessary, some extra routing; in this case sm uses a proxy to access the server
  • the name of the Event Handler

Set the body to JSON. In the body, you need to insert the details of the fields for the new counterparty, as seen below:

In the header, you need to supply:

  • a SOURCE_REF (always), which identifies you; you can use any string value that suits you
  • the SESSION_AUTH_TOKEN that permissions you to access the server

When you have all these elements in place, click on Send to make the call. If the event is a success, you will receive an ACK message.

Checking the insertion

Now you can check that the new counterparty you inserted is in the correct table of the database. The resource you need to check is the Request Server called ALL_COUNTERPARTYS.

In front of the url, set the call to POST.

The url consists of:

  • the address or hostname of the server
  • if necessary, some extra routing; in this case sm uses a proxy to access the server
  • the name of the Request Server

Set the body to JSON. There is no need for any information in the body. Simply insert a pair of curly brackets .

In the header, you need to supply:

  • a SOURCE_REF (always), which identifies you; you can use any string value that suits you
  • the SESSION_AUTH_TOKEN that permissions you to access the server

When you have this in place, click on Send to make the call. You can see that the fields for the instruments have been returned on the right of the screen.

Testing java event handlers

The Genesis Application Platform provides the AbstractGenesisTestSupport abstract class that enables end-to-end testing of specific areas of your application.

In this case, build GenesisTestConfig with the following information:

  • Set packages: global.genesis.eventhandler this is the standard package name from the framework, which is needed for all Java events/custom events. Make sure you name the package where you defined the events. In the example below, it is global.genesis.position.samples.events.rxjava
  • Set genesis home
  • Set initial data: we want to ensure that we have a database, seeded with information
public class TradingEventHandlerTest extends AbstractGenesisTestSupport<EventResponse> {
public TradingEventHandlerTest() {
super(GenesisTestConfig.builder()
.setPackageNames(List.of("global.genesis.eventhandler","global.genesis.position.samples.events.rxjava"))
.setGenesisHome("/GenesisHome/")
.setInitialDataFiles("seed-data.csv")
.setParser(EventResponse.Companion)
.build()
);
}
}

For more information about AbstractGenesisTestSupport, see the Testing pages.

Once you have set up your configuration, you can start writing tests against your Event Handler.

Writing tests

Let's write some tests for the simple Event Handler defined below:

        @Module
public class EventTrade implements Rx3ValidatingEventHandler<Trade, EventReply> {

private final RxEntityDb entityDb;

@Inject
public EventTrade(RxEntityDb entityDb) {
this.entityDb = entityDb;
}

@Nullable
@Override
public String messageType() {
return "TRADE_INSERT";
}

@NotNull
@Override
public Single<EventReply> onCommit(@NotNull Event<Trade> tradeEvent) {
Trade trade = tradeEvent.getDetails();
return entityDb.writeTransaction(txn -> {
Trade result = txn.insert(trade).blockingGet().getRecord();
return ack(this, List.of(Map.of("TRADE_ID", result.getTradeId())));
}).map(result -> result.getFirst());
}

@NotNull
@Override
public Single<EventReply> onValidate(@NotNull Event<Trade> event) {
Trade trade = event.getDetails();
if (entityDb.get(Counterparty.byId(trade.getCounterpartyId())).blockingGet() == null) {
return Single.just(new StandardError("INTERNAL_ERROR", "COUNTERPARTY ById(counterpartyId=" + trade.getCounterpartyId() +") not found in database").toEventNackError());
} else if (entityDb.get(Instrument.byId(trade.getInstrumentId())).blockingGet() == null) {
return Single.just(new StandardError("INTERNAL_ERROR", "INSTRUMENT ById(instrumentId=" + trade.getInstrumentId() +") not found in database").toEventNackError());
}
return ack(this);
}
}
Simple test

Below is an example of a simple test.

First, this creates an Event object, setting the event details and specifying the intended Event Handler for the message "EVENT_TRADE_INSERT" and username.

Second, it sends a message to the Event Handler using getMessageClient().request(event, EventReply.class). The result is first verified to be an EventAck.

Finally, it checks that the inserted trade can be retrieved from the database.:

    @Test
public void testTradeInsert() throws InterruptedException {
Trade trade = Trade.builder()
.setTradeId("1")
.setCounterpartyId("CP1")
.setInstrumentId("I2")
.setSide("BUY")
.setPrice(1.123)
.setQuantity(1000)
.build();
Event event = new Event(trade, "EVENT_TRADE_INSERT", "JohnDoe");
EventReply reply = getMessageClient().request(event, EventReply.class).blockingGet();
assertEquals(reply, new EventReply.EventAck(List.of(Map.of("TRADE_ID", trade.getTradeId()))));
Trade result = getRxDb().entityDb().get(Trade.byId("1")).blockingGet();
assertNotNull(result);
}
Error response test

You may also want to test a negative case, where you expect to receive an error as a response.

In the example below, we expect the response to be of type EventNack when we try to insert a wrong instrument ID. As in the Event Handler above, there is a check to see if the instrument exists in the database.

    @Test
public void testTradeInsertWrongInstrumentId() throws InterruptedException {
Trade trade = Trade.builder()
.setTradeId("1")
.setCounterpartyId("CP1")
.setInstrumentId("DOESNOTEXIST")
.setSide("BUY")
.setPrice(1.213)
.setQuantity(100)
.build();
Event event = new Event(trade, "EVENT_TRADE_INSERT_JAVA", "JohnDoe");
EventReply reply = getMessageClient().request(event, EventReply.class).blockingGet();
GenesisError genesisError = new StandardError("INTERNAL_ERROR", "INSTRUMENT ById(instrumentId=DOESNOTEXIST) not found in database");
assertEquals(reply, new EventReply.EventNack(List.of(), List.of(genesisError)));
}
Testing with authorization
Set-up

To test that the Event Handler authorization works correctly, you need to do some setting up.

First, make sure that your authorization set-up is designed to behave as follows:

  • A user who enters a trade must have an entry in the "ENTITY_VISIBILITY" auth map; the entity code for this user must match the counterpartyId of the trade.
  • The user must have an entry in the "RIGHT_SUMMARY" table with "RIGHT_CODE" as "TRADER".

Second, you need to modify the previous example Event Handler so that only authorized users can insert trades.

    @Module
public class EventTrade implements Rx3EventHandler<Trade, EventReply> {

private final RxEntityDb entityDb;
private final RxDb rxDb;
private final RightSummaryCache rightSummaryCache;

private Authority authCache;

@Inject
public EventTrade(RxEntityDb entityDb, RxDb rxDb, RightSummaryCache rightSummaryCache) {
this.entityDb = entityDb;
this.rightSummaryCache = rightSummaryCache;
this.rxDb = rxDb;
}

@Inject
public void init() {
this.authCache = AuthCache.newReader("ENTITY_VISIBILITY", rxDb.getUpdateQueue());
}

@Nullable
@Override
public String messageType() {
return "TRADE_INSERT_JAVA";
}

@Override
public Single<EventReply> process(Event<Trade> tradeEvent) {
String userName = tradeEvent.getUserName();

if(rightSummaryCache.userHasRight(userName, "TRADER")){
Trade trade = tradeEvent.getDetails();
return entityDb.writeTransaction(txn -> {
Trade result = txn.insert(trade).blockingGet().getRecord();
return ack(this, List.of(Map.of("TRADE_ID", result.getTradeId())));
}).map(result -> result.getFirst());
}
return Single.just(new StandardError("NOT_AUTHORIZED", "User " + userName + " lacks sufficient permissions").toEventNackError());
}
}

Third, you need to specify the auth cache override in the GenesisTestConfig:

    public class TradingEventHandlerTest extends AbstractGenesisTestSupport<EventResponse> {
public TradingEventHandlerTest() {
super(GenesisTestConfig.builder()
.setPackageNames(List.of("global.genesis.eventhandler","global.genesis.rxjava"))
.setGenesisHome("/GenesisHome/")
.setInitialDataFiles("TEST_DATA.csv")
.setAuthCacheOverride(List.of("ENTITY_VISIBILITY"))
.setParser(EventResponse.Companion)
.build()
);
}
}

Fourth, in your test set-up, let's authorise one user to be able to insert trades and another who is not.

    @Before
public void setUp() {
authorise("ENTITY_VISIBILITY", "CP1", "TraderUser");
getRxDb().insert(RightSummary.builder().setRightCode("TRADER").setUserName("TraderUser").build().toDbRecord()).blockingGet();
getRxDb().insert(RightSummary.builder().setRightCode("SUPPORT").setUserName("SupportUser").build().toDbRecord()).blockingGet();
}

For more information on authorisation, see the authorization docs.

Tests

After you have set things up. Now you can create the tests themselves.

Below is a test that verifies that only Traders can enter trades:

    @Test
public void testTradeInsertedByTrader() {
Trade trade = Trade.builder()
.setTradeId("1")
.setCounterpartyId("CP1")
.setInstrumentId("I2")
.setSide("BUY")
.setPrice(5.0)
.setQuantity(1)
.build();
Event event = new Event(trade, "EVENT_TRADE_INSERT", "TraderUser");
EventReply reply = getMessageClient().request(event, EventReply.class).blockingGet();
assertEquals(reply, new EventReply.EventAck(List.of(Map.of("TRADE_ID", trade.getTradeId()))));

Trade insertedUser = getRxDb().entityDb().get(Trade.byId("1")).blockingGet();
assertNotNull(insertedUser);
}

Following that, we have a test to verify that a trade cannot be entered if the user is not a Trader:

    @Test
public void testTradeCannotBeInsertedIfNotTrader() {
Trade trade = Trade.builder()
.setTradeId("1")
.setCounterpartyId("CP1")
.setInstrumentId("I2")
.setSide("BUY")
.setPrice(5.0)
.setQuantity(1)
.build();
Event event = new Event(trade, "EVENT_TRADE_INSERT_JAVA", "SupportUser");
EventReply reply = getMessageClient().request(event, EventReply.class).blockingGet();

GenesisError genesisError = new StandardError("NOT_AUTHORIZED", "User SupportUser lacks sufficient permissions");
assertEquals(reply, new EventReply.EventNack(List.of(), List.of(genesisError)));
}

Event handler API

GPAL event handlers are powerful and the recommended approach. There is an API available to write event handlers in Java, or Kotlin. The following section gives details of this API

In most cases, you will create Event handlers in GPAL as recommended. This offers a method with succinct code and a good degree of flexibility.

However, you can also implement Event Handlers as a set of classes. Typically, this is useful where you have a complex requirement for business logic and database interaction. For example, a kts file of 1,000 lines is difficult to test and maintain; in this case, a set of individual classes is much more convenient.

For implementing an Event Handler as a set of classes, there are three different options:

  • Async. This uses the Kotlin coroutine API to simplify asynchronous development. This is the underlying implementation used in GPAL Event Handlers. You can only create Async Event Handlers using Kotlin.
  • RxJava3. This uses the RxJava3 library, which is a popular option for composing asynchronous event-based programs. You can create RxJava3 Event Handlers using either Kotlin or Java.
  • Sync. This creates synchronous Event Handlers. You can create Sync Event Handlers using either Kotlin or Java.
note

Java Event Handlers can be implemented using RxJava3 and Sync Event Handlers only. Async Event Handlers cannot be used, as there is no implementation for Kotlin coroutines in Java.

We recommend using Kotlin to implement Event Handlers.

Configure in processes.xml file

You need to add the global.genesis.eventhandler package to the package tag of the process; this tag defines which package the process should refer to. For example:

<process name="POSITION_NEW_PROCESS">    
<groupId>POSITION</groupId>
<start>true</start>
<options>-Xmx256m -DRedirectStreamsToLog=true -DXSD_VALIDATE=false</options>
<module>position-new-process</module>
<package>global.genesis.eventhandler,position.company.manager</package>
<description>Handles events</description>
</process>

Event Handler interface

The Event Handler interface is the common supertype of AsyncEventHandler, Rx3EventHandler and SyncEventHandler, but it is not meant to be used on its own. It provides basic options for each Event Handler definition, which can be overridden. See the Kotlin methods explanation below:

NameSignatureDefault valueDescription
excludeMetadataFieldsfun excludeMetadataFields(): Set<String>setOf("RECORD_ID", "TIMESTAMP")Contains a list of metadata fields to be excluded from the event metadata extracted from the input I
includeMetadataFieldsfun includeMetadataFields(): Set<String>emptySet()Contains a list of metadata fields that need to be included in the event metadata; this must be available in input I. A non-empty list will exclude the other fields.
messageTypefun messageType(): String?nullContains the name of the Event Handler. If undefined, the Event Handler name will become EVENT_*INPUT_CLASS_NAME*. So, for an Event Handler using an input type called TradeInsert, the message type will become EVENT_TRADE_INSERT.
overrideMetadataFieldsfun overrideMetadataFields(): Map<String, OverrideMetaField>emptySet()Contains a map (key-value entries) of metadata field names to metadata field definitions in the shape of OverrideMetaField. This enables you to override the metadata field properties extracted from input I
requiresPendingApprovalfun requiresPendingApproval(): BooleanfalseThis is used where particular system events require a second system user to approve them (pending approval in order to take effect)
schemaValidationfun schemaValidation(): BooleantrueThis option enables you to disable the automatic Json Schema validation enforced by the back end. See type-safe messages for more information.

Each custom Event Handler must define an input message type I and an output message type O (these need to be data classes), as GPAL Event Handlers do). In the examples below, Company is the input message and EventReply is the output message. The message object contains event message and has the following properties :

NameDefault valueDescription
detailsThis has input information, example: Company
messageTypeName of the Event Handler
userNameName of logged-in user
ignoreWarningsfalseIf set to false, events will not be processed if there are any warnings; you will get EventNack with warning message. If set to true, warning messages will be ignored; processing of events will be stopped only if there are any errors
requiresApprovalfalseThis particular event needs approval from a second user if set to true. For more details, see approval workflow
approvalKeynullAuto-generated key ID for particular approval request. For more details, see approval workflow
approvalMessagenullOptional message for approval request. For more details, see approval workflow
reasonnullOptional reason sent as part of event message

Async

AsyncEventHandler

This is the most basic definition of an Async Event Handler. You can define an AsyncEventHandler by implementing the AsyncEventHandler interface, which is defined as: interface AsyncEventHandler<I : Any, O : Outbound> : AsyncEventWorkflowProcessor<I, O>, EventHandler

The only mandatory method to implement this in the interface is:

NameSignature
processfun suspend process(message: Event<I>) : O

This method passes the input message type I as a parameter and expects the output message type O to be returned.

Here is an example:

import com.google.inject.Inject
import global.genesis.commons.annotation.Module
import global.genesis.db.rx.entity.multi.AsyncEntityDb
import global.genesis.eventhandler.typed.async.AsyncEventHandler
import global.genesis.gen.dao.Company
import global.genesis.message.core.event.Event
import global.genesis.message.core.event.EventReply

@Module
class EventCompanyHandlerAsync @Inject constructor(
private val entityDb: AsyncEntityDb,
private val companyService: CompanyService
) : AsyncEventHandler<Company, EventReply> {
override suspend fun process(message: Event<Company>): EventReply {
val company = message.details
// custom code block..
return EventReply.EventAck()
}
}

The methods below are provided as part of AsyncEventHandler; they provide an easy way of creating EventReply responses.

NameSignature
ackfun <I : Any> AsyncEventHandler<I, EventReply>.ack(): EventReply
ackfun <I : Any> AsyncEventHandler<I, EventReply>.ack(generated: List<Map<String, Any>> = emptyList()): EventReply
nackfun <I : Any> AsyncEventHandler<I, EventReply>.nack(throwable: Throwable): EventReply
nackfun <I : Any> AsyncEventHandler<I, EventReply>.nack(error: String): EventReply

Using these helper methods, you could simplify the previous implementation like this:

import global.genesis.commons.annotation.Module
import global.genesis.eventhandler.typed.async.AsyncEventHandler
import global.genesis.message.core.event.Event
import global.genesis.message.core.event.EventReply

@Module
class EventCompanyHandlerAsync : AsyncEventHandler<Company, EventReply> {
override suspend fun process(message: Event<Company>): EventReply {
val company = message.details
// custom code block..
return ack()
}
}

AsyncValidatingEventHandler

In the previous example, there was no distinction between validation and commit blocks, which is possible in GPAL Event Handlers. In order to have a better separation of concerns using custom Event Handlers, you can implement the AsyncValidatingEventHandler interface, which is defined as:

interface AsyncValidatingEventHandler<I : Any, O : Outbound> : AsyncEventHandler<I, O>

Implementation

Using this interface, you do not need to override the process method; you can split your logic into validation and commit stages. There are various methods of implementing this, which are described below:

NameSignature
onValidatesuspend fun onValidate(message: Event<I>): O
onCommitsuspend fun onCommit(message: Event<I>): O

Here is an example:

import global.genesis.commons.annotation.Module
import global.genesis.eventhandler.typed.async.AsyncValidatingEventHandler
import global.genesis.message.core.event.Event
import global.genesis.message.core.event.EventReply

@Module
class TestCompanyHandlerAsync : AsyncValidatingEventHandler<Company, EventReply> {
override suspend fun onValidate(message: Event<Company>): EventReply {
val company = message.details
// custom code block..
return ack()
}

override suspend fun onCommit(message: Event<Company>): EventReply {
val company = message.details
// custom code block..
return ack()
}
}

If the validate flag is received as true, only the onValidate code block will be executed. If the validate flag is received as false, both the onValidate and onCommit blocks will be executed.

AsyncContextValidatingEventHandler

In some cases, you might want to carry information from the onValidate code block to the onCommit code block for efficiency purposes. (For example, if several database look-ups happen in onValidate and you want to reuse that information.) Using the AsyncContextValidatingEventHandler interface, you can provide this context information from the validation stage to the commit stage. See the interface below: interface AsyncContextValidatingEventHandler<I : Any, O : Outbound, C : Any> : AsyncEventHandler<I, O>

Implementation

As with the previous example, when using this interface, you do not need to override the process method. The different methods for implementing this are described below:

NameSignature
onValidatesuspend fun onValidate(message: Event<I>): ValidationResult<O, C>
onCommitsuspend fun onCommit(message: Event<I>, context: C?): O

The validationResult methods are provided to help with the context creation:

NameSignature
validationResultfun validationResult(result: O): ValidationResult<O, C>
validationResultfun validationResult(result: O, context: C): ValidationResult<O, C>

The type C represents the contextual information we want to provide, and it can be any Java/Kotlin type. Here is an example:

import global.genesis.commons.annotation.Module
import global.genesis.eventhandler.typed.async.AsyncContextValidatingEventHandler
import global.genesis.message.core.event.Event
import global.genesis.message.core.event.EventReply
import global.genesis.message.core.event.ValidationResult

@Module
class TestCompanyHandlerAsync : AsyncContextValidatingEventHandler<Company, EventReply, String> {
override suspend fun onValidate(message: Event<Company>): ValidationResult<EventReply, String> {
val company = message.details
// custom code block..
val companyName = company.companyName
return validationResult(ack(), companyName)
}

override suspend fun onCommit(message: Event<Company>, context: String?): EventReply {
if(context != null) {
// Do something with the context
}
val company = message.details
// custom code block..
return ack()
}
}

Rx3

The mechanism explained in Async can be recycled and reapplied in Rx3 Event Handlers.

Rx3EventHandler

In a similar fashion to AsyncEventHandler, there is an Rx3 implementation flavour. It works in a very similar way to AsyncEventHandler, but requires different return types (i.e. we expect to return RxJava3 Single<O> type, instead of just the O type).

See the interface definition below: interface Rx3EventHandler<I : Any, O : Outbound> : Rx3EventWorkflowProcessor<I, O>, EventHandler

Implementation

The mandatory method for implementing this is:

NameSignature
processfun process(message: Event<I>) : Single<O>
Helper methods
NameSignature
ackfun <I : Any> Rx3EventHandler<I, EventReply>.ack(): Single<EventReply>
ackfun <I : Any> Rx3EventHandler<I, EventReply>.ack(generated: List<Map<String, Any>> = emptyList()): Single<EventReply>
nackfun <I : Any> Rx3EventHandler<I, EventReply>.nack(throwable: Throwable): Single<EventReply>
nackfun <I : Any> Rx3EventHandler<I, EventReply>.nack(error: String): Single<EventReply>

Here is an example:

    import global.genesis.commons.annotation.Module
import global.genesis.eventhandler.typed.rx3.Rx3EventHandler
import global.genesis.gen.dao.Company
import global.genesis.message.core.event.Event
import global.genesis.message.core.event.EventReply
import io.reactivex.rxjava3.core.Single

@Module
class TestCompanyHandlerRx3 : Rx3EventHandler<Company, EventReply> {
override fun process(message: Event<Company>): Single<EventReply> {
return ack()
}
}

Rx3ValidatingEventHandler

The same applies to an Rx3ValidatingEventHandler. It is similar to AsyncValidatingEventHandler in every way, but the return type is still Single<O>.

interface Rx3ValidatingEventHandler<I : Any, O : Outbound> : Rx3EventHandler<I, O>

Implementation
NameSignature
onValidatefun onValidate(message: Event<I>): Single<O>
onCommitfun onCommit(message: Event<I>): Single<O>

Here is an example:

    import global.genesis.commons.annotation.Module
import global.genesis.eventhandler.typed.rx3.Rx3ValidatingEventHandler
import global.genesis.gen.dao.Company
import global.genesis.message.core.event.Event
import global.genesis.message.core.event.EventReply
import io.reactivex.rxjava3.core.Single

@Module
class TestCompanyHandlerRx3 : Rx3ValidatingEventHandler<Company, EventReply> {
override fun onValidate(message: Event<Company>): Single<EventReply> {
val company = message.details
// custom code block..
return ack()
}

override fun onCommit(message: Event<Company>): Single<EventReply> {
val company = message.details
// custom code block..
return ack()
}
}

Rx3ContextValidatingEventHandler

And the same goes for Rx3ContextValidatingEventHandler in relation to AsyncContextValidatingEventHandler.

interface Rx3ContextValidatingEventHandler<I : Any, O : Outbound, C : Any> : Rx3EventHandler<I, O>

Implementation
NameSignature
onValidatefun onValidate(message: Event<I>): Single<ValidationResult<O, C>>
onCommitfun onCommit(message: Event<I>, context: C?): Single<O>
Helper methods
NameSignature
validationResultfun validationResult(result: O): ValidationResult<O, C>
validationResultfun validationResult(result: O, context: C): ValidationResult<O, C>

Here is an example:

    import global.genesis.commons.annotation.Module
import global.genesis.eventhandler.typed.rx3.Rx3ContextValidatingEventHandler
import global.genesis.gen.dao.Company
import global.genesis.message.core.event.Event
import global.genesis.message.core.event.EventReply
import global.genesis.message.core.event.ValidationResult
import io.reactivex.rxjava3.core.Single

@Module
class TestCompanyHandlerRx3 : Rx3ContextValidatingEventHandler<Company, EventReply, String> {
override fun onValidate(message: Event<Company>): Single<ValidationResult<EventReply, String>> {
val company = message.details
// custom code block..
val companyName = company.companyName
return Single.just(validationResult(EventReply.EventAck(), companyName))
}

override fun onCommit(message: Event<Company>, context: String?): Single<EventReply> {
if (context != null) {
// Do something with the context
}
val company = message.details
// custom code block..
return ack()
}
}

Sync

Sync works similarly to Async and Rx3, but in this case, there is no Single<O> returned and no suspend modifier used for Kotlin coroutines. The expected output of the Event Handler logic is just the O type.

SyncEventHandler

interface SyncEventHandler<I : Any, O : Outbound> : SyncEventWorkflowProcessor<I, O>, EventHandler

Implementation
NameSignature
processfun process(message: Event<I>) : O
Helper methods
NameSignature
ackfun <I : Any> SyncEventHandler<I, EventReply>.ack(): EventReply
ackfun <I : Any> SyncEventHandler<I, EventReply>.ack(generated: List<Map<String, Any>> = emptyList()): EventReply
nackfun <I : Any> SyncEventHandler<I, EventReply>.nack(throwable: Throwable): EventReply
nackfun <I : Any> SyncEventHandler<I, EventReply>.nack(error: String): EventReply

Here is an example:

    import global.genesis.commons.annotation.Module
import global.genesis.eventhandler.typed.sync.SyncEventHandler
import global.genesis.gen.dao.Company
import global.genesis.message.core.event.Event
import global.genesis.message.core.event.EventReply

@Module
class TestCompanyHandlerSync : SyncEventHandler<Company, EventReply> {
override fun process(message: Event<Company>): EventReply {
return ack()
}
}

SyncValidatingEventHandler

interface SyncValidatingEventHandler<I : Any, O : Outbound> : SyncEventHandler<I, O>

Implementation
NameSignature
onValidatefun onValidate(message: Event<I>): O
onCommitfun onCommit(message: Event<I>): O

Here is an example:

    import global.genesis.commons.annotation.Module
import global.genesis.eventhandler.typed.sync.SyncValidatingEventHandler
import global.genesis.gen.dao.Company
import global.genesis.message.core.event.Event
import global.genesis.message.core.event.EventReply

@Module
class TestCompanyHandlerSync : SyncValidatingEventHandler<Company, EventReply> {
override fun onValidate(message: Event<Company>): EventReply {
val company = message.details
return ack()
}

override fun onCommit(message: Event<Company>): EventReply {
val company = message.details
return ack()
}
}

SyncContextValidatingEventHandler

interface SyncContextValidatingEventHandler<I : Any, O : Outbound, C : Any> : SyncEventHandler<I, O>

Implementation
NameSignature
onValidatefun onValidate(message: Event<I>): ValidationResult<O, C>
onCommitfun onCommit(message: Event<I>, context: C?): O
Helper methods
NameSignature
validationResultfun validationResult(result: O): ValidationResult<O, C>
validationResultfun validationResult(result: O, context: C): ValidationResult<O, C>

Here is an example:

    import global.genesis.commons.annotation.Module
import global.genesis.eventhandler.typed.sync.SyncContextValidatingEventHandler
import global.genesis.gen.dao.Company
import global.genesis.message.core.event.Event
import global.genesis.message.core.event.EventReply
import global.genesis.message.core.event.ValidationResult

@Module
class TestCompanyHandlerSync : SyncContextValidatingEventHandler<Company, EventReply, String> {
override fun onValidate(message: Event<Company>): ValidationResult<EventReply, String> {
val company = message.details
// custom code block..
val companyName = company.companyName
return validationResult(ack(), companyName)
}

override fun onCommit(message: Event<Company>, context: String?): EventReply {
if (context != null) {
// Do something with the context
}
val company = message.details
// custom code block..
return ack()
}
}

Java event handlers

Event Handlers can be written in Java using Event Handler APIs. On this page, we look at Event Handlers written using the Rx3 Event handlers

note

We recommend using Kotlin to implement Event Handlers.

  • Java Event Handlers can only be implemented using RxJava3 or Sync Event Handlers.
  • Async Event Handlers are widely used in Kotlin events and cannot be used for Java events, as there is no implementation for Kotlin coroutines in Java.

To work with a Java Event Handler:

  1. In your application-eventhandler/src/main/ folder, create an empty folder called java. This ensures that the Java file will be compiled.
  2. In your application-eventhandler folder, create a package for your Java Event Handler. Make sure that your package name is specific - this will be used to start the Event Handler process.

  1. You can then create the file for the Java Event Handler in this folder.
  2. Update your processes.xml file to include the details of the new module. For example:
<process name="JAVA_EVENT_HANDLER">
<groupId>MYAPP</groupId>
<start>true</start>
<options>-Xmx256m -DRedirectStreamsToLog=true -DXSD_VALIDATE=false</options>
<module>myapp-eventhandler</module>
<package>global.genesis.eventhandler,genesis.global.myapp.trades</package>
<description>Handles events</description>
</process>

You can use both Java and Kotlin Event Handlers in the same process.

Make sure that both types of Event Handler are included in your application's processes.xml file. In the example below:

  • The Kotlin Event Handler is <module>genesis-pal-eventhandler</module>.
  • The <script> and <language> for the Kotlin file are registered.
  • The Kotlin and Java Event Handlers are both registered in the <package>. (In this case, the Java files are located in the package genesis.global.trades.)
  • The jar files for the Java Event Handler are registered in the <classpath>.
<process name="JAVA_KOTLIN_EVENT_HANDLERS">
<groupId>MYAPP</groupId>
<start>true</start>
<options>-Xmx256m -DRedirectStreamsToLog=true -DXSD_VALIDATE=false</options>
<module>genesis-pal-eventhandler</module>
<package>global.genesis.eventhandler.pal,genesis.global.trades</package>
<script>myapp-eventhandler.kts</script>
<language>pal</language>
<classpath>myapp-eventhandler*.jar</classpath>
<description>Handles events</description>
</process>

Example

This method passes the input message type CounterParty as a parameter and expects the output message type EventReply to be returned.

The default name will be EVENT_<input message type name>. So, for an input message type declared as CounterParty, the event with the name EVENT_COUNTERPARTY is registered automatically.

        @Module
public class EventCounterParty implements Rx3EventHandler<Counterparty, EventReply> {

private final RxEntityDb entityDb;

@Inject
public EventCounterParty(RxEntityDb entityDb) {
this.entityDb = entityDb;
}

@Override
public Single<EventReply> process(Event<Counterparty> counterpartyEvent) {
Counterparty counterparty = counterpartyEvent.getDetails();
return entityDb.insert(counterparty).flatMap(result -> ack(this));
}
}
info

Any Java Event Handler classes you create must be placed in the same folder as the Java Event Handler module itself.

Adding a name

Every eventHandler must have a unique name. If you do not provide one, it will be allocated a default name automatically, as shown in the previous example.

It is good practice to provide your own name for each eventHandler. For example, if you have insert and modify codeblocks for the same table and you don't name them, then the platform will probably generate identical names for both - which will give you a problem. Note that the prefix EVENT_ is automatically added to the name that you specify.

So, below, we modify our previous example by defining the name of the eventHandler: COUNTERPARTY_INSERT by overriding messageType method of the Rx3EventHandler. This will automatically become EVENT_COUNTERPARTY_INSERT.


@Module
public class EventCounterParty implements Rx3EventHandler<Counterparty, EventReply> {

private final RxEntityDb entityDb;

@Inject
public EventCounterParty(RxEntityDb entityDb) {
this.entityDb = entityDb;
}

@Nullable
@Override
public String messageType() {
return "COUNTERPARTY_INSERT";
}

@Override
public Single<EventReply> process(Event<Counterparty> counterpartyEvent) {
Counterparty counterparty = counterpartyEvent.getDetails();
return entityDb.insert(counterparty).flatMap(result -> ack(this));
}
}

Adding validation

So far, we have overridden the process method in our eventHandler. This is where the active instructions are - usually database changes.

If you want to provide some validation before the action, you need to implement the Rx3ValidatingEventHandler interface and override onCommit and onValidate methods.

in the example below, the onValidate method will be executed first and the onCommit method will only be executed if the counterparty field is not null.


@Module
public class EventCounterParty implements Rx3ValidatingEventHandler<Counterparty, EventReply> {

private final RxEntityDb entityDb;

@Inject
public EventCounterParty(RxEntityDb entityDb) {
this.entityDb = entityDb;
}

@Nullable
@Override
public String messageType() {
return "COUNTERPARTY_INSERT";
}

@NotNull
@Override
public Single<EventReply> onCommit(@NotNull Event<Counterparty> event) {
Counterparty counterparty = event.getDetails();
return entityDb.insert(counterparty) .flatMap(result -> ack(this));
}

@NotNull
@Override
public Single<EventReply> onValidate(@NotNull Event<Counterparty> event) {
Counterparty counterparty = event.getDetails();
if(counterparty.getName().isEmpty()) {
return nack(this, "Counterparty should have a name");
}
return ack(this);
}
}

Returning a nack

The onValidate method must always return either an ack() or nack(...).

Look at the onValidate method in the previous example:

  • if the counterparty field is empty, the eventHandler returns a nack, along with a suitable message.
  • if the counterparty field has content, then the eventHandler returns an ack

Default reply types

So far, we have seen ack and nack. There is a third reply type: warningNack`. Let's stop and look at the specifications for all three default reply types:

  • ack: used to signify a successful result. ack takes an optional parameter of List<Map<String, Any>>. For example, ack(listOf(mapOf("TRADE_ID", "1"))).
  • nack: used to signify an unsuccessful result. nack accepts either a String parameter or a Throwable. For example, nack("Error!") or nack(myThrowable).
  • warningNack: used to warn the client. warningNack, like nack, accepts either a String parameter or a Throwable. For example, warningNack("Provided User alias $userAlias will override Username $username.") or warningNack(myThrowable).

Transactional Event Handlers (ACID)

If you want your eventHandler to comply with ACID, you need to use the RxEntityDb writeTransaction. Any exception or nack returned will result in a complete rollback of all parts of the onCommit and onValidate (the transaction also covers read commands) methods.


@Module
public class EventCounterParty implements Rx3EventHandler<Counterparty, EventReply> {

private final RxEntityDb entityDb;@Inject

public EventCounterParty(RxEntityDb entityDb) {
this.entityDb = entityDb;
}

@Nullable
@Override
public String messageType() {
return "COUNTERPARTY_INSERT";
}

@Override
public Single<EventReply> process(Event<Counterparty> counterpartyEvent) {
Counterparty counterparty = counterpartyEvent.getDetails();
return entityDb.writeTransaction(txn -> {
txn.insert(counterparty);
return ack(this);
}).map(result -> result.getFirst());
}
}

Context Event Handlers

In order to optimize database look-up operations, you might want to use data obtained by the onValidate method inside your onCommit method. To do this, implement the Rx3ContextValidatingEventHandler interface, as shown below:


@Module
public class ContextEvent implements Rx3ContextValidatingEventHandler<Company, EventReply, String> {
private final RxEntityDb entityDb;

@Inject
public ContextEvent(RxEntityDb entityDb) {
this.entityDb = entityDb;
}

@Nullable
@Override
public String messageType() {
return "CONTEXT_COMPANY_INSERT";
}

@NotNull
@Override
public Single<EventReply> onCommit(@NotNull Event<Company> event, @Nullable String context) {
String parsedContext;
parsedContext = Objects.requireNonNullElse(context, "Missing context");
Company company = event.getDetails();
return entityDb.insert(company).flatMap(result -> ack(this, List.of(Map.of("VALUE",parsedContext))));
}

@NotNull
@Override
public Single<ValidationResult<EventReply, String>> onValidate(@NotNull Event<Company> event) {
Company company = event.getDetails();
if(company.getCompanyName().equals("MY_COMPANY")) {
return ack(this).map(result -> validationResult(result, "Best company in the world"));
} else {
return ack(this).map(this::validationResult);
}
}
}