Core business logic (Event Handler)
Overview
The Genesis Application Platform has a real-time event-driven architecture. The Event Handler is a microservice responsible for providing these events, which can apply business logic and affect the application's data set. You can configure events, including optional and mandatory input fields, which the client can trigger via APIs.
Nearly every table defined in your application's data model in an app will need events to affect the data. The standard events for most tables are Insert, Modify and Delete (but not every table will need all these events); depending on your application workflow, you could need many more.
You have to specify the business logic to process each of these events. You do this in the *-eventhandler.kts
files.
All the events that you specify are available via REST automatically, including Open API.
Example configuration
eventHandler {
// You can inject other kotlin or java libraries for use in event handlers
val pricingLibrary = inject<PricingLibrary>()
eventHandler<Counterparty>("COUNTERPARTY_INSERT", transactional = true) {
permissioning {
permissionCodes = listOf("CounterpartyUpdate")
}
onCommit { event ->
val details = event.details
val insertedRow = entityDb.insert(details)
ack(listOf(mapOf(
"COUNTERPARTY_ID" to insertedRow.record.counterpartyId,
)))
}
}
eventHandler<Counterparty>("COUNTERPARTY_MODIFY", transactional = true) {
permissioning {
permissionCodes = listOf("CounterpartyUpdate")
}
onCommit { event ->
val details = event.details
entityDb.modify(details)
ack()
}
}
eventHandler<Counterparty.ById>("COUNTERPARTY_DELETE", transactional = true) {
permissioning {
permissionCodes = listOf("CounterpartyUpdate")
}
onCommit { event ->
val details = event.details
entityDb.delete(details)
ack()
}
}
eventHandler<Trade>("TRADE_INSERT", transactional = true) {
permissioning {
permissionCodes = listOf("TradeInsert")
//Check user has permission to trade the submitted counterparty
auth(mapName = "ENTITY_VISIBILITY") {
authKey {
key(data.counterpartyId)
}
}
}
onValidate {
//Where the eventHandler type is a DAO object, details map to a conforming trade dao object
val trade = event.details
//Verify related entities on this trade exist in the DB
verify {
entityDb hasEntry Instrument.ById(trade.instrumentId)
entityDb hasEntry Counterparty.ById(trade.counterpartyId)
}
val price = trade.price
//Validate the price is not negative
require(price >= 0.0) { "Price cannot be negative" }
//Warn if the price is far from market price utilizing pricing library function
if(pricingLibrary.priceFarFromCurrentMarketPrice(trade.price, trade.direction)) {
return@onValidate warningNack("Price out of bounds from market price")
}
ack()
}
onCommit { event ->
val trade = event.details
val result = entityDb.insert(trade)
ack(listOf(mapOf(
"TRADE_ID" to result.record.tradeId,
)))
}
}
}
Configuration options
All eventHandler
blocks should be placed under a single outer eventHandler
block per the example configuration above.
It is important to understand the concepts of validation vs commit, and ack vs nack.
There are two main blocks to an eventHandler, onValidate
and onCommit
. The following images show the execution workflow where validate is set by the client to true vs false, and how nack (unsuccessful event) terminates the event unsuccessfully and will stop further execution of the event:
ack
An ack
needs to be supplied at the end of any onValidate
and onCommit
blocks; It signifies that the event was successful.
eventHandler<Trade>(name = "TRADE_INSERT") {
onValidate { event ->
...
ack()
}
onCommit { event ->
...
ack()
}
}
If VALIDATE
is set to true on the inbound event message, and an ack
is reached in the onValidate
block, a MESSAGE_TYPE
of EVENT_ACK
is returned to the client and the event will stop execution.
A MESSAGE_TYPE
of EVENT_ACK
will be returned to the client.
A successful event can also send data back to the client as part of an ack
. A very common use case is to return the ID(s) of newly inserted records so that the client is aware and can handle these appropriately; where the client is a UI, the IDs can be displayed to the user; if the client is a system, it may want to store the ID in its own records.
eventHandler<Trade>("TRADE_INSERT") {
onCommit { event ->
val trade = event.details
val result = entityDb.insert(trade)
ack(listOf(mapOf("TRADE_ID" to result.record.tradeId)))
}
}
nack
A nack statement signifies that an event failed. This is typically sent during onValidate
and sometimes during onEvent
.
It can take a string argument, which is the error message passed back to the client.
Where triggering within an onValidate
block, prefix the nack
with return@onValidate
to terminate validation at this point.
To trigger a nack within an onCommit
block, prefix the nack
with return @onCommit
.
eventHandler<Trade>(name = "TRADE_INSERT") {
onValidate { event ->
val price = event.details.price
if (price < 0.0) {
return@onValidate nack("Price cannot be negative")
}
...
}
onCommit { event ->
val marketPrice = pricingLibrary.getCurrentBid(event.details.instrumentId)
if (marketPrice == null) {
return@onCommit nack("Unable to determine market price for instrument ${event.details.instrumentId}")
}
...
}
}
A nack
can also take an object of type Throwable
, in which case the message
of the Throwable will be sent back to the client.
Where a nack is triggered, MESSAGE_TYPE
returned to the client is an EVENT_NACK
.
eventHandler
eventHandler
defines a new event.
In the parenthesis you can define:
name
: the name of the event.name
will automatically be prefixed withEVENT_
. soname = "TRADE_INSERT"
will be targeted by the client asEVENT_TRADE_INSERT
. Where left unspecified the name will default to the eventHandler Type class name.transactional
: takes a boolean. Set totrue
where you want youreventHandler
to comply with ACID. Any exception returned will result in a complete rollback of any database writes. While an exception will trigger a rollback, the transaction will commit if anack
orack
is returned. This setting also enables auto-auditing for writes to any audited tables.
eventHandler<Trade>(name = "TRADE_INSERT", transactional = true) {
onCommit { event ->
val trade = event.details
entityDb.insert(trade)
ack()
}
}
The <Type>
(aka Generic) parameter is used to set the event metadata. This metadata defines the event inputs the client can send in, and a certain amount of pre-validation will be performed on any inbound event messages based on this metadata (unless schemaValidation
is turned off).
You can use your table entity DAO as the <Type>
. In this case, the event will inherit the table's fields, their types and their nullability (which determines whether the field is optional or mandatory).
eventHandler<Trade>(name = "TRADE_INSERT") {
...
onCommit { event ->
val trade = event.details
entityDb.insert(trade)
ack()
}
}
Otherwise, you can create a kotlin data class
to define your metadata and use it.
You can also use an index for an event type, this is very common for DELETE events, where the client simply needs to send the ID of the record to delete:
eventHandler<Trade.ById>(name = "TRADE_DELETE") {
...
onCommit { event ->
val trade = event.details
entityDb.delete(trade)
ack()
}
}
schemaValidation
schemaValidation
takes a boolean. Where set to false, it disables JSON Schema validation enforced for type-safe messages for this event.
schemaValidation
defaults to true
when undefined.
eventHandler<Trade>(name = "TRADE_INSERT", transactional = true) {
schemaValidation = false
...
}
excludeMetadataFields
excludeMetadataFields
takes a set of field names, which are defined as part of the event meta (e.g Trade
in our example below), to exclude them from event metadata.
eventHandler<Trade>(name = "TRADE_INSERT", transactional = true) {
excludeMetadataFields = setOf("DATE")
...
}
In this example, DATE
defined on Trade
will not be part of the meta for this event.
By default, excludeMetadataFields
includes two fields, RECORD_ID
and TIMESTAMP
. These fields are part of every database object; their values are always set by the database layer, so the client typically does not send them.
Typically when using a table entity as the eventHandler <Type>
, you don't need to use excludeMetadataFields
. If you do, be sure to keep the 2 default fields RECORD_ID
and TIMESTAMP
listed in your set.
An exception is when you are using optimistic concurrency mode. In this case, it is essential to include the TIMESTAMP
field in the event meta. This happens automatically where excludeMetadataFields
has not been set.
overrideMetadataFields
overrideMetadataFields
is used to override the metadata field properties of the eventHandler type, this isn't to change the type of a field, more to override fields being optional and default values.
eventHandler<Trade>(name = "TRADE_INSERT", transactional = true) {
overrideMetadataFields = listOf(
OverrideMetaField(name = "QUANTITY", optional = true, default = 1)
)
}
By default this is an empty set.
Where setting mandatory fields to be optional, you must have a default supplied as part of the event Type meta.
permissioning
The permissioning
block is used to implement access control measures on the data being returned to a user.
With the exception of the inner auth
block which relies on inbound data, the permissioning
block and its contents can also be placed in the outer block and apply to all query
/requestServer
/eventHandler
blocks named in the file.
permissionCodes
permissionCodes
takes a list of permission codes. The client user accessing the query must have access to at least one of the permission codes in the list to be able to access any query data.
permissioning {
permissionCodes = listOf("TradeView", "TradeUpdate")
}
customPermissions
The customPermissions
block takes boolean logic. If this function returns true, the user will be able to access the resource; otherwise, the request will be rejected.
It is useful, for example, in the case you have an external API to check against. In this example an entitlementUtils
object wraps an external API we can call to check the user's name has access.
customPermissions { message ->
entitlementUtils.userIsEntitled(message.userName)
}
The customPermissions
block also has access to entityDb
so that you can query any data in the database. You can add complex logic in here as needed, and have access to the full inbound message.
customPermissions { message ->
val userAttributes = entityDb.get(UserAttributes.byUserName(message.userName))
userAttributes?.accessType == AccessType.ALL
}
Where utilizing customPermissions
you should also consider configuring a customLoginAck
so that the front end of the application can also permission its components in a similar way.
auth
auth
is used to restrict rows (queries) or events with a particular entity based on a user's entity access, also known as row-level permissions.
permissioning {
auth(mapName = "ENTITY_VISIBILITY") {
authKey {
key(data.counterpartyId)
}
}
Details on restricting query entity access here
Auth definitions can be grouped with “and” or “or” operators.
- You could have two simple permission maps, for example: one by counterparty and another one for forbidden symbols. If the user wants to see a specific row, they need to have both permissions at once.
- You could have two permission maps: one for buyer and one for seller. A user would be allowed to see a row if they have a seller or buyer profile, but users without one of the those profiles would be denied access.
This example shows an AND grouping:
permissioning {
auth(mapName = "ENTITY_VISIBILITY") {
authKey {
key(data.counterpartyId)
}
} and auth(mapName = "SYMBOL_RESTRICTED") {
authKey {
key(data.symbol)
}
}
}
This example shows OR grouping:
permissioning {
auth(mapName = "ENTITY_VISIBILITY") {
authKey {
key(data.buyerId)
}
} or auth(mapName = "ENTITY_VISIBILITY") {
authKey {
key(data.sellerId)
}
}
}
userHasRight
userHasRight
is a helpful function which can be called within the auth block, or anywhere else in client facing server components which have a user accessing them, to determine if a user has a given right.
if (!userHasRight(userName, "TradeViewFull")) { ... }
requiresPendingApproval
This is used where particular system events require a second system user to approve them before they take effect in the application.
In the example below, you can optionally set the approval message. This is the message that an authorized approver will see during their review.
The function must return boolean logic, where it evaluates to false the event will be subject to approval.
eventHandler<Trade>("TRADE_INSERT") {
requiresPendingApproval { event ->
event.approvalMessage = "Please approve my new trade"
event.userName != "system.user"
}
...
}
In this example, only a user with username "system.user" is able to trigger this event without it first being approved.
Note that the client may also send APPROVAL_MESSAGE
and, where specified, approvalMessage
will take precedence over it.
See approval workflow for more details.
onValidate
The onValidate
block is used for validation. The Event Handler workflow allows clients to set boolean the flag VALIDATE
. When set to true
, this runs the code in the onValidate
block and returns before executing the onCommit
block, which commits the action (e.g. write to database, or perhaps generate a file or some other permanent action). For this reason, the entityDb handle used in this block is for read operations only.
As well as returning an ack
(success) or a nack
, the onValidate
block allows users to send soft warnings back as part of the validation. The client can ignore these warnings, for example if the client is a user interface.
onValidate
is helpful for systems where you may want to add "fat-finger checks", a common term in financial applications which essentially means we're checking if the user may have made a mistake in their input. The check is to see if they have set a quantity or price that is unusually large or small, and could contain mistakes which lead to financial loss. Such warnings can be sent back to the client in a warningNack
.
onValidate
blocks should always return an ack
as the final statement.
onValidate
blocks are optional and will automatically return an ack
where not defined.
verify
Within the onvalidate
block, a verify
block is helpful for common relational checks. For example, to check that referenced ID-based entities already exist in the database.
In this example we verify that the instrument and counterparty IDs that were supplied on the event exist in the database.
onValidate {
...
verify {
entityDb hasEntry Instrument.ById(event.details.instrumentId)
entityDb hasEntry Counterparty.ById(event.details.counterpartyId)
}
...
}
If the boolean logic fails, validation of the event has failed, and a nack is returned.
require
require
is a kotlin construct often used in onValidate
blocks, because where the defined boolean logic statement fails, it automatically returns a nack
with the given error message.
In the example below, if a price is not "more than or equal to 0.0" the message "Price cannot be negative" is returned.
eventHandler<Trade>(name = "TRADE_INSERT") {
onValidate { event ->
val price = event.details.price
require(price >= 0.0) { "Price cannot be negative" }
...
}
...
}
warningNack
A warningNack
is only applicable to the onValidate
block, and is used to warn the client. These warnings are sometimes referred to as "soft errors".
warningNack
, like nack
, accepts a String parameter, which is the message given back to the client. For example, warningNack("Price differs more than 10% from the current market price.")
.
A warningNack
may also take an object of type Throwable
. In this case, the message
of the Throwable will be sent back to the client.
onValidate {
...
if(pricingLibrary.priceFarFromCurrentMarketPrice(trade.price, trade.direction)) {
return@onValidate warningNack("Price out of bounds from market price")
}
...
}
Clients using the API are expected to set IGNORE_WARNINGS
to true
on the inbound message where they wish to ignore warnings. For example, if a warning is presented to a user, they can be presented with an option to ignore, or to edit and re-submit for validation.
System clients can just set IGNORE_WARNINGS
to true
on all messages to ensure that:
- warnings don't stop system integration messages persisting
- complex logic to determine whether to ignore specific warnings is not required
approvableAck
An approvableAck
should be returned in an onValidate block where requiresApproval
has been set on the event.
There are useful properties you can set as part of the approvableAck
definition. They are all optional and are detailed below:
entityDetails
is a list ofApprovalEntityDetails
with their correspondingentityTable
,entityId
andapprovalType
properties (see previous paragraph). By default, this list is empty.approvalMessage
contains the text that is sent back to the client, assuming the event is successfully submitted for approval. The default is "Your request was successful and has been submitted for approval".additionalDetails
can provide context information that is only available from a server-side perspective. This information complements theAPPROVAL_MESSAGE
content provided by the front end.approvalType
is used to state the action that happens when this event is approved: NEW for insertions, UPDATE for amends, REMOVE for removals. If undefined, this defaults to UNKNOWN. Most events will be simple, but of course some could affect multiple entities in different ways, which is why theentityDetails
parameter can contain many entities, each with its ownapprovalType
.
eventHandler<Trade>("TRADE_MODIFY") {
requiresPendingApproval { event ->
event.approvalMessage = "Potential cost to firm modifying trades which have already been reported." //Available to the user who is approving
event.userName != "system.user"
}
onValidate { event ->
val trade = event.details
approvableAck(
entityDetails = listOf(
// One or many entities can be affected with a single event, so we can provide the whole list here
ApprovalEntityDetails(
entityTable = "TRADE",
entityId = event.details.trade.toString(),
approvalType = ApprovalType.UPDATE
)
),
approvalMessage = "Trade modification for ${event.details.tradeId} has been sent for approval.", //Sent to the user who submitted the event
approvalType = ApprovalType.UPDATE,
additionalDetails = "Potential cost to firm modifying trades which have already been reported." //Further message available to the user who is approving
)
}
...
}
See approval workflow for more details.
onCommit
The onCommit
block runs after successful validation in the onValidate
block. The code here typically affects the data in the system (for example, writing to the database, generating a file, triggering some other event). This block may also perform further validations not possible in the previous block (e.g. relying on database updates in an event which writes multiple records). To reach the onCommit
block, VALIDATE
must have been set to false
.
The entityDb
handle in this block can be used to read and write.
As with onValidate
the block must finish with an ack
. If the event is set as transactional
and the ack
is not reached, any database writes will be rolled back.
eventHandler<Trade>("TRADE_INSERT") {
onCommit { event ->
val trade = event.details
val result = entityDb.insert(trade)
ack(listOf(mapOf("TRADE_ID" to result.record.tradeId)))
}
}
contextEventHandler
contextEventHandler
is similar to an eventHandler
, but is used where you want to pass context from the onValidate
block to the onCommit
block. For example, on a modification event: if you first look up the original entity record and want to pass to onCommit
so that it doesn't need to be re-read from the database.
contextEventHandler<Trade, Trade>(name = "TRADE_MODIFY") {
onValidate { event ->
val tradeDetails = event.details
val originalTrade = entityDb.get(Trade.ById(event.details.tradeId))
require(originalEmailDist != null) {
"Unknown Trade ID received ${event.details.csdCode}"
}
...
validationAck(validationContext = originalTrade)
}
onCommit { event, trade ->
...
entityDb.modify(trade)
ack()
}
}
- The definition takes two
<Type>
(aka Generic) parameters instead of one. The first is the context object, which will be passed to theonCommit
block. The second is the same as a regulareventHandler
: the event metadata. - The
onValidate
block needs to return avalidationAck
instead of a regularack
. - The
onCommit
block has two lambda parameters, the standard event + a second, which is a reference to the context object returned in thevalidationAck
.
validationAck
The same concept as an ack
[] but must return an object matching the type of the first <Type>
parameter in the contextEventHandler
definition.
contextEventHandler<Trade, Trade>(name = "TRADE_MODIFY") {
onValidate { event ->
val originalTrade = entityDb.get(Trade.ById(event.details.tradeId))
...
validationAck(validationContext = originalTrade)
}
}
Logging
Genesis provides a class called LOG
that can be used to insert custom log messages. This class provides you with
5 methods to be used accordingly: info
, warn
, error
,trade
,debug
. To use these methods, you need to provide, as an input,
a string that will be inserted into the Logs.
Here is an example where you can insert an info message into the log file:
LOG.info("This is an info message")
The LOG
instance is a Logger from SLF4J.
In order to see these messages, you must set the logging level accordingly. You can do this using the logLevel command.
Optimistic concurrency
Optimistic Concurrency helps prevent users from updating or deleting a stale version of a record. Our page on Optimistic Concurrency provides details around how this works for event handlers.
Client API
To use the API, clients must first authenticate to obtain a SESSION_AUTH_TOKEN
by calling EVENT_LOGIN_AUTH
.
This API is also accessible via Open API.
EVENT_<EVENT_HANDLER_NAME>
To trigger an Event Handler, the client sends a MESSAGE_TYPE
of EVENT_<event name>
.
There are several options that can be specified in the message header:
Option | Default | Description |
---|---|---|
VALIDATE | false | Where true , events will return after the onValidate block ack and not trigger the onCommit block |
IGNORE_WARNINGS | false | Where true , any warningNack in the code will be ignored and not cause the event to fail validation |
REQUIRES_APPROVAL | false | A client can force the approval workflow for the event by setting this to true |
APPROVAL_MESSAGE | Where approval workflow is triggered, this is the approval message the approver will see. This may be overridden in the event 'requiresPendingApproval`, which takes precedence | |
REASON | Where the event is set as transactional and the tables written by the event are enabled for audit this String will be set in the audit record's AUDIT_EVENT_TEXT |
- Websocket API
- REST API
Request
{
"SOURCE_REF": "345",
"SESSION_AUTH_TOKEN": "snsFDc7AdjG8Khl95uUdUBm5UO1uqIfq",
"MESSAGE_TYPE": "EVENT_TRADE_INSERT",
"DETAILS": {
"COUNTERPARTY_ID": 1,
"DATE": 1731542400000,
"DIRECTION": "BUY",
"INSTRUMENT_ID": 2,
"QUANTITY": 1000,
"TRADE_PRICE": 1.23
}
}
Response
{
"MESSAGE_TYPE": "EVENT_ACK",
"SOURCE_REF": "345",
"GENERATED": [
{
"TRADE_ID": 5
}
],
"METADATA": {
"IS_EMPTY": true,
"ALL": {}
}
}
Request
POST /event-trade-insert HTTP/1.1
Host: localhost:9064
Content-Type: application/json
SESSION_AUTH_TOKEN: snsFDc7AdjG8Khl95uUdUBm5UO1uqIfq
SOURCE_REF: 345
{
"DETAILS": {
"COUNTERPARTY_ID": 1,
"DATE": 1731542400000,
"DIRECTION": "BUY",
"INSTRUMENT_ID": 2,
"QUANTITY": 1000,
"TRADE_PRICE": 1.23
}
}
Response
HTTP/1.1 200 OK
content-type: application/json
content-length: 556
connection: keep-alive
{
"MESSAGE_TYPE": "EVENT_ACK",
"SOURCE_REF": "345",
"GENERATED": [
{
"TRADE_ID": 5
}
],
"METADATA": {
"IS_EMPTY": true,
"ALL": {}
}
}
Approval workflow
The Genesis Application Platform has an in-built pending approval mechanism that can be used with Event Handlers. This is useful where particular events require a second user to approve them in order to take effect. Genesis Pending Approvals works with the concepts of “delayed” events and "four-eyes check".
Events are essentially queued until the time they are approved. Once approved, they are re-triggered to take affect.
Require approval on an event
To enable the pending approval workflow for an Event Handler implementation, either:
- Override the
requiresPendingApproval
method with an appropriate function in the custom Event Handler definitions.
or
- Configure the
requiresPendingApproval
block in a GPAL Event Handler.
Both of these options involve implementing a function with the event message as an input and a boolean value as a return value.
Here is an example of a GPAL Event Handler definition where event.userName
is used to gain access to the user who triggered the event; if the user name is not system.user, it is directed to an approval procedure:
eventHandler {
eventHandler<Company>("COMPANY_INSERT") {
// Override requiresPendingApproval here to enable the "pending approval" flow.
// In this implementation, any user that is not "system.user" needs to go through the approval mechanism.
// The last line just needs to evaluate to a boolean; if false it does not require approval, if true it does
requiresPendingApproval { event ->
event.userName != "system.user"
}
onCommit { event ->
val company = event.details
// custom code block..
ack()
}
}
}
or in a custom Event Handler definition:
package global.genesis.position.samples.events.async
import global.genesis.commons.annotation.Module
import global.genesis.eventhandler.typed.async.AsyncContextValidatingEventHandler
import global.genesis.gen.dao.Company
import global.genesis.message.core.event.Event
import global.genesis.message.core.event.EventReply
import global.genesis.message.core.event.ValidationResult
@Module
class TestCompanyHandlerAsyncContext : AsyncContextValidatingEventHandler<Company, EventReply, String> {
override suspend fun onValidate(message: Event<Company>): ValidationResult<EventReply, String> {
val company = message.details
// custom code block..
val companyName = company.companyName
return validationResult(ack(), companyName)
}
override suspend fun onCommit(message: Event<Company>, context: String?): EventReply {
if(context != null){
// Do something with the context
}
val company = message.details
// custom code block..
return ack()
}
}
Pending approval workflow
Events going through a pending approval workflow are validated as usual (i.e. the onValidate
method is run). If the validation is successful, the “delayed” event is stored in the APPROVAL
table in JSON format.
Assuming the event is inserting, updating or deleting a target database record, it is possible to have multiple APPROVAL
records associated with a single database entity. So, you should use the event onValidate
method to check for pre-existing approvals against the entities related to the event if you need to ensure there is only one pending approval per record.
The APPROVAL
record is keyed on an auto-generated APPROVAL_ID
and does not have a direct link to the original record(s). You have to create one or many links by adding “approval entity” details to the payload returned within an approvableAck
inside the onValidate
method. These details include the entityTable
(e.g COUNTERPARTY), entityKey
(e.g. COUNTERPARTY_ID), as well as an optional approvalType
to describe what operation is happening on the entity itself (e.g. NEW, UPDATE or REMOVE).
This approach enables you to decide how to identify the original record (e.g. creating a compound key in the case of multi-field keys). When the approval entity details are provided, the platform creates one or more records in the APPROVAL_ENTITY
table; it populates it (them) with the details provided and the APPROVAL_ID
of the APPROVAL
record. There is also an APPROVAL_ENTITY_COUNTER
, which is populated by the GENESIS_AUTH_CONSOLIDATOR process by default; this can be handy when you need to know how many approvals are pending for a given entity.
There are other useful properties you can set as part of the approvableAck
definition. They are all optional and are detailed below:
entityDetails
is a list ofApprovalEntityDetails
with their correspondingentityTable
,entityId
andapprovalType
properties (see previous paragraph). By default, this list is empty.approvalMessage
contains the text that is sent back to the client, assuming the event is successfully submitted for approval. The default is "Your request was successful and has been submitted for approval".additionalDetails
can provide context information that is only available from a server-side perspective. This information complements theAPPROVAL_MESSAGE
content provided by the front end.approvalType
is used to state the action that happens when this event is approved: NEW for insertions, UPDATE for amends, REMOVE for removals. If undefined, this defaults to UNKNOWN. Most events will be simple, but of course some could affect multiple entities in different ways, which is why theentityDetails
parameter can contain many entities, each with its ownapprovalType
.
One further property, approvableAck
, can be used in both custom eventHandler
definitions and GPAL Event Handlers. Below is an example of approvableAck
in action for a GPAL eventHandler
onValidate
block.
eventHandler {
eventHandler<Company>("COMPANY_AMEND") {
// Override requiresPendingApproval here to enable the "pending approval" flow.
// In this implementation, any user that is not "system.user" needs to go through requires going through the approval mechanism.
// The last line just needs to evaluate to a boolean; if false it does not require approval, if true it does
requiresPendingApproval { event ->
event.userName != "system.user"
}
onValidate { event ->
val company = event.details
// custom validation code block..
return approvableAck(
entityDetails = listOf(
// One or many entities can be affected with a single event, so we can provide the whole list here
ApprovalEntityDetails(
entityTable = "COMPANY",
entityId = event.details.companyId.toString(),
approvalType = ApprovalType.UPDATE
)
),
approvalMessage = "Company update for ${event.details.companyId} has been sent for approval.",
approvalType = ApprovalType.UPDATE,
additionalDetails = "Sensitive update, tread carefully"
)
}
onCommit { event ->
val company = event.details
// custom code block..
ack()
}
}
}
The platform provides two Data Server queries that contain Pending approval information: ALL_APPROVAL_ALERTS
and ALL_APPROVAL_ALERTS_AUDITS
Example APPROVAL DB record
-------------------------------------------------------------------------------------------
TIMESTAMP 2023-02-27 15:33:42.364(n:0,s:1019) NANO_TIMESTAMP
ACTIONED_BY JaneDee STRING
APPROVAL_ID fdef7802-6bd1-4c51-a232-6a4bc2325598A... STRING
APPROVAL_KEY fac1be9f-1653-4ecf-9050-d13cc2d2cdb4A... STRING
APPROVAL_MESSAGE Cancelled STRING
APPROVAL_REQUESTED_AT 2023-02-27 15:33:38.450 +0000 DATETIME
APPROVAL_STATUS CANCELLED ENUM[PENDING APPROVED CANCELLED REJECTED_BY_USER REJECTED_BY_SERVICE]
APPROVAL_TYPE REMOVE ENUM[NEW UPDATE REMOVE UNKNOWN]
DESTINATION COUNTERPARTY_EVENT_HANDLER STRING
EVENT_DETAILS ISSUER_ID = 3 STRING
EVENT_MESSAGE {"DETAILS":{"COUNTERPARTY_ID":3},"MES... STRING
MESSAGE_TYPE EVENT_COUNTERPARTY_DELETE STRING
USER_NAME JaneDee STRING
-------------------------------------------------------------------------------------------
Example APPROVAL_ENTITY record
-------------------------------------------------------------------------------------------
TIMESTAMP 2023-02-27 15:33:38.459(n:0,s:1004) NANO_TIMESTAMP
APPROVAL_ID fdef7802-6bd1-4c51-a232-6a4bc2325598A... STRING
APPROVAL_TYPE REMOVE ENUM[NEW UPDATE REMOVE UNKNOWN]
ENTITY_ID 3 STRING
ENTITY_TABLE COUNTERPARTY STRING
-------------------------------------------------------------------------------------------
Pending approval events
Once in the APPROVAL
table, the pending event can be cancelled, rejected or accepted by sending the following event messages to GENESIS_CLUSTER:
- EVENT_PENDING_APPROVAL_ACCEPT
- EVENT_PENDING_APPROVAL_CANCEL
- EVENT_PENDING_APPROVAL_REJECT
All messages require a valid APPROVAL_ID
and APPROVAL_MESSAGE
in their metadata.
Allowed approvers
The platform ensures that users cannot approve or reject their own events, but they can cancel them. To complement this, users that have not created a specific pending approval event can only accept or reject, not cancel.
Additional levels of control (e.g. based on user groups) can be added at three points:
- to the front end
- to the event
onValidate
method - specified in the server-side configuration
To configure the allowed approvers using server-side configuration:
-
Create a new GPAL approval file; its name must end in -approval.kts (e.g. test-approval.kts).
-
Add the new file's name to the
GENESIS_CLUSTER
process definition's<script></script>
element found in your application'scfg/genesis-processes.xml
(values should be comma-separated). If this file is not in your application, read here how to find and override the file.
See the sample file below:
import global.genesis.session.RightSummaryCache
val rightSummaryCache = inject<RightSummaryCache>()
pendingApproval {
insert {
true
}
accept {
val userAttributes = entityDb.get(UserAttributes.byName(userName))
userAttributes?.accessType == AccessType.INTERNAL
}
cancel {
true
}
reject {
rightSummaryCache.userHasRight(userName, "REJECT_PENDING_APPROVAL")
}
}
You can replace the "true" return values with Kotlin code in each of the relevant blocks. Alternatively, you don't need to define them at all, as they return "true" by default.
The platform makes the following objects accessible to the insert
block:
insertMessage
- an instance of thePendingApprovalInsert
class, which is used to populate theAPPROVAL
table if successful. The content of this class consists of several properties:approvalMessage
- contains the original approval message text sent by the user who initiated the action.messageType
- represents the original EVENT name (e.g. EVENT_TRADE_INSERT).destination
- is the process name this event was originally targeting (e.g. POSITION_EVENT_HANDLER).eventMessage
- contains the JSON object representing the original message payload.approvalType
- equivalent to the property with the same name provided as part ofapprovableAck
(see pending approval workflow).additionalDetails
- equivalent to the property with the same name provided as part ofapprovableAck
(see pending approval workflow).generated
- equivalent to the property namedentityDetails
provided as part ofapprovableAck
(see pending approval workflow).
userName
- a string property containing the user name who triggered the event.messageType
- a shortcut property accessor for themessageType
value stored insideinsertMessage
.eventMessage
- a shortcut property accessor for theeventMessage
value stored insideinsertMessage
.
The following objects are accessible within the accept
, cancel
and reject
blocks:
userName
- a string property containing the user name who triggered the pending approval event (e.g. accept, reject or cancel).pendingApproval
- the pending approval record stored in the database. The type of this property is the "Approval" database entity (see table entities).approvalMessage
- an instance of theApprovalMessage
class, which represents the payload of the message sent to EVENT_PENDING_APPROVAL_ACCEPT, EVENT_PENDING_APPROVAL_CANCEL and EVENT_PENDING_APPROVAL_REJECT. It contains two properties:approvalMessage
- the message text sent by the user who initiated this pending approval actionapprovalId
- contains the APPROVAL_ID used to identify the APPROVAL record we are handling as part of this action
messageType
- a shortcut property accessor for themessageType
value stored insidependingApproval
.eventMessage
- a shortcut property accessor for theeventMessage
value stored insidependingApproval
.
The following properties are automatically available for the whole scope of the -approval.kts file:
val systemDefinition: SystemDefinitionService
val rxDb: RxDb
val entityDb: AsyncEntityDb
val evaluatorPool: EvaluatorPool
val networkConfiguration: NetworkConfiguration
val serviceDetailProvider: ServiceDetailProvider
val serviceDiscovery: ServiceDiscovery
val injector: Injector
As shown in the previous code example, you can perform database lookups to retrieve additional information and return true
only if the necessary rights or attributes are in place. For example, if your system has the concept of internal and external users, and you only want to allow internal users to accept pending events, then you could check your custom user "ACCESS_TYPE" field as follows:
pendingApproval {
accept {
val userAttributes = entityDb.get(UserAttributes.byName(userName))
userAttributes?.accessType == AccessType.INTERNAL
}
}
Advanced allowed approvers
You might have noticed that the original type-safe event message types are lost inside the -approval.kts file, as the content of eventMessage
inside APPROVAL
table (and also inside PendingApprovalInsert
) is a serialized JSON string. You can deserialize the original type-safe objects using the selectPredicate
method combined with multiple onEvent
predicates. These methods are available in all the pendingApproval
code blocks: insert
, accept
, cancel
and reject
.
selectPredicate
is a function that accepts an indeterminate number of functions returning a boolean value, as well as a mandatorydefault
function to handle messages that do not fall into any defined category. Thedefault
function provides a GenesisSet object with the contents of the original message payload.onEvent
works very similarly to any other GPAL Event Handler definition. It enables you to treat the incoming message in the same way as you would have done within the original Event Handler; however, each function must return a boolean expression.
Please see the example below for custom logic using a table called "RESTRICTED_SYMBOL" to prevent restricted symbols from being added to the system, as well as checking user right codes:
import global.genesis.session.RightSummaryCache
val rightSummaryCache = inject<RightSummaryCache>()
pendingApproval {
accept {
selectPredicate(
onEvent<TradeInsert>("TRADE_INSERT") { event ->
val tradeInsert = event.details
val stockInRestrictedList = entityDb.get(RestrictedSymbol.bySymbol(tradeInsert.symbol))
// Deny any operation on restricted symbols.
if (stockInRestrictedList != null) {
false
} else {
rightSummaryCache.userHasRight(userName, "TRADE_INSERT")
}
},
onEvent<TradeAmend>("TRADE_AMEND") { event ->
val tradeAmend = event.details
val stockInRestrictedList = entityDb.get(RestrictedSymbol.bySymbol(tradeAmend.symbol))
// Deny any operation on restricted symbols.
if (stockInRestrictedList != null) {
false
} else {
rightSummaryCache.userHasRight(userName, "TRADE_AMEND")
}
},
onEvent<TradeDelete>("TRADE_DELETE") { event ->
val tradeDelete = event.details
val stockInRestrictedList = entityDb.get(RestrictedSymbol.bySymbol(tradeDelete.symbol))
// Deny any operation on restricted symbols.
if (stockInRestrictedList != null) {
false
} else {
rightSummaryCache.userHasRight(userName, "TRADE_DELETE")
}
},
// If the message can't be deserialized we will use a default fallback to genesisSet.
default = { genesisSet ->
true
}
)
}
}
System rejects
An approval process deliberately delays events. So it is possible that by the time a pending approval action is approved, the underlying data has changed enough to cause the delayed event to fail when executed. When this happens, the pending approval record will be marked as REJECTED_BY_SYSTEM in its APPROVAL_STATUS
field.
The platform provides an event designed to help you to reject events directly from the back end without human intervention. "EVENT_PENDING_APPROVAL_SYSTEM_REJECT" is an event that can be used to satisfy any specific requirements that fall outside the functionality of the pending approval system. The event is only accessible by back-end services. It takes two parameters:
approvalMessage
is the text of the message to be sent when the message is rejected.approvalKey
is a unique identifier for each pending approval record. This identifier is never exposed to the front end, so only back-end services have access to it. That makes it impossible to trigger this event unless you have access to the database system. You need to obtain theapprovalKey
programmatically so that you can supply it as a parameter value.
Let us look at an example. Consider a solution that needs to submit trades to an external system every day at midnight for confirmation purposes. Once the trades have been submitted, their content cannot be changed anymore, unless a separate amendment process is started outside the Genesis application. The system has a pending approval system that enables users to amend the content of each "Trade" database record to provide extra security.
Once a "Trade" record has been submitted at midnight, you need to prevent any pending approval events from amending the content of the trade records in the system, as this could cause inconsistencies between the external system and the Genesis system. To prevent this, you could run a job that automatically rejects all pending approval records at midnight every day.
Here is an example GPAL script that could be run every day at midnight to reject all the pending records.
import global.genesis.clustersupport.service.ServiceDiscovery
import global.genesis.message.core.event.ApprovalSystemRejectMessage
import global.genesis.message.core.event.Event
import global.genesis.pal.shared.inject
import kotlin.system.exitProcess
val serviceDiscovery: ServiceDiscovery = injector.inject()
val client = serviceDiscovery.resolveClientByResource("EVENT_PENDING_APPROVAL_SYSTEM_REJECT")
if (client == null || !client.isConnected) {
println("Unable to find service exposing EVENT_PENDING_APPROVAL_SYSTEM_REJECT")
exitProcess(1)
} else {
suspendable {
entityDb.getBulk<Approval>()
.filter { it.approvalStatus == ApprovalStatus.PENDING }
.collect { approval ->
val reply: EventReply? = client.suspendRequest(
Event(
messageType = "EVENT_PENDING_APPROVAL_SYSTEM_REJECT",
userName = "SYSTEM",
details = ApprovalSystemRejectMessage(
approvalKey = approval.approvalKey,
approvalMessage = "Rejected by system"
)
)
)
when (reply) {
is EventReply.EventAck ->
println("Successfully rejected APPROVAL_ID: ${approval.approvalId}")
is EventReply.EventNack ->
println("Failed to rejected APPROVAL_ID: ${approval.approvalId}: $reply")
else ->
println("Unexpected response from pending approval system: $reply")
}
}
}
}
Metrics
Ensure you have enabled metrics in your environment to view them.
The Event Handler latency metrics show how long it takes for a specific eventHandler
in the Event Handler to process a message.
Metric | Explanation |
---|---|
processing_latency | The latency for processing events (kts event handler) |
latency | The latency for processing events (kotlin/java event handler) |
Runtime configuration
To include your *-eventhandler.kts
file definitions in a runtime process, you need to set the process definition:
- Ensure
genesis-pal-eventhandler
is included inmodule
- Ensure
global.genesis.eventhandler.pal
is included inpackage
- Ensure your eventhandler.kts file(s) are defined in
script
- Ensure
pal
is set inlanguage
If you wish to run a dedicated process for an Event Handler, here is an example full process definition:
<process name="POSITION_EVENT_HANDLER">
<groupId>POSITION</groupId>
<start>true</start>
<options>-Xmx256m -DRedirectStreamsToLog=true -DXSD_VALIDATE=false</options>
<module>genesis-pal-eventhandler</module>
<package>global.genesis.eventhandler.pal</package>
<script>position-eventhandler.kts</script>
<description>Handles events</description>
<classpath>position-messages*,position-eventhandler*</classpath>
<language>pal</language>
</process>
See runtime configuration - processes for further details.
Performance
If better database response performance is required, you may also consider added a cache to the process. See runtime configuration - process cache for more details.
Testing
GenesisJunit is available from version 8 of the Genesis Server Framework (GSF).
If you are testing against a previous version of the framework, go to the legacy section.
Integration testing
This section covers the basics of testing Event Handlers. We shall use a very simple example, and work through the communication between our test and the Event Handler.
This includes how to test dynamic authorization.
This testing relies on GenesisJunit, which is designed to make testing easy.
In this example, we shall test the following Event Handler:
data class Hello(
val name: String,
)
eventHandler {
eventHandler<Hello>("HELLO_WORLD") {
onCommit {
ack()
}
}
}
Creating the test class
First, use the code below to create the test class:
- Kotlin
- Java
@ExtendWith(GenesisJunit::class)
@ScriptFile("hello-world-eventhandler.kts")
class EventHandlerTest {
// our tests go here ...
}
@ExtendWith(GenesisJunit.class)
@ScriptFile("hello-world-eventhandler.kts")
public class EventHandlerTest {
// our tests go here ...
}
The code above does two things:
- It enables GenesisJunit.
- It specifies the Event Handler script that we want to test, using the
ScriptFile
annotation.
There is more information about GenesisJunit
and the various annotations in the section on Integration testing.
Injecting an Event Handler client
Use the code below to inject an Event Handler client:
- Kotlin
- Java
@ExtendWith(GenesisJunit::class)
@ScriptFile("hello-world-eventhandler.kts")
class EventHandlerTest {
@Inject
lateinit var client: EventClientSync
// our test will go here ...
}
@ExtendWith(GenesisJunit.class)
@ScriptFile("hello-world-eventhandler.kts")
public class EventHandlerTest {
@Inject
private EventClientSync client = null;
// our test will go here ...
}
A first test
This test makes sure that the Event Handler returns an ack.
- Kotlin
- Java
@Test
fun testHelloWorld() {
val reply = client.sendEvent(
details = Hello("PETER"),
messageType = "EVENT_HELLO_WORLD"
)
assert(reply is EventReply.EventAck)
}
@Test
void testHelloWorld() {
var reply = client.builder()
.withDetails(new Hello("PETER"))
.withMessageType("EVENT_HELLO_WORLD")
.send();
assertInstanceOf(EventReply.EventAck.class, reply);
}
As you can see here, to send an event to our event handler, we need to provide the details class. The message type is optional, but we have to set it here, as we set a custom message name on our Event Handler.
We can also provide the user name:
- Kotlin
- Java
@Test
fun testHelloWorldWithUser() {
val reply = client.sendEvent(
details = Hello("PETER"),
messageType = "EVENT_HELLO_WORLD",
userName = "PETER"
)
assert(reply is EventReply.EventAck)
}
@Test
void testHelloWorldWithUser() {
var reply = client.builder()
.withDetails(new Hello("PETER"))
.withMessageType("EVENT_HELLO_WORLD")
.withUserName("PETER")
.send();
assertInstanceOf(EventReply.EventAck.class, reply);
}
Different clients
There are three versions of the Event Handler client available during testing:
- EventClientSync - this is the synchronous client, all calls are blocking
- EventClientAsync - this is the coroutine client, where calls suspend
- EventClientRx - this is the RxJava client, which wraps responses in a
Single
In most instances, EventClientSync will suffice. Use the other clients if your tests use other asynchronous operations.
Dynamic authorization
To test dynamic authorization, add the @EnableInMemoryTestAuthCache
to your class or method.
This makes InMemoryTestAuthCache
available for injection into your test class.
Amend the Event Handler to enable authorization:
data class Hello(
val name: String,
)
eventHandler {
eventHandler<Hello>("HELLO_WORLD_AUTH") {
permissioning {
auth("NAMES") {
authKey {
key(data.name)
}
}
}
onCommit {
ack()
}
}
}
The first thing you need to do, is to enable the in-memory test auth cache using the @EnableInMemoryTestAuthCache
annotation.
You also need to inject InMemoryTestAuthCache
into your test class.
- Kotlin
- Java
@ExtendWith(GenesisJunit::class)
@ScriptFile("hello-world-eventhandler.kts")
class EventHandlerTest {
@Inject
lateinit var client: EventClientSync
@Inject
lateinit var authCache: InMemoryTestAuthCache
@Test
fun testIsAuthorised() {
authCache.authorise(
authMap = "NAMES",
entityCode = "PETER",
userName = "PETER"
)
val reply = client.sendEvent(
details = Hello(name = "PETER"),
userName = "PETER",
messageType = "EVENT_HELLO_WORLD_AUTH"
)
assert(reply is EventReply.EventAck) { reply }
}
@Test
fun testIsNotAuthorised() {
authCache.revoke(
authMap = "NAMES",
entityCode = "PETER",
userName = "PETER"
)
val reply = client.sendEvent(
details = Hello(name = "PETER"),
userName = "PETER",
messageType = "EVENT_HELLO_WORLD_AUTH"
)
assert(reply is EventReply.EventNack)
}
}
@ExtendWith(GenesisJunit.class)
@ScriptFile("hello-world-eventhandler.kts")
public class EventHandlerJavaTest {
@Inject
private EventClientSync client = null;
@Inject
private InMemoryTestAuthCache authCache = null;
@Test
void testIsAuthorised() {
authCache.builder()
.withAuthMap("NAMES")
.withEntityCode("PETER")
.withUserName("PETER")
.authorise();
var reply = client.builder()
.withDetails(new Hello("PETER"))
.withMessageType("EVENT_HELLO_WORLD_AUTH")
.withUserName("PETER")
.send();
assertInstanceOf(EventReply.EventAck.class, reply);
}
@Test
void testIsNotAuthorised() {
authCache.builder()
.withAuthMap("NAMES")
.withEntityCode("PETER")
.withUserName("PETER")
.revoke();
var reply = client.builder()
.withDetails(new Hello("PETER"))
.withMessageType("EVENT_HELLO_WORLD_AUTH")
.withUserName("PETER")
.send();
assertInstanceOf(EventReply.EventNack.class, reply);
}
}
The revoke calls are not needed in this case; they have been included here to show the syntax. These calls can be used to revoke previous authorizations in your tests.
Conclusion
At this point you have tested a very simple Event Handler. You haven't even had to use the database! However, you have covered the basics of communication between tests and Event Handlers:
- sending details to our event
- overwriting the default message type
- setting the user on the event
- handling dynamic authorization
For more details about testing with Genesis, take a look at our integration test documentation.
Integration testing (legacy)
This section covers testing your Event Handler if you are using any version of the Genesis Server Framework before GSF v8.
The Genesis Platform provides the AbstractGenesisTestSupport
abstract class that enables end-to-end testing of specific areas of your application. In this case, we want to ensure that we have a database, seeded with information, and that our Event Handler configuration is used to create our Event Handler. We also need to add the required packages, genesis home and separately set the "IS_SCRIPT" System Definition property to true (This is required as part of the Event Handler initialization).
class EventHandlerTest : AbstractGenesisTestSupport<GenesisSet>(
GenesisTestConfig {
addPackageName("global.genesis.eventhandler.pal")
genesisHome = "<genesis-home>"
parser = { it }
scriptFileName = "<app-name>-eventhandler.kts"
initialDataFile = "seed-data.csv"
}
) {
override fun systemDefinition(): Map<String, Any> = mapOf("IS_SCRIPT" to "true")
}
For more information about AbstractGenesisTestSupport
, see the Testing pages.
Once you have added your config above, you can start writing tests against our Event Handler.
Writing tests
Let's write some tests for this simple Event Handler, defined below
eventHandler<Trade>(name = "TRADE_INSERT") {
onCommit { event ->
val trade = event.details
val result = entityDb.insert(trade)
ack(listOf(mapOf("TRADE_ID" to result.record.tradeId)))
}
}
Simple test
Below is an example of a simple test.
@Test
fun `test insert trade`(): Unit = runBlocking {
val message = Event(
details = Trade {
tradeId = "1"
counterpartyId = "CP1"
instrumentId = "I2"
side = "BUY"
price = 1.123
quantity = 1000
},
messageType = "EVENT_TRADE_INSERT"
)
val result: EventReply? = messageClient.suspendRequest(message)
result.assertedCast<EventReply.EventAck>()
val trade = entityDb.get(Trade.ById("1"))
assertNotNull(trade)
}
First, create your Event
object, setting the event details and specifying the intended Event Handler for the message "EVENT_TRADE_INSERT".
We then send a message to our Event Handler using messageClient.suspendRequest(message)
. The result is first verified to be an EventAck
. Then check that the inserted trade can be retrieved from the database.
Remember to add the runBlocking
coroutine scope to the test, as the Genesis platform uses Kotlin coroutines.
Error response test
You may also want to test a negative case, where you expect to receive an error as a response.
You need to modify the previous example Event Handler and add an onValidate
block:
eventHandler<Trade>(name = "TRADE_INSERT") {
onCommit { event ->
val trade = event.details
val result = entityDb.insert(trade)
ack(listOf(mapOf("TRADE_ID" to result.record.tradeId)))
}
onValidate { event ->
val message = event.details
verify {
entityDb hasEntry Counterparty.ById(message.counterpartyId)
entityDb hasEntry Instrument.ById(message.instrumentId)
}
ack()
}
}
In the example below, we expect the response to be of type EventNack
, which has a property error
containing a list of errors.
@Test
fun `test invalid instrument`(): Unit = runBlocking {
val message = Event(
details = Trade {
tradeId = "1"
counterpartyId = "CP1"
instrumentId = "DOESNOTEXIST"
side = "BUY"
price = 1.213
quantity = 100
},
messageType = "EVENT_TRADE_INSERT"
)
val result: EventReply? = messageClient.suspendRequest(message)
val eventNack: EventReply.EventNack = result.assertedCast()
assertThat(eventNack.error).containsExactly(
StandardError(
"INTERNAL_ERROR",
"INSTRUMENT ById(instrumentId=DOESNOTEXIST) not found in database"
)
)
}
Testing with authorization
To test that the Event Handler authorization works correctly, you need to do some setting up.
First, make sure that your authorization set-up is designed to behave as follows:
- A user who enters a trade must have an entry in the "ENTITY_VISIBILITY" auth map; the entity code for this user must match the
counterpartyId
of the trade. - The user must have an entry in the "RIGHT_SUMMARY" table with "RIGHT_CODE" as "TRADER".
Second, you need to modify the previous example Event Handler so that only authorized users can insert trades.
eventHandler<Trade>(name = "TRADE_INSERT") {
permissioning {
permissionCodes = listOf("TRADER")
auth(mapName = "ENTITY_VISIBILITY") {
authKey {
key(data.counterpartyId)
}
}
}
onValidate { event ->
val message = event.details
verify {
entityDb hasEntry Counterparty.ById(message.counterpartyId)
entityDb hasEntry Instrument.ById(message.instrumentId)
}
ack()
}
onCommit { event ->
val trade = event.details
val result = entityDb.insert(trade)
ack(listOf(mapOf("TRADE_ID" to result.record.tradeId)))
}
}
Third, you need to specify the auth cache override in the GenesisTestConfig
:
class EventHandlerTest : AbstractGenesisTestSupport<GenesisSet>(
GenesisTestConfig {
addPackageName("global.genesis.eventhandler.pal")
genesisHome = "/GenesisHome/"
parser = { it }
scriptFileName = "your-application-eventhandler.kts"
initialDataFile = "seed-data.csv"
addAuthCacheOverride("ENTITY_VISIBILITY")
}
) {
...
}
Fourth, in your test set-up, authorize one user to be able to insert trades and another who is not.
@Before
fun setUp() {
authorise("ENTITY_VISIBILITY", "CP1", "TraderUser")
val trader = DbRecord.dbRecord("RIGHT_SUMMARY") {
"USER_NAME" with "TraderUser"
"RIGHT_CODE" with "TRADER"
}
val support = DbRecord.dbRecord("RIGHT_SUMMARY") {
"USER_NAME" with "SupportUser"
"RIGHT_CODE" with "SUPPORT"
}
rxDb.insert(trader).blockingGet()
rxDb.insert(support).blockingGet()
}
For more information on authorization, see the authorization docs.
Below is a test that verifies only Traders can enter trades:
@Test
fun `test trade inserted by trader`(): Unit = runBlocking {
val message = Event(
details = Trade {
tradeId = "1"
counterpartyId = "CP1"
instrumentId = "I2"
side = "BUY"
price = 5.0
quantity = 1
},
messageType = "EVENT_TRADE_INSERT",
userName = "TraderUser"
)
val result: EventReply? = messageClient.suspendRequest(message)
result.assertedCast<EventReply.EventAck>()
val trade = entityDb.get(Trade.ById("1"))
assertNotNull(trade)
}
Following that, here is a test to verify that a trade cannot be entered if the user is not a Trader:
@Test
fun `test trade cannot be inserted if not trader`(): Unit = runBlocking {
val message = Event(
details = Trade {
tradeId = "1"
counterpartyId = "CP1"
instrumentId = "I2"
side = "BUY"
price = 5.0
quantity = 1
},
messageType = "EVENT_TRADE_INSERT",
userName = "SupportUser"
)
val result: EventReply? = messageClient.suspendRequest(message)
val eventNack = result.assertedCast<EventReply.EventNack>()
assertThat(eventNack.error).containsExactly(
StandardError(
"NOT_AUTHORISED",
"User SupportUser lacks sufficient permissions"
)
)
}
Manual testing
An API client, such as Postman or Insomnia is useful way of testing components. As a client, it is effectively a front end seeking information from the server.
The API client enables you to create calls to the resources in your server - Data Servers, Request Servers and Event Handlers. Then you can just click to run a call and see what response you get.
Before you can make any calls on these resources, you will have to permission yourself by obtaining a SESSION_AUTH_TOKEN. The details of how to do this are on our separate Testing page.
Once you have the SESSION_AUTH_TOKEN, keep a copy that you can paste into each request as you make your test call.
In the example below, we are using Insomnia as the client API. We are going to test the EVENT_COUNTERPARTY_INSERT Event Handler by adding a new counterparty.
url and Body
In front of the url, set the call to POST.
The url consists of:
- the address or hostname of the server
- if necessary, some extra routing; in this case sm uses a proxy to access the server
- the name of the Event Handler
Set the body to JSON. In the body, you need to insert the details of the fields for the new counterparty, as seen below:
Header
In the header, you need to supply:
- a SOURCE_REF (always), which identifies you; you can use any string value that suits you
- the SESSION_AUTH_TOKEN that permissions you to access the server
When you have all these elements in place, click on Send to make the call. If the event is a success, you will receive an ACK message.
Checking the insertion
Now you can check that the new counterparty you inserted is in the correct table of the database. The resource you need to check is the Request Server called ALL_COUNTERPARTYS.
In front of the url, set the call to POST.
The url consists of:
- the address or hostname of the server
- if necessary, some extra routing; in this case sm uses a proxy to access the server
- the name of the Request Server
Set the body to JSON. There is no need for any information in the body. Simply insert a pair of curly brackets .
In the header, you need to supply:
- a SOURCE_REF (always), which identifies you; you can use any string value that suits you
- the SESSION_AUTH_TOKEN that permissions you to access the server
When you have this in place, click on Send to make the call. You can see that the fields for the instruments have been returned on the right of the screen.
Testing java event handlers
The Genesis Application Platform provides the AbstractGenesisTestSupport
abstract class that enables end-to-end testing of specific areas of your application.
In this case, build GenesisTestConfig
with the following information:
- Set packages:
global.genesis.eventhandler
this is the standard package name from the framework, which is needed for all Java events/custom events. Make sure you name the package where you defined the events. In the example below, it isglobal.genesis.position.samples.events.rxjava
- Set genesis home
- Set initial data: we want to ensure that we have a database, seeded with information
public class TradingEventHandlerTest extends AbstractGenesisTestSupport<EventResponse> {
public TradingEventHandlerTest() {
super(GenesisTestConfig.builder()
.setPackageNames(List.of("global.genesis.eventhandler","global.genesis.position.samples.events.rxjava"))
.setGenesisHome("/GenesisHome/")
.setInitialDataFiles("seed-data.csv")
.setParser(EventResponse.Companion)
.build()
);
}
}
For more information about AbstractGenesisTestSupport
, see the Testing pages.
Once you have set up your configuration, you can start writing tests against your Event Handler.
Writing tests
Let's write some tests for the simple Event Handler defined below:
@Module
public class EventTrade implements Rx3ValidatingEventHandler<Trade, EventReply> {
private final RxEntityDb entityDb;
@Inject
public EventTrade(RxEntityDb entityDb) {
this.entityDb = entityDb;
}
@Nullable
@Override
public String messageType() {
return "TRADE_INSERT";
}
@NotNull
@Override
public Single<EventReply> onCommit(@NotNull Event<Trade> tradeEvent) {
Trade trade = tradeEvent.getDetails();
return entityDb.writeTransaction(txn -> {
Trade result = txn.insert(trade).blockingGet().getRecord();
return ack(this, List.of(Map.of("TRADE_ID", result.getTradeId())));
}).map(result -> result.getFirst());
}
@NotNull
@Override
public Single<EventReply> onValidate(@NotNull Event<Trade> event) {
Trade trade = event.getDetails();
if (entityDb.get(Counterparty.byId(trade.getCounterpartyId())).blockingGet() == null) {
return Single.just(new StandardError("INTERNAL_ERROR", "COUNTERPARTY ById(counterpartyId=" + trade.getCounterpartyId() +") not found in database").toEventNackError());
} else if (entityDb.get(Instrument.byId(trade.getInstrumentId())).blockingGet() == null) {
return Single.just(new StandardError("INTERNAL_ERROR", "INSTRUMENT ById(instrumentId=" + trade.getInstrumentId() +") not found in database").toEventNackError());
}
return ack(this);
}
}
Simple test
Below is an example of a simple test.
First, this creates an Event
object, setting the event details and specifying the intended Event Handler for the message "EVENT_TRADE_INSERT" and username.
Second, it sends a message to the Event Handler using getMessageClient().request(event, EventReply.class)
. The result is first verified to be an EventAck
.
Finally, it checks that the inserted trade can be retrieved from the database.:
@Test
public void testTradeInsert() throws InterruptedException {
Trade trade = Trade.builder()
.setTradeId("1")
.setCounterpartyId("CP1")
.setInstrumentId("I2")
.setSide("BUY")
.setPrice(1.123)
.setQuantity(1000)
.build();
Event event = new Event(trade, "EVENT_TRADE_INSERT", "JohnDoe");
EventReply reply = getMessageClient().request(event, EventReply.class).blockingGet();
assertEquals(reply, new EventReply.EventAck(List.of(Map.of("TRADE_ID", trade.getTradeId()))));
Trade result = getRxDb().entityDb().get(Trade.byId("1")).blockingGet();
assertNotNull(result);
}
Error response test
You may also want to test a negative case, where you expect to receive an error as a response.
In the example below, we expect the response to be of type EventNack
when we try to insert a wrong instrument ID. As in the Event Handler above, there is a check to see if the instrument exists in the database.
@Test
public void testTradeInsertWrongInstrumentId() throws InterruptedException {
Trade trade = Trade.builder()
.setTradeId("1")
.setCounterpartyId("CP1")
.setInstrumentId("DOESNOTEXIST")
.setSide("BUY")
.setPrice(1.213)
.setQuantity(100)
.build();
Event event = new Event(trade, "EVENT_TRADE_INSERT_JAVA", "JohnDoe");
EventReply reply = getMessageClient().request(event, EventReply.class).blockingGet();
GenesisError genesisError = new StandardError("INTERNAL_ERROR", "INSTRUMENT ById(instrumentId=DOESNOTEXIST) not found in database");
assertEquals(reply, new EventReply.EventNack(List.of(), List.of(genesisError)));
}
Testing with authorization
Set-up
To test that the Event Handler authorization works correctly, you need to do some setting up.
First, make sure that your authorization set-up is designed to behave as follows:
- A user who enters a trade must have an entry in the "ENTITY_VISIBILITY" auth map; the entity code for this user must match the
counterpartyId
of the trade. - The user must have an entry in the "RIGHT_SUMMARY" table with "RIGHT_CODE" as "TRADER".
Second, you need to modify the previous example Event Handler so that only authorized users can insert trades.
@Module
public class EventTrade implements Rx3EventHandler<Trade, EventReply> {
private final RxEntityDb entityDb;
private final RxDb rxDb;
private final RightSummaryCache rightSummaryCache;
private Authority authCache;
@Inject
public EventTrade(RxEntityDb entityDb, RxDb rxDb, RightSummaryCache rightSummaryCache) {
this.entityDb = entityDb;
this.rightSummaryCache = rightSummaryCache;
this.rxDb = rxDb;
}
@Inject
public void init() {
this.authCache = AuthCache.newReader("ENTITY_VISIBILITY", rxDb.getUpdateQueue());
}
@Nullable
@Override
public String messageType() {
return "TRADE_INSERT_JAVA";
}
@Override
public Single<EventReply> process(Event<Trade> tradeEvent) {
String userName = tradeEvent.getUserName();
if(rightSummaryCache.userHasRight(userName, "TRADER")){
Trade trade = tradeEvent.getDetails();
return entityDb.writeTransaction(txn -> {
Trade result = txn.insert(trade).blockingGet().getRecord();
return ack(this, List.of(Map.of("TRADE_ID", result.getTradeId())));
}).map(result -> result.getFirst());
}
return Single.just(new StandardError("NOT_AUTHORIZED", "User " + userName + " lacks sufficient permissions").toEventNackError());
}
}
Third, you need to specify the auth cache override in the GenesisTestConfig
:
public class TradingEventHandlerTest extends AbstractGenesisTestSupport<EventResponse> {
public TradingEventHandlerTest() {
super(GenesisTestConfig.builder()
.setPackageNames(List.of("global.genesis.eventhandler","global.genesis.rxjava"))
.setGenesisHome("/GenesisHome/")
.setInitialDataFiles("TEST_DATA.csv")
.setAuthCacheOverride(List.of("ENTITY_VISIBILITY"))
.setParser(EventResponse.Companion)
.build()
);
}
}
Fourth, in your test set-up, let's authorise one user to be able to insert trades and another who is not.
@Before
public void setUp() {
authorise("ENTITY_VISIBILITY", "CP1", "TraderUser");
getRxDb().insert(RightSummary.builder().setRightCode("TRADER").setUserName("TraderUser").build().toDbRecord()).blockingGet();
getRxDb().insert(RightSummary.builder().setRightCode("SUPPORT").setUserName("SupportUser").build().toDbRecord()).blockingGet();
}
For more information on authorisation, see the authorization docs.
Tests
After you have set things up. Now you can create the tests themselves.
Below is a test that verifies that only Traders can enter trades:
@Test
public void testTradeInsertedByTrader() {
Trade trade = Trade.builder()
.setTradeId("1")
.setCounterpartyId("CP1")
.setInstrumentId("I2")
.setSide("BUY")
.setPrice(5.0)
.setQuantity(1)
.build();
Event event = new Event(trade, "EVENT_TRADE_INSERT", "TraderUser");
EventReply reply = getMessageClient().request(event, EventReply.class).blockingGet();
assertEquals(reply, new EventReply.EventAck(List.of(Map.of("TRADE_ID", trade.getTradeId()))));
Trade insertedUser = getRxDb().entityDb().get(Trade.byId("1")).blockingGet();
assertNotNull(insertedUser);
}
Following that, we have a test to verify that a trade cannot be entered if the user is not a Trader:
@Test
public void testTradeCannotBeInsertedIfNotTrader() {
Trade trade = Trade.builder()
.setTradeId("1")
.setCounterpartyId("CP1")
.setInstrumentId("I2")
.setSide("BUY")
.setPrice(5.0)
.setQuantity(1)
.build();
Event event = new Event(trade, "EVENT_TRADE_INSERT_JAVA", "SupportUser");
EventReply reply = getMessageClient().request(event, EventReply.class).blockingGet();
GenesisError genesisError = new StandardError("NOT_AUTHORIZED", "User SupportUser lacks sufficient permissions");
assertEquals(reply, new EventReply.EventNack(List.of(), List.of(genesisError)));
}
Event handler API
GPAL event handlers are powerful and the recommended approach. There is an API available to write event handlers in Java, or Kotlin. The following section gives details of this API
In most cases, you will create Event handlers in GPAL as recommended. This offers a method with succinct code and a good degree of flexibility.
However, you can also implement Event Handlers as a set of classes. Typically, this is useful where you have a complex requirement for business logic and database interaction. For example, a kts file of 1,000 lines is difficult to test and maintain; in this case, a set of individual classes is much more convenient.
For implementing an Event Handler as a set of classes, there are three different options:
- Async. This uses the Kotlin coroutine API to simplify asynchronous development. This is the underlying implementation used in GPAL Event Handlers. You can only create Async Event Handlers using Kotlin.
- RxJava3. This uses the RxJava3 library, which is a popular option for composing asynchronous event-based programs. You can create RxJava3 Event Handlers using either Kotlin or Java.
- Sync. This creates synchronous Event Handlers. You can create Sync Event Handlers using either Kotlin or Java.
Configure in processes.xml file
You need to add the global.genesis.eventhandler
package to the package tag of the process; this tag defines which package the process should refer to. For example:
<process name="POSITION_NEW_PROCESS">
<groupId>POSITION</groupId>
<start>true</start>
<options>-Xmx256m -DRedirectStreamsToLog=true -DXSD_VALIDATE=false</options>
<module>position-new-process</module>
<package>global.genesis.eventhandler,position.company.manager</package>
<description>Handles events</description>
</process>
Event Handler interface
The Event Handler interface is the common supertype of AsyncEventHandler, Rx3EventHandler and SyncEventHandler, but it is not meant to be used on its own. It provides basic options for each Event Handler definition, which can be overridden. See the Kotlin methods explanation below:
Name | Signature | Default value | Description |
---|---|---|---|
excludeMetadataFields | fun excludeMetadataFields(): Set<String> | setOf("RECORD_ID", "TIMESTAMP") | Contains a list of metadata fields to be excluded from the event metadata extracted from the input I |
includeMetadataFields | fun includeMetadataFields(): Set<String> | emptySet() | Contains a list of metadata fields that need to be included in the event metadata; this must be available in input I . A non-empty list will exclude the other fields. |
messageType | fun messageType(): String? | null | Contains the name of the Event Handler. If undefined, the Event Handler name will become EVENT_*INPUT_CLASS_NAME* . So, for an Event Handler using an input type called TradeInsert , the message type will become EVENT_TRADE_INSERT . |
overrideMetadataFields | fun overrideMetadataFields(): Map<String, OverrideMetaField> | emptySet() | Contains a map (key-value entries) of metadata field names to metadata field definitions in the shape of OverrideMetaField . This enables you to override the metadata field properties extracted from input I |
requiresPendingApproval | fun requiresPendingApproval(): Boolean | false | This is used where particular system events require a second system user to approve them (pending approval in order to take effect) |
schemaValidation | fun schemaValidation(): Boolean | true | This option enables you to disable the automatic Json Schema validation enforced by the back end. See type-safe messages for more information. |
Each custom Event Handler must define an input message type I
and an output message type O
(these need to be data classes), as GPAL Event Handlers do). In the examples below, Company
is the input message and EventReply
is the output message. The message
object contains event message and has the following properties :
Name | Default value | Description |
---|---|---|
details | This has input information, example: Company | |
messageType | Name of the Event Handler | |
userName | Name of logged-in user | |
ignoreWarnings | false | If set to false, events will not be processed if there are any warnings; you will get EventNack with warning message. If set to true, warning messages will be ignored; processing of events will be stopped only if there are any errors |
requiresApproval | false | This particular event needs approval from a second user if set to true. For more details, see approval workflow |
approvalKey | null | Auto-generated key ID for particular approval request. For more details, see approval workflow |
approvalMessage | null | Optional message for approval request. For more details, see approval workflow |
reason | null | Optional reason sent as part of event message |
Async
AsyncEventHandler
This is the most basic definition of an Async Event Handler. You can define an AsyncEventHandler
by implementing the AsyncEventHandler
interface, which is defined as: interface AsyncEventHandler<I : Any, O : Outbound> : AsyncEventWorkflowProcessor<I, O>, EventHandler
The only mandatory method to implement this in the interface is:
Name | Signature |
---|---|
process | fun suspend process(message: Event<I>) : O |
This method passes the input message type I
as a parameter and expects the output message type O
to be returned.
Here is an example:
import com.google.inject.Inject
import global.genesis.commons.annotation.Module
import global.genesis.db.rx.entity.multi.AsyncEntityDb
import global.genesis.eventhandler.typed.async.AsyncEventHandler
import global.genesis.gen.dao.Company
import global.genesis.message.core.event.Event
import global.genesis.message.core.event.EventReply
@Module
class EventCompanyHandlerAsync @Inject constructor(
private val entityDb: AsyncEntityDb,
private val companyService: CompanyService
) : AsyncEventHandler<Company, EventReply> {
override suspend fun process(message: Event<Company>): EventReply {
val company = message.details
// custom code block..
return EventReply.EventAck()
}
}
The methods below are provided as part of AsyncEventHandler
; they provide an easy way of creating EventReply
responses.
Name | Signature |
---|---|
ack | fun <I : Any> AsyncEventHandler<I, EventReply>.ack(): EventReply |
ack | fun <I : Any> AsyncEventHandler<I, EventReply>.ack(generated: List<Map<String, Any>> = emptyList()): EventReply |
nack | fun <I : Any> AsyncEventHandler<I, EventReply>.nack(throwable: Throwable): EventReply |
nack | fun <I : Any> AsyncEventHandler<I, EventReply>.nack(error: String): EventReply |
Using these helper methods, you could simplify the previous implementation like this:
import global.genesis.commons.annotation.Module
import global.genesis.eventhandler.typed.async.AsyncEventHandler
import global.genesis.message.core.event.Event
import global.genesis.message.core.event.EventReply
@Module
class EventCompanyHandlerAsync : AsyncEventHandler<Company, EventReply> {
override suspend fun process(message: Event<Company>): EventReply {
val company = message.details
// custom code block..
return ack()
}
}
AsyncValidatingEventHandler
In the previous example, there was no distinction between validation and commit blocks, which is possible in GPAL Event Handlers. In order to have a better separation of concerns using custom Event Handlers, you can implement the AsyncValidatingEventHandler
interface, which is defined as:
interface AsyncValidatingEventHandler<I : Any, O : Outbound> : AsyncEventHandler<I, O>
Implementation
Using this interface, you do not need to override the process
method; you can split your logic into validation and commit stages. There are various methods of implementing this, which are described below:
Name | Signature |
---|---|
onValidate | suspend fun onValidate(message: Event<I>): O |
onCommit | suspend fun onCommit(message: Event<I>): O |
Here is an example:
import global.genesis.commons.annotation.Module
import global.genesis.eventhandler.typed.async.AsyncValidatingEventHandler
import global.genesis.message.core.event.Event
import global.genesis.message.core.event.EventReply
@Module
class TestCompanyHandlerAsync : AsyncValidatingEventHandler<Company, EventReply> {
override suspend fun onValidate(message: Event<Company>): EventReply {
val company = message.details
// custom code block..
return ack()
}
override suspend fun onCommit(message: Event<Company>): EventReply {
val company = message.details
// custom code block..
return ack()
}
}
If the validate
flag is received as true
, only the onValidate
code block will be executed. If the validate
flag is received as false
, both the onValidate
and onCommit
blocks will be executed.
AsyncContextValidatingEventHandler
In some cases, you might want to carry information from the onValidate
code block to the onCommit
code block for efficiency purposes. (For example, if several database look-ups happen in onValidate
and you want to reuse that information.) Using the AsyncContextValidatingEventHandler
interface, you can provide this context information from the validation stage to the commit stage. See the interface below: interface AsyncContextValidatingEventHandler<I : Any, O : Outbound, C : Any> : AsyncEventHandler<I, O>
Implementation
As with the previous example, when using this interface, you do not need to override the process
method. The different methods for implementing this are described below:
Name | Signature |
---|---|
onValidate | suspend fun onValidate(message: Event<I>): ValidationResult<O, C> |
onCommit | suspend fun onCommit(message: Event<I>, context: C?): O |
The validationResult
methods are provided to help with the context creation:
Name | Signature |
---|---|
validationResult | fun validationResult(result: O): ValidationResult<O, C> |
validationResult | fun validationResult(result: O, context: C): ValidationResult<O, C> |
The type C
represents the contextual information we want to provide, and it can be any Java/Kotlin type. Here is an example:
import global.genesis.commons.annotation.Module
import global.genesis.eventhandler.typed.async.AsyncContextValidatingEventHandler
import global.genesis.message.core.event.Event
import global.genesis.message.core.event.EventReply
import global.genesis.message.core.event.ValidationResult
@Module
class TestCompanyHandlerAsync : AsyncContextValidatingEventHandler<Company, EventReply, String> {
override suspend fun onValidate(message: Event<Company>): ValidationResult<EventReply, String> {
val company = message.details
// custom code block..
val companyName = company.companyName
return validationResult(ack(), companyName)
}
override suspend fun onCommit(message: Event<Company>, context: String?): EventReply {
if(context != null) {
// Do something with the context
}
val company = message.details
// custom code block..
return ack()
}
}
Rx3
The mechanism explained in Async can be recycled and reapplied in Rx3 Event Handlers.
Rx3EventHandler
In a similar fashion to AsyncEventHandler
, there is an Rx3 implementation flavour. It works in a very similar way to AsyncEventHandler
, but requires different return types (i.e. we expect to return RxJava3 Single<O>
type, instead of just the O
type).
See the interface definition below: interface Rx3EventHandler<I : Any, O : Outbound> : Rx3EventWorkflowProcessor<I, O>, EventHandler
Implementation
The mandatory method for implementing this is:
Name | Signature |
---|---|
process | fun process(message: Event<I>) : Single<O> |
Helper methods
Name | Signature |
---|---|
ack | fun <I : Any> Rx3EventHandler<I, EventReply>.ack(): Single<EventReply> |
ack | fun <I : Any> Rx3EventHandler<I, EventReply>.ack(generated: List<Map<String, Any>> = emptyList()): Single<EventReply> |
nack | fun <I : Any> Rx3EventHandler<I, EventReply>.nack(throwable: Throwable): Single<EventReply> |
nack | fun <I : Any> Rx3EventHandler<I, EventReply>.nack(error: String): Single<EventReply> |
Here is an example:
- Kotlin
- Java
import global.genesis.commons.annotation.Module
import global.genesis.eventhandler.typed.rx3.Rx3EventHandler
import global.genesis.gen.dao.Company
import global.genesis.message.core.event.Event
import global.genesis.message.core.event.EventReply
import io.reactivex.rxjava3.core.Single
@Module
class TestCompanyHandlerRx3 : Rx3EventHandler<Company, EventReply> {
override fun process(message: Event<Company>): Single<EventReply> {
return ack()
}
}
import global.genesis.commons.annotation.Module;
import global.genesis.eventhandler.typed.rx3.Rx3EventHandler;
import global.genesis.gen.dao.Company;
import global.genesis.message.core.event.Event;
import global.genesis.message.core.event.EventReply;
import io.reactivex.rxjava3.core.Single;
@Module
public class TestCompanyHandlerRx3 implements Rx3EventHandler<Company, EventReply> {
@Override
public Single<EventReply> process(Event<Company> companyEvent) {
// custom block
return Single.just(new EventReply.EventAck());
}
}
Rx3ValidatingEventHandler
The same applies to an Rx3ValidatingEventHandler. It is similar to AsyncValidatingEventHandler in every way, but the return type is still Single<O>
.
interface Rx3ValidatingEventHandler<I : Any, O : Outbound> : Rx3EventHandler<I, O>
Implementation
Name | Signature |
---|---|
onValidate | fun onValidate(message: Event<I>): Single<O> |
onCommit | fun onCommit(message: Event<I>): Single<O> |
Here is an example:
- Kotlin
- Java
import global.genesis.commons.annotation.Module
import global.genesis.eventhandler.typed.rx3.Rx3ValidatingEventHandler
import global.genesis.gen.dao.Company
import global.genesis.message.core.event.Event
import global.genesis.message.core.event.EventReply
import io.reactivex.rxjava3.core.Single
@Module
class TestCompanyHandlerRx3 : Rx3ValidatingEventHandler<Company, EventReply> {
override fun onValidate(message: Event<Company>): Single<EventReply> {
val company = message.details
// custom code block..
return ack()
}
override fun onCommit(message: Event<Company>): Single<EventReply> {
val company = message.details
// custom code block..
return ack()
}
}
import global.genesis.commons.annotation.Module;
import global.genesis.eventhandler.typed.rx3.Rx3ValidatingEventHandler;
import global.genesis.gen.dao.Company;
import global.genesis.message.core.event.Event;
import global.genesis.message.core.event.EventReply;
import io.reactivex.rxjava3.core.Single;
import org.jetbrains.annotations.NotNull;
@Module
public class TestCompanyHandlerRx3 implements Rx3ValidatingEventHandler<Company, EventReply> {
@NotNull
@Override
public Single<EventReply> onValidate(@NotNull Event<Company> message) {
Company company = message.getDetails();
// custom code block..
return Single.just(new EventReply.EventAck());
}
@NotNull
@Override
public Single<EventReply> onCommit(@NotNull Event<Company> message) {
Company company = message.getDetails();
// custom code block..
return Single.just(new EventReply.EventAck());
}
}
Rx3ContextValidatingEventHandler
And the same goes for Rx3ContextValidatingEventHandler
in relation to AsyncContextValidatingEventHandler.
interface Rx3ContextValidatingEventHandler<I : Any, O : Outbound, C : Any> : Rx3EventHandler<I, O>
Implementation
Name | Signature |
---|---|
onValidate | fun onValidate(message: Event<I>): Single<ValidationResult<O, C>> |
onCommit | fun onCommit(message: Event<I>, context: C?): Single<O> |
Helper methods
Name | Signature |
---|---|
validationResult | fun validationResult(result: O): ValidationResult<O, C> |
validationResult | fun validationResult(result: O, context: C): ValidationResult<O, C> |
Here is an example:
- Kotlin
- Java
import global.genesis.commons.annotation.Module
import global.genesis.eventhandler.typed.rx3.Rx3ContextValidatingEventHandler
import global.genesis.gen.dao.Company
import global.genesis.message.core.event.Event
import global.genesis.message.core.event.EventReply
import global.genesis.message.core.event.ValidationResult
import io.reactivex.rxjava3.core.Single
@Module
class TestCompanyHandlerRx3 : Rx3ContextValidatingEventHandler<Company, EventReply, String> {
override fun onValidate(message: Event<Company>): Single<ValidationResult<EventReply, String>> {
val company = message.details
// custom code block..
val companyName = company.companyName
return Single.just(validationResult(EventReply.EventAck(), companyName))
}
override fun onCommit(message: Event<Company>, context: String?): Single<EventReply> {
if (context != null) {
// Do something with the context
}
val company = message.details
// custom code block..
return ack()
}
}
import global.genesis.commons.annotation.Module;
import global.genesis.eventhandler.typed.rx3.Rx3ContextValidatingEventHandler;
import global.genesis.gen.dao.Company;
import global.genesis.message.core.event.Event;
import global.genesis.message.core.event.EventReply;
import global.genesis.message.core.event.ValidationResult;
import io.reactivex.rxjava3.core.Single;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@Module
public class TestCompanyHandlerRx3 implements Rx3ContextValidatingEventHandler<Company, EventReply, String> {
@NotNull
@Override
public Single<EventReply> onCommit(@NotNull Event<Company> event, @Nullable String context) {
if (!context.isEmpty()) {
// Do something with the context
}
Company company = event.getDetails();
// custom code block..
return Single.just(new EventReply.EventAck());
}
@NotNull
@Override
public Single<ValidationResult<EventReply, String>> onValidate(@NotNull Event<Company> event) {
Company company = event.getDetails();
// custom code block..
String companyName = company.getCompanyName();
return Single.just(validationResult(new EventReply.EventAck(), companyName));
}
}
Sync
Sync works similarly to Async and Rx3, but in this case, there is no Single<O>
returned and no suspend
modifier used for Kotlin coroutines. The expected output of the Event Handler logic is just the O
type.
SyncEventHandler
interface SyncEventHandler<I : Any, O : Outbound> : SyncEventWorkflowProcessor<I, O>, EventHandler
Implementation
Name | Signature |
---|---|
process | fun process(message: Event<I>) : O |
Helper methods
Name | Signature |
---|---|
ack | fun <I : Any> SyncEventHandler<I, EventReply>.ack(): EventReply |
ack | fun <I : Any> SyncEventHandler<I, EventReply>.ack(generated: List<Map<String, Any>> = emptyList()): EventReply |
nack | fun <I : Any> SyncEventHandler<I, EventReply>.nack(throwable: Throwable): EventReply |
nack | fun <I : Any> SyncEventHandler<I, EventReply>.nack(error: String): EventReply |
Here is an example:
- Kotlin
- Java
import global.genesis.commons.annotation.Module
import global.genesis.eventhandler.typed.sync.SyncEventHandler
import global.genesis.gen.dao.Company
import global.genesis.message.core.event.Event
import global.genesis.message.core.event.EventReply
@Module
class TestCompanyHandlerSync : SyncEventHandler<Company, EventReply> {
override fun process(message: Event<Company>): EventReply {
return ack()
}
}
import global.genesis.commons.annotation.Module;
import global.genesis.eventhandler.typed.sync.SyncEventHandler;
import global.genesis.gen.dao.Company;
import global.genesis.message.core.event.Event;
import global.genesis.message.core.event.EventReply;
@Module
public class TestCompanyHandlerSync implements SyncEventHandler<Company, EventReply> {
@Override
public EventReply process(Event<Company> companyEvent) {
// custom code block
return new EventReply.EventAck();
}
}
SyncValidatingEventHandler
interface SyncValidatingEventHandler<I : Any, O : Outbound> : SyncEventHandler<I, O>
Implementation
Name | Signature |
---|---|
onValidate | fun onValidate(message: Event<I>): O |
onCommit | fun onCommit(message: Event<I>): O |
Here is an example:
- Kotlin
- Java
import global.genesis.commons.annotation.Module
import global.genesis.eventhandler.typed.sync.SyncValidatingEventHandler
import global.genesis.gen.dao.Company
import global.genesis.message.core.event.Event
import global.genesis.message.core.event.EventReply
@Module
class TestCompanyHandlerSync : SyncValidatingEventHandler<Company, EventReply> {
override fun onValidate(message: Event<Company>): EventReply {
val company = message.details
return ack()
}
override fun onCommit(message: Event<Company>): EventReply {
val company = message.details
return ack()
}
}
import global.genesis.commons.annotation.Module;
import global.genesis.eventhandler.typed.sync.SyncValidatingEventHandler;
import global.genesis.gen.dao.Company;
import global.genesis.message.core.event.Event;
import global.genesis.message.core.event.EventReply;
import org.jetbrains.annotations.NotNull;
@Module
public class TestCompanyHandlerSync implements SyncValidatingEventHandler<Company, EventReply> {
@NotNull
@Override
public EventReply onCommit(@NotNull Event<Company> event) {
Company company = event.getDetails();
// custom code block
return new EventReply.EventAck();
}
@NotNull
@Override
public EventReply onValidate(@NotNull Event<Company> event) {
Company company = event.getDetails();
// custom code block
return new EventReply.EventAck();
}
}
SyncContextValidatingEventHandler
interface SyncContextValidatingEventHandler<I : Any, O : Outbound, C : Any> : SyncEventHandler<I, O>
Implementation
Name | Signature |
---|---|
onValidate | fun onValidate(message: Event<I>): ValidationResult<O, C> |
onCommit | fun onCommit(message: Event<I>, context: C?): O |
Helper methods
Name | Signature |
---|---|
validationResult | fun validationResult(result: O): ValidationResult<O, C> |
validationResult | fun validationResult(result: O, context: C): ValidationResult<O, C> |
Here is an example:
- Kotlin
- Java
import global.genesis.commons.annotation.Module
import global.genesis.eventhandler.typed.sync.SyncContextValidatingEventHandler
import global.genesis.gen.dao.Company
import global.genesis.message.core.event.Event
import global.genesis.message.core.event.EventReply
import global.genesis.message.core.event.ValidationResult
@Module
class TestCompanyHandlerSync : SyncContextValidatingEventHandler<Company, EventReply, String> {
override fun onValidate(message: Event<Company>): ValidationResult<EventReply, String> {
val company = message.details
// custom code block..
val companyName = company.companyName
return validationResult(ack(), companyName)
}
override fun onCommit(message: Event<Company>, context: String?): EventReply {
if (context != null) {
// Do something with the context
}
val company = message.details
// custom code block..
return ack()
}
}
import global.genesis.commons.annotation.Module;
import global.genesis.eventhandler.typed.sync.SyncContextValidatingEventHandler;
import global.genesis.gen.dao.Company;
import global.genesis.message.core.event.Event;
import global.genesis.message.core.event.EventReply;
import global.genesis.message.core.event.ValidationResult;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@Module
public class TestCompanyHandlerSync implements SyncContextValidatingEventHandler<Company, EventReply, String> {
@NotNull
@Override
public ValidationResult<EventReply, String> onValidate(@NotNull Event<Company> event) {
Company company = event.getDetails();
// custom code block..
String companyName = company.getCompanyName();
return validationResult(new EventReply.EventAck(), companyName);
}
@NotNull
@Override
public EventReply onCommit(@NotNull Event<Company> event, @Nullable String context) {
if (!context.isEmpty()) {
// Do something with the context
}
Company company = event.getDetails();
// custom code block..
return new EventReply.EventAck();
}
}
Java event handlers
Event Handlers can be written in Java using Event Handler APIs. On this page, we look at Event Handlers written using the Rx3 Event handlers
To work with a Java Event Handler:
- In your application-eventhandler/src/main/ folder, create an empty folder called java. This ensures that the Java file will be compiled.
- In your application-eventhandler folder, create a package for your Java Event Handler. Make sure that your package name is specific - this will be used to start the Event Handler process.
- You can then create the file for the Java Event Handler in this folder.
- Update your
processes.xml
file to include the details of the new module. For example:
<process name="JAVA_EVENT_HANDLER">
<groupId>MYAPP</groupId>
<start>true</start>
<options>-Xmx256m -DRedirectStreamsToLog=true -DXSD_VALIDATE=false</options>
<module>myapp-eventhandler</module>
<package>global.genesis.eventhandler,genesis.global.myapp.trades</package>
<description>Handles events</description>
</process>
You can use both Java and Kotlin Event Handlers in the same process.
Make sure that both types of Event Handler are included in your application's processes.xml file. In the example below:
- The Kotlin Event Handler is
<module>genesis-pal-eventhandler</module>
. - The
<script>
and<language>
for the Kotlin file are registered. - The Kotlin and Java Event Handlers are both registered in the
<package>
. (In this case, the Java files are located in the package genesis.global.trades.) - The jar files for the Java Event Handler are registered in the
<classpath>
.
<process name="JAVA_KOTLIN_EVENT_HANDLERS">
<groupId>MYAPP</groupId>
<start>true</start>
<options>-Xmx256m -DRedirectStreamsToLog=true -DXSD_VALIDATE=false</options>
<module>genesis-pal-eventhandler</module>
<package>global.genesis.eventhandler.pal,genesis.global.trades</package>
<script>myapp-eventhandler.kts</script>
<language>pal</language>
<classpath>myapp-eventhandler*.jar</classpath>
<description>Handles events</description>
</process>
Example
This method passes the input message type CounterParty
as a parameter and expects the output message type EventReply
to be returned.
The default name will be EVENT_<input message type name>
. So, for an input message type declared as CounterParty
, the event with the name EVENT_COUNTERPARTY
is registered automatically.
@Module
public class EventCounterParty implements Rx3EventHandler<Counterparty, EventReply> {
private final RxEntityDb entityDb;
@Inject
public EventCounterParty(RxEntityDb entityDb) {
this.entityDb = entityDb;
}
@Override
public Single<EventReply> process(Event<Counterparty> counterpartyEvent) {
Counterparty counterparty = counterpartyEvent.getDetails();
return entityDb.insert(counterparty).flatMap(result -> ack(this));
}
}
Any Java Event Handler classes you create must be placed in the same folder as the Java Event Handler module itself.
Adding a name
Every eventHandler
must have a unique name. If you do not provide one, it will be allocated a default name automatically, as shown in the previous example.
It is good practice to provide your own name for each eventHandler
. For example, if you have insert and modify codeblocks for the same table and you don't name them, then the platform will probably generate identical names for both - which will give you a problem.
Note that the prefix EVENT_
is automatically added to the name that you specify.
So, below, we modify our previous example by defining the name of the eventHandler
: COUNTERPARTY_INSERT by overriding messageType method of the Rx3EventHandler. This will automatically become EVENT_COUNTERPARTY_INSERT.
@Module
public class EventCounterParty implements Rx3EventHandler<Counterparty, EventReply> {
private final RxEntityDb entityDb;
@Inject
public EventCounterParty(RxEntityDb entityDb) {
this.entityDb = entityDb;
}
@Nullable
@Override
public String messageType() {
return "COUNTERPARTY_INSERT";
}
@Override
public Single<EventReply> process(Event<Counterparty> counterpartyEvent) {
Counterparty counterparty = counterpartyEvent.getDetails();
return entityDb.insert(counterparty).flatMap(result -> ack(this));
}
}
Adding validation
So far, we have overridden the process method in our eventHandler
. This is where the active instructions are - usually database changes.
If you want to provide some validation before the action, you need to implement the Rx3ValidatingEventHandler
interface and override onCommit
and onValidate
methods.
in the example below, the onValidate
method will be executed first and the onCommit
method will only be executed if the counterparty
field is not null.
@Module
public class EventCounterParty implements Rx3ValidatingEventHandler<Counterparty, EventReply> {
private final RxEntityDb entityDb;
@Inject
public EventCounterParty(RxEntityDb entityDb) {
this.entityDb = entityDb;
}
@Nullable
@Override
public String messageType() {
return "COUNTERPARTY_INSERT";
}
@NotNull
@Override
public Single<EventReply> onCommit(@NotNull Event<Counterparty> event) {
Counterparty counterparty = event.getDetails();
return entityDb.insert(counterparty) .flatMap(result -> ack(this));
}
@NotNull
@Override
public Single<EventReply> onValidate(@NotNull Event<Counterparty> event) {
Counterparty counterparty = event.getDetails();
if(counterparty.getName().isEmpty()) {
return nack(this, "Counterparty should have a name");
}
return ack(this);
}
}
Returning a nack
The onValidate
method must always return either an ack()
or nack(...)
.
Look at the onValidate
method in the previous example:
- if the counterparty field is empty, the
eventHandler
returns anack
, along with a suitable message. - if the counterparty field has content, then the
eventHandler
returns anack
Default reply types
So far, we have seen ack
and nack. There is a third reply type:
warningNack`. Let's stop and look at the specifications for all three default reply types:
ack
: used to signify a successful result.ack
takes an optional parameter ofList<Map<String, Any>>
. For example,ack(listOf(mapOf("TRADE_ID", "1")))
.nack
: used to signify an unsuccessful result.nack
accepts either aString
parameter or aThrowable
. For example,nack("Error!")
ornack(myThrowable)
.warningNack
: used to warn the client.warningNack
, likenack
, accepts either aString
parameter or aThrowable
. For example,warningNack("Provided User alias $userAlias will override Username $username.")
orwarningNack(myThrowable)
.
Transactional Event Handlers (ACID)
If you want your eventHandler
to comply with ACID, you need to use the RxEntityDb writeTransaction. Any exception or nack returned will result in a complete rollback of all parts of the onCommit
and onValidate
(the transaction also covers read commands) methods.
@Module
public class EventCounterParty implements Rx3EventHandler<Counterparty, EventReply> {
private final RxEntityDb entityDb;@Inject
public EventCounterParty(RxEntityDb entityDb) {
this.entityDb = entityDb;
}
@Nullable
@Override
public String messageType() {
return "COUNTERPARTY_INSERT";
}
@Override
public Single<EventReply> process(Event<Counterparty> counterpartyEvent) {
Counterparty counterparty = counterpartyEvent.getDetails();
return entityDb.writeTransaction(txn -> {
txn.insert(counterparty);
return ack(this);
}).map(result -> result.getFirst());
}
}
Context Event Handlers
In order to optimize database look-up operations, you might want to use data obtained by the onValidate
method inside your onCommit
method. To do this, implement the Rx3ContextValidatingEventHandler interface, as shown below:
@Module
public class ContextEvent implements Rx3ContextValidatingEventHandler<Company, EventReply, String> {
private final RxEntityDb entityDb;
@Inject
public ContextEvent(RxEntityDb entityDb) {
this.entityDb = entityDb;
}
@Nullable
@Override
public String messageType() {
return "CONTEXT_COMPANY_INSERT";
}
@NotNull
@Override
public Single<EventReply> onCommit(@NotNull Event<Company> event, @Nullable String context) {
String parsedContext;
parsedContext = Objects.requireNonNullElse(context, "Missing context");
Company company = event.getDetails();
return entityDb.insert(company).flatMap(result -> ack(this, List.of(Map.of("VALUE",parsedContext))));
}
@NotNull
@Override
public Single<ValidationResult<EventReply, String>> onValidate(@NotNull Event<Company> event) {
Company company = event.getDetails();
if(company.getCompanyName().equals("MY_COMPANY")) {
return ack(this).map(result -> validationResult(result, "Best company in the world"));
} else {
return ack(this).map(this::validationResult);
}
}
}