Update Queue
Overview
The update queue is part of the Genesis database layer and is the underlying technology that powers real-time updates.
Genesis provides an update queue so that processes can be kept up to date with the latest changes in the database without needing to poll it.
Consider an example in which the data_server exposes the all_trades resource, which should provide all trades within the application. When the data_server starts, it contacts the db_server and reads all the trades from the database. At the same time, the data_server also starts listening to the update queue.
When the event_handler receives a trade insert event, it writes this to the db_server database, using its local database connection (event_db). After the update has been written, the process publishes the update on the update queue to be consumed by any subscribers.
As the data_server is automatically subscribed to those updates from the update queue, it gets the updates without having to re-query the database.
There are three different update queue technologies to choose from, depending on your requirements:
- ZeroMQ (Default) - a decentralized peer-to-peer set-up, which relies on a fixed cluster size
- MQTT - use an independent MQTT broker if you want to allow dynamic scaling
- JMS - use an independent Jakarta Messaging (formerly JMS) broker
ZeroMQ
The Genesis Application Platform uses ZeroMQ out of the box to provide a zero-configuration decentralized peer-to-peer update queue.
It works because the GENESIS_CLUSTER process knows (via clustering) all the nodes within the cluster - so it can broadcast and consume appropriately.
- ZeroMQ is best suited for fixed-size deployments.
- MQTT or JMS are recommended for set-ups that require dynamic scaling.
Proxy mode
All processes have a ZeroMQ port, which is their process port + 10000. GENESIS_CLUSTER also has two additional ports (ZeroMQProxyInboundPort and ZeroMQProxyOutboundPort) that act as proxy ports. This is necessary so that:
- scripts can publish to ZeroMQ without having an allocated port
- the platform can run in ZeroMQ “proxy” mode
The platform can be configured to run with or without proxy mode, and this setting applies to all processes.
When running under non-proxy mode, each process binds to its ZeroMQ port, and every other process must connect to that port to receive updates. This mode isn't recommended for large apps, because every process connects to every publisher: 10 microservices mean 10 publishers, which gives around 100 connections.
In non-proxy mode, all processes must also connect to the GENESIS_CLUSTER proxy ports, because scripts or other non-Genesis processes could be publishing updates through them.
When running under proxy mode, each process connects to the ZeroMQProxyInboundPort to publish data and subscribes to the ZeroMQProxyOutboundPort to receive data, which reduces the total number of connections.
ZeroMQ configuration options
ZeroMQ is the default MQ system in Genesis, but it can be set explicitly with the following config:
systemDefinition {
    global {
        ...
        item(name = "MqLayer", value = "ZeroMQ")
        ...
    }
}
Other config values that are available are listed below:
Config Item | Description | Default |
---|---|---|
ZeroMQProxyInboundPort (required) | The port used for ZeroMQ inbound connections | - |
ZeroMQProxyOutboundPort (required) | The port used for ZeroMQ outbound connections | - |
ZeroMQProxyModeEnabled | Enable proxy mode (recommended for large apps) | false |
ZeroMQProxyUnicastRelayEnabled | Creates a point-to-point connection between GENESIS_CLUSTER processes for all the nodes in the cluster. This connection is used to send local update queue events to the remote nodes in the cluster in order to ensure every node has access to all the updates | false |
ZeroMQConnectToLocalhostViaLoopback | Depending on the OS, routing may be subject to network flapping (quick changes in network routing) - this setting binds ZeroMQ communication to the local interface (localhost). Do not use in Production. | false |
If you are using a cloud environment that does not allow multicast traffic (e.g. AWS), then both ZeroMQProxyModeEnabled and ZeroMQProxyUnicastRelayEnabled must be set to true.
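As an illustration of the settings above, a system definition for a cloud deployment without multicast might look like the sketch below. The port numbers are placeholders chosen for this example rather than platform defaults, and passing booleans and numbers as strings is an assumption that mirrors the MqLayer item shown earlier.

```kotlin
systemDefinition {
    global {
        item(name = "MqLayer", value = "ZeroMQ")
        // Placeholder ports (assumption): choose free ports for your environment
        item(name = "ZeroMQProxyInboundPort", value = "5001")
        item(name = "ZeroMQProxyOutboundPort", value = "5000")
        // Both settings are needed where multicast traffic is not allowed
        item(name = "ZeroMQProxyModeEnabled", value = "true")
        item(name = "ZeroMQProxyUnicastRelayEnabled", value = "true")
    }
}
```

Because the proxy-mode setting applies to all processes, it belongs in the global block rather than in a per-process configuration.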
Using loopback (development environments)
If you are developing on a laptop, you might be affected by automatic switching between networks; for example, the operating system could suddenly switch to a hotspot. When this happens, the message queue could stop receiving updates. As a result, your processes could go down inexplicably.
To avoid this, set ZeroMQConnectToLocalhostViaLoopback = true. This setting uses the local network stack to ensure that no messages are lost.
Never use this setting in a production environment.
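A minimal sketch of the loopback setting for a developer machine is shown below. It assumes the value is passed as a string, in the same way as the MqLayer item; how you keep this item out of production builds depends on your own deployment set-up.

```kotlin
systemDefinition {
    global {
        // Development only: bind ZeroMQ communication to localhost
        item(name = "ZeroMQConnectToLocalhostViaLoopback", value = "true")
    }
}
```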
MQTT
The Genesis Application Platform provides the option to use an external MQTT broker such as Mosquitto or RabbitMQ as the transport mechanism for the Genesis update queue.
Using a centralized external broker is highly recommended for complex, large or dynamically scaled clusters, as it reduces the complexity and overhead of peer-to-peer connectivity.
MQTT configuration options
To use MQTT in your Genesis application, set MqLayer to MQTT in the system definition.
systemDefinition {
    global {
        ...
        item(name = "MqLayer", value = "MQTT")
        ...
    }
}
Other config values that are available are listed below:
Config Item | Description | Default |
---|---|---|
MqttBrokerUrl | The URL of the MQTT broker | tcp://localhost:1883 |
MqttQos | The MQTT Quality of Service level: at most once (0), at least once (1) or exactly once (2) | 2 |
MqttClientId | A template pattern for the client ID using HOSTNAME and PROCESS_NAME | genesis/{{HOSTNAME}}/{{PROCESS_NAME}} |
MqttQueueNamePattern | A template pattern for the queue name using TABLE_NAME | genesis/database/{{TABLE_NAME}} |
MqttThreadPoolSize | Number of threads to use | 4 |
MqttUsername | MQTT Username | null |
MqttPassword | MQTT Password | null |
MqttTlsVerify | Setting to false ignores certificate verification. This should only be set to false in a dev or test environment. | true |
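To show how the items in the table fit together, here is a hedged sketch that points the update queue at an external broker. The host name and credentials are hypothetical placeholders, and passing every value as a string is an assumption based on the MqLayer item above.

```kotlin
systemDefinition {
    global {
        item(name = "MqLayer", value = "MQTT")
        // Hypothetical broker address; same URL form as the documented default
        item(name = "MqttBrokerUrl", value = "tcp://mqtt.example.com:1883")
        // QoS 1 = at least once; the documented default is 2 (exactly once)
        item(name = "MqttQos", value = "1")
        // Placeholder credentials (assumption): replace with your own
        item(name = "MqttUsername", value = "genesis-app")
        item(name = "MqttPassword", value = "change-me")
    }
}
```

Items that are left out, such as MqttClientId and MqttQueueNamePattern, keep the defaults listed in the table.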
JMS
The Genesis platform enables you to use a Jakarta Messaging (formerly JMS) compliant message broker as its real-time update-queue back-end via ArtemisMQ.
Using a centralized external broker is highly recommended for complex, large or dynamically scaled clusters, as it reduces the complexity and overhead of peer-to-peer connectivity at the cost of maintaining (or re-using) a message broker.
Prerequisites
To use Genesis Jakarta Messaging on the platform, you must install ArtemisMQ on the host (or hosts, in the case of High Availability). This is described in the ArtemisMQ documentation.
Genesis does not ship the ArtemisMQ server, only its client library.
JMS configuration options
Basic configuration
To use Jakarta Messaging in your Genesis application, set MqLayer to JMS in your application's system definition file.
systemDefinition {
    global {
        ...
        item(name = "MqLayer", value = "JMS")
        ...
    }
}
The config values are listed below.
Config Item | Description | Default |
---|---|---|
JmsHost (required) | The broker host; can hold multiple comma-separated values, which enables you to use HA failover | http://localhost:61616 |
JmsUsername | Username to connect | |
JmsPassword | Password to connect (can be encrypted using GenesisKey) | |
JmsTopicSubscriptionMode | Enum value (TRANSIENT, PERSISTENT) | TRANSIENT |
JmsMaxConnectionRetryOnStartup | The number of connection attempts the process makes to the broker before the process is declared DOWN | 20 |
JmsReconnectDelayMs | The delay in milliseconds, increasing exponentially, before a reconnection attempt is made in case of connectivity problems or other faults | 1000 |
JmsRetryToSendDelayMs | The amount of time in milliseconds for retrying to send a message in case of failure. | 1000 |
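The sketch below shows how these items might be combined for a broker pair with failover. The host names and credentials are hypothetical, the address format simply copies the documented default, and passing numbers as strings is an assumption; check the exact URL form against your ArtemisMQ set-up.

```kotlin
systemDefinition {
    global {
        item(name = "MqLayer", value = "JMS")
        // Hypothetical hosts: comma-separated values enable HA failover
        item(name = "JmsHost", value = "http://artemis-a.example.com:61616,http://artemis-b.example.com:61616")
        // Placeholder credentials (assumption): replace with your own
        item(name = "JmsUsername", value = "genesis-app")
        item(name = "JmsPassword", value = "change-me")
        // Allow more start-up attempts than the documented default of 20
        item(name = "JmsMaxConnectionRetryOnStartup", value = "30")
    }
}
```

JmsTopicSubscriptionMode is deliberately left out here, so it stays at its default of TRANSIENT for all processes.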
Durable consumers
You can configure specific processes to receive messages that were sent while the process was offline. This gives processes a useful way to recover missed messages without the need to replay an entire data stream.
For example, assume you have an application named alpha and you are going to set up ALPHA_CONSOLIDATOR to be a durable consumer of the update queue.
- Create a configuration file for the ALPHA_CONSOLIDATOR process with the contents:
process {
    systemDefinition {
        item(name = "JmsTopicSubscriptionMode", value = "PERSISTENT")
    }
}
- Add the details of the newly created configuration to your application's processes.xml file:
<process name="ALPHA_CONSOLIDATOR">
    <groupId>ALPHA</groupId>
    <start>true</start>
    <options>-Xmx256m -DRedirectStreamsToLog=true -DXSD_VALIDATE=false -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5106</options>
    <module>genesis-pal-consolidator</module>
    <package>global.genesis.pal.consolidator</package>
    <script>alpha-consolidator.kts</script>
    <config>alpha-consolidator-process-config.kts</config>
    <language>pal</language>
    <primaryOnly>true</primaryOnly>
</process>
- Run genesisInstall to apply the configuration, then start the processes as normal.
Do not define JmsTopicSubscriptionMode at global level, as this causes messages to be stored for every process and delivered once each process is back online.
High availability
Prerequisites
The quickest way to set up High Availability with ArtemisMQ is:
- Use a shared store.
- Use discovery via static connectors.
This method is tested and supported by Genesis. There are many other ways of implementing High Availability, but if you implement a different solution, it is vital that you test it thoroughly.
How it works
The following sequence uses a two-node cluster to show how the Genesis processes behave in this scenario.
- All processes connect to the primary broker.
- If there is a failure on the primary broker, the back-up becomes primary and all clients automatically re-establish a connection to it.
During the failover period, the processes will be reported as DOWN.
Troubleshooting
Process reports DOWN
Scenario: The mon command displays the following message for one or more Genesis processes: UpdateQueue: Update queue is not connected.
There are two possible causes for this:
- The process could not connect in the first place.
- The connection to the message broker has been lost.
At initialisation, processes are terminated automatically after the retry attempts are exhausted. However, if the connection is lost after the initialisation occurs, the processes are not shut down automatically; manual intervention is required from application support.
Resolution: Ensure that the message broker is online and accepting connections. Make sure that the configuration in genesis-system-definition.kts has the correct values.
Process no longer exists but there are obsolete queues in the message broker
Scenario: You have set up a durable consumer, but this process no longer exists in your application; in the Artemis console, you can see queues holding messages that will never be consumed.
Resolution: Remove the queue using the ArtemisMQ management console or CLI.