OTE app: improving the back end
Here, we shall look into the back-end code and make some useful changes to increase the usability and range of the app.
Adding a query
Now add a new query to your Data Server to make this information available to the front end.
-
Go to the folder
OTE\server\OTE-app\src\main\genesis\scripts
. -
Go to the file
OTE\server\OTE-app\src\main\genesis\scripts\OTE-dataserver.kts
and add the following code [Need to specify where] to define the new query.
This can also be found in a TODO block.
query("ALL_USER_NAMES", USER) {
fields {
USER_NAME
}
}
Updating the consolidators
The code changes listed can also be found in TODO blocks.
-
Go to the file
OTE\server\OTE-app\src\main\genesis\scripts\OTE-consolidator.kts
. -
In the
SELL_PARTICIPANT_POSITION_AGG consolidator
, findsum { quantity } into QUANTITY
. Update it as follows:
sum { quantity * -1 } into QUANTITY
Modify the CASH_BALANCE_AGG consolidator so that the new aggregations are included as a result of the primary key changes that were made.
Aggregate buy and sell in the consolidators
To ensure that Buy and Sell data is aggregated and captured for the correct side in the OPEN_POSITION_AGG
consolidator:
- Add the following code to the imports section:
import global.genesis.gen.dao.enums.OTE.passive_order_book.Side
import global.genesis.gen.dao.enums.OTE.passive_order_book.Status
- Find
sum { openQuantity } into BUY_QUANTITY
andsum { openQuantity } into SELL_QUANTITY
. Update as follows:
sum { openQuantity } onlyIf { side == Side.Buy } into BUY_QUANTITY
sum { openQuantity } onlyIf { side == Side.Sell } into SELL_QUANTITY
- Find
groupBy
and add the following clause above this label:
where {
status == Status.Active
}
Create a new passive order book eventHandler
To handle the Passive Order Book correctly, we are going to do two things:
- create a new
-eventhandler.kts
file with the relevant logic for the Passive Order Book; we provide a large code snippet here that contains the logic - this makes the
PASSIVE_ORDER_BOOK_INSERT
,PASSIVE_ORDER_BOOK_MODIFY
andPASSIVE_ORDER_BOOK_DELETE
events in theOTE-eventhandler.kts
file unnecessary, so you need to remove them.
To do this:
-
Go to the folder
OTE\server\OTE-app\src\main\genesis\scripts
. Add a file calledOTE-pob-eventhandler.kts
. -
Add this newly created file to the OTE_MANAGER. To do this, go to the file
OTE\server\OTE-app\src\main\genesis\cfg\OTE-processes.xml
.
In this file, find <script>OTE-eventhandler.kts,OTE-reqrep.kts,OTE-dataserver.kts</script>
. Replace this with the code below:
<script>OTE-eventhandler.kts,OTE-pob-eventhandler.kts,OTE-reqrep.kts,OTE-dataserver.kts</script>
- Go to file
OTE\server\OTE-app\src\main\genesis\scripts\OTE-eventhandler.kts
and remove the following code:
eventHandler<PassiveOrderBook>("PASSIVE_ORDER_BOOK_INSERT", transactional = true) {
permissioning {
permissionCodes = listOf("PassiveOrderBookUpdate")
}
onCommit { event ->
val details = event.details
val insertedRow = entityDb.insert(details)
// return an ack response which contains a list of record IDs
ack(listOf(mapOf(
"ORDER_ID" to insertedRow.record.orderId,
)))
}
}
eventHandler<PassiveOrderBook>("PASSIVE_ORDER_BOOK_MODIFY", transactional = true) {
permissioning {
permissionCodes = listOf("PassiveOrderBookUpdate")
}
onCommit { event ->
val details = event.details
entityDb.modify(details)
ack()
}
}
eventHandler<PassiveOrderBook.ByOrderId>("PASSIVE_ORDER_BOOK_DELETE", transactional = true) {
permissioning {
permissionCodes = listOf("PassiveOrderBookUpdate")
}
onCommit { event ->
val details = event.details
entityDb.delete(details)
ack()
}
}
- Go to the file
OTE\server\OTE-app\src\main\genesis\scripts\OTE-pob-eventhandler.kts
. Add the following large code snippet:
import global.genesis.gen.dao.enums.OTE.cash_movement.Reason
import global.genesis.gen.dao.enums.OTE.instrument.Type
import global.genesis.gen.dao.enums.OTE.passive_order_book.Side
import global.genesis.gen.dao.enums.OTE.passive_order_book.Status
import global.genesis.gen.dao.enums.OTE.process_log.Process
//TODO - consider if we need an archive table for performance reasons - once a trade is withdrawn, expired or
// filled it could move to archive (or this could be done via a scheduled job every x hours / minutes etc)
eventHandler {
//This event runs a closing price process that calculates a weighted average price from the trades booked in a
//given time period prior to the process
//There could be a number of ways of doing this depending on the requirements of the end user
eventHandler<Unit>("RUN_CLOSING_PRICES") {
onCommit {
//Currently this uses the time the event is run to set a range for weighted average price calculations
//This will usually be more specific and based on actual trading times (such as last 30 minutes of the
//exchange operating time)
val priceCheckStartHour = now().hourOfDay - 1
val priceCheckEndHour = now().hourOfDay + 1
LOG.info("Closing prices will be calculated form trades between the hours: $priceCheckStartHour and $priceCheckEndHour")
//Get all instruments
entityDb.getBulk(INSTRUMENT).toList().forEach {
val trades = entityDb.getRange(Trade.byInstrumentIdTradeDatetime(it.instrumentId, now().withTimeAtStartOfDay().plusHours(priceCheckStartHour)),Trade.byInstrumentIdTradeDatetime(it.instrumentId, now().withTimeAtStartOfDay().plusHours(priceCheckEndHour)))
.toList()
var weightedPrice = 0.0
var quantity = 0.0
//Look for trades against the instrument
trades.forEach {
weightedPrice += it.price * it.quantity
quantity += it.quantity
}
//Using upsert so a rerun updates and replaces existing prices
entityDb.upsert(
ClosingPrice (
closingDate = now().withTimeAtStartOfDay(),
instrumentId = it.instrumentId,
price = weightedPrice / quantity
)
)
}
//Log the process has run
entityDb.insert(
ProcessLog(
process = Process.ClosingPrices,
ranAt = now()
)
)
LOG.info("Have run closing prices")
ack()
}
}
//This process takes the most recent closing prices and triggers cash movements to reflect a variation margin
//based upon the movement seen in the market. This assumes a 100% variation call after a closing price is
//run. This is only done for Type == Future. This function could vary by end user requirement
eventHandler<Unit>("RUN_VARIATION_MARGIN") {
onValidate { event ->
val lastClosingPriceRun = entityDb.getBulk(CLOSING_PRICE)
require(lastClosingPriceRun.count() != 0) {"No closing prices - run closing prices first!"}
ack()
}
onCommit {
//Get last closing position run datetime
val lastEntry = entityDb.getBulk(CLOSING_POSITION).toList().maxByOrNull { it.ranAt }
var lastRun = DateTime.parse("2000-01-01")
if (lastEntry != null) {
lastRun = lastEntry.ranAt
}
val runUntil = now()
LOG.info("Running variation margin calls between $lastRun and $runUntil")
//Get last closing price run time
var lastClosingPriceRun = entityDb.getBulk(CLOSING_PRICE).toList().maxByOrNull { it.closingDate }
var lastClosingPriceDate = DateTime.parse("2000-01-01")
if (lastClosingPriceRun != null) {
lastClosingPriceDate = lastClosingPriceRun.closingDate
}
//Get last closing prices
val closingPriceMap = mutableMapOf<Long, Double>()
entityDb.getRange(ClosingPrice.byClosingDate(lastClosingPriceDate))
.toList()
.forEach {
closingPriceMap[it.instrumentId] = it.price
}
//Get instrument currencies
val instrumentCurrency = mutableMapOf<Long, String>()
entityDb.getBulk(INSTRUMENT)
.toList()
.forEach {
instrumentCurrency[it.instrumentId] = it.currency
}
//Get instrument types
val instrumentType = mutableMapOf<Long, Type>()
entityDb.getBulk(INSTRUMENT)
.toList()
.forEach {
instrumentType[it.instrumentId] = it.type
}
//Create map of new closing positions
var participantInstrumentMap = mutableMapOf<String, ClosingPosition> ()
//.Add trade data
entityDb.getRange(Trade.byTradeDatetime(lastRun), Trade.byTradeDatetime(runUntil))
.toList()
.filter { instrumentType[it.instrumentId] == Type.Future }
.forEach {
//Buy side
var key = it.buyParticipantName + "|" + it.instrumentId
val closingPrice = closingPriceMap[it.instrumentId]?: 0.0
val priceMove = closingPrice - it.price
if (participantInstrumentMap.containsKey(key) == false) {
participantInstrumentMap[key] = ClosingPosition(
closingDate = runUntil,
ranAt = runUntil,
participantName = it.buyParticipantName,
instrumentId = it.instrumentId,
quantity = it.quantity,
price = closingPrice,
)
}
else{
participantInstrumentMap[key]!!.quantity += it.quantity
}
//Do variation margin movement
var movement = it.quantity * priceMove * -1
if(movement != 0.0) {
entityDb.insert(
CashMovement(
amount = movement,
currency = instrumentCurrency[it.instrumentId]!!,
movementDate = now(),
participantName = it.buyParticipantName,
reason = Reason.VariationMargin,
tradeId = it.tradeId
)
)
}
//Sell side
key = it.sellParticipantName + "|" + it.instrumentId
if (participantInstrumentMap.containsKey(key) == false) {
participantInstrumentMap[key] = ClosingPosition(
closingDate = runUntil,
ranAt = runUntil,
participantName = it.sellParticipantName,
instrumentId = it.instrumentId,
quantity = it.quantity * -1,
price = closingPrice,
)
}
else{
participantInstrumentMap[key]!!.quantity -= it.quantity
}
//Do variation margin movement
movement = it.quantity * priceMove
if(movement != 0.0) {
entityDb.insert(
CashMovement(
amount = movement,
currency = instrumentCurrency[it.instrumentId]!!,
movementDate = now(),
participantName = it.sellParticipantName,
reason = Reason.VariationMargin,
tradeId = it.tradeId
)
)
}
}
//Add prior closing data
entityDb.getRange(ClosingPosition.byRanAt(lastRun))
.toList()
.filter { instrumentType[it.instrumentId] == Type.Future }
.forEach {
var key = it.participantName + "|" + it.instrumentId
val closingPrice = closingPriceMap[it.instrumentId] ?: 0.0
val priceMove = closingPrice - it.price
if (participantInstrumentMap.containsKey(key) == false) {
participantInstrumentMap[key] = ClosingPosition(
closingDate = runUntil,
ranAt = runUntil,
participantName = it.participantName,
instrumentId = it.instrumentId,
quantity = it.quantity,
price = closingPrice,
)
} else {
participantInstrumentMap[key]!!.quantity += it.quantity
}
//Do variation margin movement
val movement = it.quantity * priceMove
if(movement != 0.0) {
entityDb.insert(
CashMovement(
amount = movement,
currency = instrumentCurrency[it.instrumentId]!!,
movementDate = now(),
participantName = it.participantName,
reason = Reason.VariationMargin
)
)
}
}
//Save CLosingPosition where quantity is non-zero
participantInstrumentMap.forEach {
if(it.value.quantity != 0.toLong()) {
entityDb.insert(it.value)
}
}
//Log the process has run
entityDb.insert(
ProcessLog(
process = Process.VariationMargin,
ranAt = runUntil
)
)
LOG.info("Have run variation margin calls")
ack()
}
}
//This event runs a process to expire any orders where the expiry date is prior to now
//In the order insert, this is checked again anyway, but in production this process should run on a sensible
//schedule to ensure orders expire in a timely fashion
//Currently called from a front end button, but could easily be a CRON_RULE triggering this
eventHandler<Unit>("EXPIRE_ORDERS") {
onCommit {
entityDb.getRange(PassiveOrderBook.byExpiry(DateTime.parse("2000-01-01")), PassiveOrderBook.byExpiry(now()))
.toList().filter { it.status == Status.Active }.forEach {
it.status = Status.Expired
entityDb.modify(it)
//Move initial margin back with a cash movement
val instrument = entityDb.get(Instrument.byId(it.instrumentId))
entityDb.insert(
CashMovement(
participantName = it.participantName!!,
movementDate = now(),
reason = Reason.InitialMargin,
currency = instrument!!.currency,
amount = (it.openQuantity?:0.0).toDouble() * it.price * instrument.initialMarginPercent * (if (it.side == Side.Sell) -1 else 1),
orderId = it.orderId
)
)
}
//Log the process has run
entityDb.insert(
ProcessLog(
process = Process.OrderExpiry,
ranAt = now()
)
)
LOG.info("Have run expiry process")
ack()
}
}
//This function inserts new orders. As it does so it:
//1. Cycles through existing orders (of opposite side) where instrument is the same
//2. If order has not already expired (ideally will have moved to "expired" state, but chance for race condition)
//3. It then checks whether the price of the open order is favourable
// if a buy, is sell price offered lower or equal
// if a sell, is buy price offered higher or equal
//4. If so it hits the order at the open order price, not the new order price
//5. Then adjust the open order to reduce the open position accordingly
//6. Depending on size of new order more than one open order might need to be hit
//Then it proceeds to book trades and make cash movements according to the new trade booked
//Finally it takes an initial margin call on the placed order at any remaining open quantity
//
//Note this uses a "contextEventHandler" as this allows the passing of an object between the onValidate and onCommit
//in this case the instrument, thus saving a duplicate database look up
contextEventHandler<PassiveOrderBook, Instrument>("PASSIVE_ORDER_BOOK_INSERT", transactional = true) {
permissioning {
permissionCodes = listOf("PassiveOrderBookUpdate")
}
onValidate { event ->
val instrument = entityDb.get(Instrument.byId(event.details.instrumentId))
require(instrument != null) {
"Instrument ${event.details.instrumentId} does not exist"
}
val availableBalance =
entityDb.get(CashBalance.byParticipantNameCurrency(event.userName, instrument.currency))?.balance
require(
(availableBalance ?: 0.0) > (event.details.price * event.details.quantity * instrument.initialMarginPercent)
) { "Insufficient Balance Available! You need: ${event.details.price * event.details.quantity * instrument.initialMarginPercent}" }
validationAck(validationContext = instrument)
}
onCommit { event, instrument ->
val newOrder = event.details
newOrder.openQuantity = newOrder.quantity
newOrder.entryDatetime = now()
newOrder.participantName = event.userName
val reason = if (instrument!!.type == Type.Future) Reason.InitialMargin else Reason.Settlement
val currency = instrument.currency
val tradesToInsert = mutableListOf<Trade>()
val cashMovementToInsert = mutableListOf<CashMovement>()
val ordersToUpsert = mutableListOf<PassiveOrderBook>()
if (newOrder.side == Side.Buy) {
entityDb.getRange(PassiveOrderBook.byInstrumentIdSide(newOrder.instrumentId, Side.Sell))
.filter { openOrder -> newOrder.price >= openOrder.price && openOrder.status != Status.Filled && openOrder.expiry >= now() }
.toList()
.sortedWith(compareBy({ it.price }, { it.entryDatetime }))
.asSequence()
.takeWhile { newOrder.status != Status.Filled } // Ensure we stop processing items if the order has been filled.
.forEach { openOrder ->
var tradeQuantity: Long = 0
if (openOrder.openQuantity!! >= newOrder.openQuantity!!) {
tradeQuantity = newOrder.openQuantity!!
openOrder.openQuantity = openOrder.openQuantity!! - tradeQuantity
newOrder.openQuantity = 0
newOrder.status = Status.Filled
} else {
tradeQuantity = openOrder.openQuantity!!
newOrder.openQuantity = newOrder.openQuantity!! - tradeQuantity
openOrder.openQuantity = 0
openOrder.status = Status.Filled
}
val initialMargin = tradeQuantity * openOrder.price * instrument.initialMarginPercent
val newTrade = Trade(
buyParticipantName = event.userName,
sellParticipantName = openOrder.participantName!!,
instrumentId = openOrder.instrumentId,
price = openOrder.price,
quantity = tradeQuantity,
tradeDatetime = now(),
initialMargin = initialMargin
)
tradesToInsert.add(newTrade)
//Only need initial margin for the buy - the open order is already taken
val cashMovement = CashMovement(
participantName = newTrade.buyParticipantName,
movementDate = now(),
reason = reason,
currency = currency,
amount = initialMargin * -1.0,
)
cashMovementToInsert.add(cashMovement)
ordersToUpsert.add(openOrder)
}
} else {
entityDb.getRange(PassiveOrderBook.byInstrumentIdSide(newOrder.instrumentId, Side.Buy))
.filter { openOrder -> newOrder.price <= openOrder.price && openOrder.status != Status.Filled && openOrder.expiry >= now() }
.toList()
.sortedWith(compareByDescending<PassiveOrderBook> { it.price }.thenBy { it.entryDatetime })
.asSequence()
.takeWhile { newOrder.status != Status.Filled } // Ensure we stop processing items if the order has been filled.
.forEach { openOrder ->
var tradeQuantity: Long = 0
if (openOrder.openQuantity!! >= newOrder.openQuantity!!) {
tradeQuantity = newOrder.openQuantity!!
openOrder.openQuantity = openOrder.openQuantity!! - tradeQuantity
newOrder.openQuantity = 0
newOrder.status = Status.Filled
} else {
tradeQuantity = openOrder.openQuantity!!
newOrder.openQuantity = newOrder.openQuantity!! - tradeQuantity
openOrder.openQuantity = 0
openOrder.status = Status.Filled
}
val initialMargin = tradeQuantity * openOrder.price * instrument.initialMarginPercent
val newTrade = Trade(
buyParticipantName = openOrder.participantName!!,
sellParticipantName = event.userName,
instrumentId = openOrder.instrumentId,
price = openOrder.price,
quantity = tradeQuantity,
tradeDatetime = now(),
initialMargin = initialMargin
)
tradesToInsert.add(newTrade)
//Only need initial margin for the sell - the open order is already taken
val cashMovement = CashMovement(
participantName = newTrade.sellParticipantName,
movementDate = now(),
reason = reason,
currency = currency,
amount = initialMargin
)
cashMovementToInsert.add(cashMovement)
ordersToUpsert.add(openOrder)
}
}
// Add new order to list of orders to update.
ordersToUpsert.add(newOrder)
// Perform order changes
val ordersUpsertResult = entityDb.upsertAll(ordersToUpsert.map { EntityModifyDetails(it) })
// Retrieve newOrder orderId
val newOrderId = ordersUpsertResult.last().record.orderId
// Insert trades
val tradesInsertResult = entityDb.insertAll(tradesToInsert)
// Update cash movements with trade ids
tradesInsertResult.forEachIndexed { index, insertResult ->
cashMovementToInsert[index].tradeId = insertResult.record.tradeId
}
//Now take an initial marking for the remaining portion of the new order
if ((newOrder.openQuantity ?: 0) > 0) {
val initialMargin = newOrder.openQuantity!! * newOrder.price * instrument.initialMarginPercent
// Set last order id in a new cashMovement record if needed.
cashMovementToInsert.add(
CashMovement(
participantName = event.userName,
movementDate = now(),
reason = reason,
currency = currency,
amount = initialMargin,
orderId = newOrderId
)
)
}
// Insert all cash movements in one go
entityDb.insertAll(cashMovementToInsert)
// return an ack response which contains a list of record IDs
ack(
listOf(
mapOf(
"ORDER_ID" to newOrderId,
)
)
)
}
}
//The below allows the withdrawal of an order, but only by the person who submitted it. It also reverses any
//remaining initial margin movement
//Note this uses a "contextEventHandler" as this allows the passing of an object between the onValidate and onCommit
//in this case the instrument, thus saving a duplicate database look up
contextEventHandler<PassiveOrderBook, Instrument>("PASSIVE_ORDER_BOOK_DELETE", transactional = true) {
permissioning {
permissionCodes = listOf("PassiveOrderBookUpdate")
}
onValidate { event ->
require(event.userName == event.details.participantName) { "You are only able to withdraw your own orders!" }
require(event.details.status == Status.Active) { "This order is not active!" }
val instrument = entityDb.get(Instrument.byId(event.details.instrumentId))
require(instrument != null) { "Instrument ${event.details.instrumentId} does not exist" }
validationAck(validationContext = instrument)
}
onCommit { event, instrument ->
val details = event.details
//Set trade to withdrawn & increment version number
details.status = Status.Withdrawn
details.version = details.version + 1
entityDb.modify(details)
//Move initial margin back with a cash movement
entityDb.insert(
CashMovement(
participantName = event.userName, //Already checked it is the same
movementDate = now(),
reason = Reason.InitialMargin,
currency = instrument!!.currency,
amount = (details.openQuantity?:0.0).toDouble() * details.price * instrument.initialMarginPercent * (if (details.side == Side.Sell) -1 else 1),
orderId = details.orderId
)
)
ack()
}
}
}
That's it. Once your new file has this code, you have successfully set the logic for handling the passive order book.
This large code snippet has comments throughout to indicate the purpose of each codeblock.