Skip to main content

Notify - Data Pipelines

There are some pipeline implementations provided with the Notifications component that can be used within any data pipeline you have set up. See Data Pipelines for more information.

Set Up

In order to have access to the following out of box GPAL extension functions you will need to add the following dependency to the app module of your project:

implementation("global.genesis:genesis-notify-pipelines:$notifyVersion")

Notify Email On Completion Handler

Handler that sends an email upon successful pipeline operation.

GPAL Example:

pipeline("TEST_ON_COMPLETION_PIPELINE") {
source(dbBulkRealtimeSource)
.map(mapper)
.sink(logSink)
.onCompletion(
notifyEmailOnCompletion {
body = "Trade with id ${context.data.tradeId} has been processed"
header = "Trade Processed"
recipients = setOf(EmailRecipient("john.doe@genesis.global", EmailRecipientType.TO), EmailRecipient("jane.doe@genesis.global", EmailRecipientType.CC))
}
)
}

Notify All Screens On Completion Handler

Handler that sends a notification to all screens upon successful pipeline operation.

GPAL Example:

pipeline("TEST_ON_COMPLETION_PIPELINE") {
source(dbBulkRealtimeSource)
.map(mapper)
.sink(logSink)
.onCompletion(
notifyAllScreensOnCompletion {
body = "Trade with id ${context.data.tradeId} has been processed"
header = "Trade Processed"
severity = Severity.Information
}
)
}

Notify Screen on Completion Handler

Handler that sends a notification to the screen of a specified user upon successful pipeline operation.

GPAL Example:

pipeline("TEST_ON_COMPLETION_PIPELINE") {
source(dbBulkRealtimeSource)
.map(mapper)
.sink(logSink)
.onCompletion(
notifyScreenOnCompletion {
body = "Trade with id ${context.data.tradeId} has been processed"
header = "Trade Processed"
severity = Severity.Information
user = "JohnDoe"
}
)
}

Notify Screen on Error Handler

Handler that sends a notification to the screen of a specified user upon uncaught error within a pipeline operation.

GPAL Example:

pipeline("TEST_ON_COMPLETION_PIPELINE") {
source(dbBulkRealtimeSource)
.map(mapper)
.sink(logSink)
.onUncaughtError(
notifyScreenOnError {
body = "Error was thrown while running TEST_ON_COMPLETION_PIPELINE: ${this.error.message}"
header = "Trade Processed"
severity = Severity.Critical
user = "JohnDoe"
}
)
}