Skip to main content

Direct connection API

In addition to repository modules, you can interact directly with SQL connections via SqlDatabaseConnection (blocking) or SqlAsyncDatabaseConnection (suspending). This guide covers how to obtain a connection and how to use it.

Obtaining a connection

To use SQL integration you will need a named database connection. When connecting to a SQL database, Genesis will expose its connection with name genesis. To connect to another - external - database, specify the jdbc url as a system definition in the follow format: sql.<connection name>.jdbcUrl. For example:

item("sql.my-connection.jdbcUrl", "jdbc:h2:mem:myDb;DB_CLOSE_DELAY=-1;NON_KEYWORDS=KEY,VALUE")

To interact with a connection, inject a SqlDatabaseConnection (or SqlAsyncDatabaseConnection) using a @Named annotation with the connection name. For example: @Named("my-connection"). The underlying code allocates connections from a pool only when a query is evaluated or a prepared statement is created; that pool is driven by the Configuration. When using the genesis connection, the connection pool is shared.


Relation to standard JDBC

Please note that these connections are not a direct equivalent of a Connection in JDBC terms. SqlDatabaseConnection and SqlAsyncDatabaseConnection operate at a different level: they are a way to run queries and execute statements on the database, and they do not expose the raw JDBC API.

  • No raw JDBC — You do not get createStatement(), prepareStatement(String), or direct access to the underlying java.sql.Connection. Use the connection’s query(), update(), execute(), and preparedStatement() / builder APIs instead.
  • Named parameters — SQL uses named placeholders (e.g. :id, :status) and a Map<String, Any?> for parameters, rather than positional ? and index-based binding.
  • Structured operations — Clear split between queries (SELECT → ExecutableQuery), updates (INSERT/UPDATE/DELETE → row count), and DDL/other execution; plus optional metric ids and connection naming for observability.
  • Mapping — Built-in mapping from result sets to classes (and custom mappers); you typically work with domain types or primitives rather than ResultSet directly.

Connection allocation. The underlying code allocates a connection from a pool only when:

  • A query is evaluated (when you call a terminating method on ExecutableQuery or otherwise run the query), or
  • A prepared statement is created.

Those connections come from a connection pool driven by the Configuration. When using the genesis connection, the connection pool is shared.


Simple examples

In the examples below, Kotlin uses SqlAsyncDatabaseConnection (suspend functions; call from a coroutine). Java uses SqlDatabaseConnection (blocking). Method names therefore differ where applicable: e.g. Kotlin execute() / build() vs Java blockingExecute() / blockingBuild().

Query (SELECT)

// Direct: query with params, collect to list
val activeUsersQuery: ExecutableQuery<User> = connection.query<User>(
"SELECT * FROM users WHERE status = :status",
mapOf("status" to "ACTIVE")
)

val users = activeUsersQuery.asFlow().toList()

// Builder: param and metricId, then firstOrNull
val userByIdQuery: ExecutableQuery<User> = connection.queryBuilder("SELECT * FROM users WHERE id = :id", User::class.java)
.param("id", userId)
.metricId("get_user_by_id")
.build()
val user: User? = userByIdQuery.firstOrNull()

Update (INSERT / UPDATE / DELETE)

When using SqlAsyncDatabaseConnection, update() and builder execute() are suspending:

val rowsUpdated: Int = connection.update(
namedQuery = "UPDATE users SET status = :status WHERE id = :id",
params = mapOf("status" to "ACTIVE", "id" to userId),
metricId = "activate_user"
)

// Or with the builder (execute() is suspending)
val rows: Int = connection.updateBuilder("INSERT INTO audit_log (user_id, action) VALUES (:userId, :action)")
.param("userId", userId)
.param("action", "LOGIN")
.metricId("insert_audit")
.execute()

Execute (DDL / stored procedures)

When using SqlAsyncDatabaseConnection, execute() is a suspend function.

val outcome: SqlExecutionOutcome = connection.execute(
"CREATE INDEX idx_user_email ON users(email)",
metricId = "create_email_index"
)

// With parameters
val result: SqlExecutionOutcome = connection.execute(
namedQuery = "CALL update_statistics(:tableName)",
params = mapOf("tableName" to "users"),
metricId = "update_stats"
)

Prepared statement (reusable, generated keys)

Please note that SqlDatabasePreparedStatement will be pinned to a single JDBC connection, ensure that you call close() to prevent a resource leak. When using SqlAsyncDatabaseConnection, buildStatement(...) and preparedStatement(sql).build() are suspending.

val ordersStatement: SqlDatabasePreparedStatement = connection.buildStatement(
namedQuery = "INSERT INTO orders (user_id, total) VALUES (:userId, :total)",
generatedFields = listOf("order_id", "created_at"),
metricId = "insert_order"
)

// use ensures we release the underlying connection
ordersStatement.use {
it.setParameters(mapOf("userId" to 123, "total" to 99.99))
it.executeUpdate()
val keyRow: YourKeyType = it.getGeneratedKeys(orderIdMapper).first() // mapper to your key DTO/type
}

// Or with the builder (build() is suspending)
val usersStatement: SqlDatabasePreparedStatement = connection.preparedStatement("INSERT INTO users (name) VALUES (:name)")
.returnGeneratedKeys()
.metricId("insert_user")
.build()

// use ensures we release the underlying connection
usersStatement.use {
it.setParameters(mapOf("name" to "Alice"))
it.executeUpdate()
}

Concepts

Blocking vs suspending

  • Blocking (SqlDatabaseConnection): Methods run on the calling thread and block until the database responds. Use in Java or in Kotlin when blocking is acceptable (e.g. non-coroutine code).
  • Suspending (SqlAsyncDatabaseConnection): Methods are suspend functions; they do not block the thread and are intended for Kotlin coroutines. Use in Kotlin when you want non-blocking I/O and structured concurrency.

Both interfaces extend the same base (SqlBaseDatabaseConnection), so query, queryBuilder, updateBuilder, executeBuilder, and preparedStatement are shared. Only execute, update, and the creation of prepared statements differ: blocking connection exposes blocking methods and createPreparedStatement; async connection exposes suspend versions and buildStatement / preparedStatement(...).build().

Named parameters

SQL uses placeholders like :paramName. Parameters are passed as Map<String, Any?> (e.g. mapOf("paramName" to value)). Keys must match the placeholder names without the colon. For supported value types, see Types.

Types

In SQL, Genesis supports different kinds of types for both parameters and return values:

  • Primitive types — natively supported in JDBC; can be used as parameters or as query result types.
  • Kotlin data classes and Java record classes — supported as long as they only contain primitive types (or other such data/record types).
  • Custom mapping — any other type via a custom ResultSetMapper.

Primitives

Primitives are natively supported in JDBC and can be used as either parameters or return types. The following are supported natively:

  • String
  • Boolean
  • Byte
  • Short
  • Int
  • Long
  • Float
  • Double
  • BigDecimal
  • ByteArray
  • Date (java.util.Date)
  • Timestamp

Kotlin data classes and Java record classes

Kotlin data classes and Java record classes are supported as row-mapped result types (or as parameter values), provided they only contain primitive types as listed above.

Using a custom mapper

To map any other class, implement ResultSetMapper<RESULT> and pass it into the query or queryBuilder (e.g. query(namedQuery, rowMapper, params, metricId)). Alternatively, annotate your class with @RowMapper to register a mapper for that type.

@RowMapper(TradeMapper::class)
class Trade {
// ...
}

class TradeMapper : ResultSetMapper<Trade> {
override fun map(rs: ResultSet): Trade {
return Trade(
id = rs.getString("id"),
quantity = rs.getInt("quantity"),
price = rs.getBigDecimal("price"),
status = rs.getString("status"),
clientId = rs.getString("clientId"),
)
}
}

metricId

Optional string used for observability (logging, metrics). When null, the implementation typically uses the SQL string as the identifier. Pass a short, stable id (e.g. "get_user_by_id") for clearer metrics.

Row mapping (query results)

  • By class: Use query(namedQuery, clazz, params) or queryBuilder(sql, clazz). Rows are mapped to the given class (e.g. User.class / User::class.java) using convention-based mapping. Supported result types are primitives, Kotlin data classes and Java records that only contain those types, or any type with a custom mapper.
  • By custom mapper: Use query(namedQuery, rowMapper, params) or queryBuilder(sql, rowMapper) with a ResultSetMapper<RESULT> to define how each row becomes an object.
  • Kotlin reified: Use the extension connection.query<User>(namedQuery, params, metricId) so the result type is inferred.

ExecutableQuery (lazy execution)

query() and queryBuilder().build() return an ExecutableQuery<RESULT>. The query is not run until you call a terminating method, e.g.:

  • toList(), first(), firstOrNull(), last()
  • collect { }, fold(...), reduce(...)
  • asFlow() (then consume the Flow)

You can chain non-terminating operations (take(n), map { }, filter { }, onEach { }) before a terminating call; execution still happens only at the terminating step.

Consuming query results: blocking vs Flow

  • Blocking: Call toList(), first(), firstOrNull(), collect { }, etc. These methods block the calling thread until the query finishes. They work with both blocking and async connections; with an async connection, the suspension point is in the connection layer, but the terminating call itself is still blocking.
  • Non-blocking streaming: Call asFlow() and collect the Flow inside a coroutine (e.g. query.asFlow().collect { } or query.asFlow().toList() in a suspend function). Use when you want backpressure or to process rows as they arrive.

Builders vs direct methods

  • Direct: execute(...), update(...), query(...), createPreparedStatement(...) / buildStatement(...) take all arguments in one call.
  • Builders: executeBuilder(sql), updateBuilder(sql), queryBuilder(sql, ...), preparedStatement(sql) return a builder on which you set params, metricId, etc., then call:
  • Blocking: blockingExecute() (execute/update), toList() / etc. (query), blockingBuild() (prepared statement).
  • Suspending: execute() (execute/update), build() (prepared statement); query consumption is the same (toList() or asFlow()).

Builders are convenient when you have many parameters or optional settings; behaviour is the same as the direct methods.

Execute vs update

  • execute: For statements that do not return a row count (DDL such as CREATE INDEX, stored procedures, etc.). Returns SqlExecutionOutcome.
  • update: For DML (INSERT, UPDATE, DELETE) that do return an affected row count. Returns Int.

Prepared statements vs single-shot execute/update

  • execute / update / executeBuilder / updateBuilder: One-shot; parameters are bound and the statement is run once.
  • createPreparedStatement / buildStatement / preparedStatement(...).build(): Create a reusable SqlDatabasePreparedStatement that you can call setParameters(...) and executeUpdate() / executeQuery() on multiple times. Use when you need repeated execution with different parameters or when you need generated keys (e.g. auto-increment IDs) via returnGeneratedKeys() / generatedFields(...).

Converting connection types

Useful when you have one kind of connection but need to call APIs that expect the other (e.g. passing a blocking connection into code that uses suspending functions by wrapping it once).


Detailed documentation

Connection metadata (shared)

connectionName: String

Read-only property identifying this database connection. Used for:

  • Identification — Distinguishing multiple connections (e.g. to different databases or with different roles).
  • Logging — Appearing in log messages so you can trace which connection ran a statement.
  • Metrics — Tagging or naming metrics (latency, throughput) per connection.

Same on both SqlDatabaseConnection and SqlAsyncDatabaseConnection; defined on SqlBaseDatabaseConnection.


Querying (SELECT) — shared API

Query methods are defined on SqlBaseDatabaseConnection, so they are identical for blocking and async connections. They return an ExecutableQuery<RESULT> that is executed only when you call a terminating method (see Consuming query results).

Direct query() overloads

All take a SQL string with optional named parameters (:paramName). The full variant is:

  • query(namedQuery, rowMapper, params, metricId) — Core method. Others delegate to this.

Convenience overloads (same behaviour, fewer arguments):

MethodParametersMetric
query(namedQuery, clazz)nonenull
query(namedQuery, rowMapper)nonenull
query(namedQuery, rowMapper, metricId)noneset
query(namedQuery, clazz, params)setnull
query(namedQuery, rowMapper, params)setnull
  • namedQuery: SQL SELECT with optional placeholders (e.g. :status, :id).
  • clazz: Class<RESULT> for automatic row mapping (Java-friendly). From Kotlin, the reified extension is usually simpler.
  • rowMapper: ResultSetMapper<RESULT> for custom mapping.
  • params: Map<String, Any?> or null. Keys match placeholder names (without :).
  • metricId: Optional string for observability; null means the SQL is typically used as the metric id.

Returns: ExecutableQuery<RESULT>.

Kotlin reified extension

connection.query<RESULT>(namedQuery, params, metricId)

Kotlin-only extension (reified RESULT) that uses automatic mapping via ResultSetMapperFactory.forClass(). Defaults: params = null, metricId = null. Use this when you want a single call and type inference, e.g. connection.query<User>("SELECT * FROM users WHERE id = :id", mapOf("id" to id)).

Query builder

queryBuilder(sql, rowMapper): QueryBuilder<RESULT> queryBuilder(sql, clazz): QueryBuilder<RESULT>

Creates a fluent builder for the same query. SQL and mapping (class or row mapper) are fixed at construction. Then:

  • param(key, value) — Set one parameter; can be chained.
  • params(params) — Set all parameters at once (replaces any previous).
  • metricId(metricId) — Set metric id.
  • build(): ExecutableQuery<RESULT> — Build the executable query (no execution yet).

Use when you prefer fluent configuration or need to pass a pre-built query around before consuming it.


Consuming query results (ExecutableQuery)

An ExecutableQuery<RESULT> is lazy: the database is hit only when you call a terminating operation. You can chain non-terminating operations first; they are applied when the query runs.

Terminating operations (execute the query; all are blocking except consumption of asFlow()):

MethodReturnsNotes
toList()List<RESULT>All rows in memory.
first()RESULTThrows if empty.
firstOrNull()RESULT?null if empty.
last()RESULTConsumes all rows.
collect(action)Calls action for each row.
fold(initial, fn)OUTPUTSingle value from all rows.
reduce(accumulator)RESULTSingle value; throws if empty.
toCollection(destination)same collectionFills the given collection.
count()IntNumber of rows.
sorted(comparator)List<RESULT>In-memory sort after load.
distinct()List<RESULT>In-memory distinct.
anyMatch(predicate)BooleanShort-circuits when found.
allMatch(predicate)BooleanShort-circuits when false.
noneMatch(predicate)BooleanShort-circuits when true.
max(comparator)RESULT?null if empty.
min(comparator)RESULT?null if empty.

All of the above are annotated @Blocking where applicable: they block the calling thread until the query completes. They work the same whether the connection is blocking or async; with an async connection, the actual I/O may run on another dispatcher, but the terminating call still blocks until the result is ready.

Streaming (non-blocking consumption in coroutines):

When using one of these operations, avoid making nested database calls from while streaming data. Collect, then query to avoid deadlocks.

  • asFlow(): Flow<RESULT> — Cold Flow; each collection runs the query. Use inside a coroutine (e.g. query.asFlow().collect { } or query.asFlow().toList()). Good for backpressure and processing rows as they arrive.
  • asFlowable(): Flowable<RESULT> — RxJava3; cold. Wraps asFlow().asFlowable().
  • asSingle(): Single<RESULT> — First element or error (RxJava3).
  • asMaybe(): Maybe<RESULT> — First element or empty (RxJava3).

Non-terminating operations (lazy; no execution):

  • take(n) — Limit to first n rows.
  • takeWhile(predicate) — Take while predicate is true.
  • map(transform) — Transform each row to another type.
  • filter(predicate) — Keep only rows matching predicate.
  • onEach(function) — Side effect per row (e.g. logging) before downstream.

Example: connection.query<User>(...).filter { it.active }.take(10).toList() runs the query once and then applies filter and take in memory. Use take(n) before a terminating call to limit how many rows are returned after mapping/filtering.

Property: sql: String — The original SQL of the query.


Execute (DDL / no row count)

Use for statements that do not return a result set or row count: DDL (CREATE INDEX, CREATE TABLE, etc.), stored procedures, and similar. Return type is SqlExecutionOutcome (success/failure, no row count).

Blocking (SqlDatabaseConnection)

  • execute(namedQuery) — No params, no metricId.
  • execute(namedQuery, metricId) — No params.
  • execute(namedQuery, params) — No metricId.
  • execute(namedQuery, params, metricId) — Full variant.

All block the calling thread until the statement completes.

Suspending (SqlAsyncDatabaseConnection)

Same four overloads as suspend fun execute(...). Call from a coroutine; they do not block the thread and typically run on a DB dispatcher.

Builder

executeBuilder(sql) — Returns a builder (shared by both connection types). Then:

  • param(key, value) / params(params) — Bind parameters.
  • metricId(metricId) — Set metric id.
  • execute(): SqlExecutionOutcome — Suspending; use from coroutines.
  • blockingExecute(): SqlExecutionOutcome — Blocking; use from Java or non-suspending code.

Update (DML / row count)

Use for INSERT, UPDATE, DELETE. Return type is Int (number of rows affected).

Blocking (SqlDatabaseConnection)

  • update(namedQuery) — No params, no metricId.
  • update(namedQuery, metricId) — No params.
  • update(namedQuery, params) — No metricId.
  • update(namedQuery, params, metricId) — Full variant.

All block the calling thread until the statement completes.

Suspending (SqlAsyncDatabaseConnection)

Same four overloads as suspend fun update(...). Call from a coroutine; they do not block the thread.

Builder

updateBuilder(sql) — Returns a builder (shared by both connection types). Then:

  • param(key, value) / params(params) — Bind parameters.
  • metricId(metricId) — Set metric id.
  • execute(): Int — Suspending.
  • blockingExecute(): Int — Blocking.

Prepared statements (reusable, generated keys)

Prepared statements are created once and can be executed multiple times with different parameters. They also support returning generated keys (e.g. auto-increment IDs) after INSERT.

Blocking: createPreparedStatement (SqlDatabaseConnection)

Overloads:

  • createPreparedStatement(namedQuery) — No generated keys, no metricId.
  • createPreparedStatement(namedQuery, returnGeneratedKeys) — Boolean for “return all generated keys”.
  • createPreparedStatement(namedQuery, metricId) — No generated keys.
  • createPreparedStatement(namedQuery, returnGeneratedKeys, metricId) — Full.
  • createPreparedStatement(namedQuery, generatedFields) — Return specific columns (e.g. listOf("id", "created_at")); some DBs require this.
  • createPreparedStatement(namedQuery, generatedFields, metricId) — With metricId.

All block during statement preparation. Return SqlDatabasePreparedStatement.

Using the statement:

  • setParameters(params: Map<String, Any?>?) — Bind parameters (or clear if null).
  • clearParameters() — Clear bound parameters.
  • execute(): SqlExecutionOutcome — For statements that don’t return a row count.
  • executeUpdate(): Int — For INSERT/UPDATE/DELETE; returns rows affected.
  • executeQuery(rowMapper): SqlResultSet<RESULT> — For SELECT; returns a result set (blocking). Overload with closeStatementOnClose controls whether the statement is closed when the result set is closed.
  • executeAsFlow(rowMapper): Flow<RESULT> — Same as above but as a Flow (for use in coroutines).
  • getGeneratedKeys(rowMapper): SqlResultSet<RESULT> — After an INSERT with generated keys, returns the generated key row(s) mapped with the given mapper. Consume via SqlResultSet (e.g. .first(), .toList()); it extends Sequence<RESULT>.

Resource lifecycle: The statement implements AutoCloseable. Use try-with-resources (Java) or statement.use { } (Kotlin) so it is closed when done.

Suspending: buildStatement (SqlAsyncDatabaseConnection)

Same overloads as above, but as suspend fun buildStatement(...). Creation is non-blocking; usage of the returned SqlDatabasePreparedStatement is the same (e.g. setParameters, executeUpdate(), getGeneratedKeys(rowMapper)). The statement’s execute/query methods are still blocking; only creation is suspending.

Builder: preparedStatement(sql) (shared)

preparedStatement(sql) — Returns PreparedStatementBuilder (same for both connection types). Then:

  • returnGeneratedKeys() — Return all generated keys.
  • generatedFields(fields) — Return specific column names.
  • metricId(metricId) — Set metric id.
  • build(): SqlDatabasePreparedStatement — Suspending; use from coroutines.
  • blockingBuild(): SqlDatabasePreparedStatement — Blocking; use from Java or non-suspending code.

After build() or blockingBuild(), use the statement as described under Blocking: createPreparedStatement (setParameters, executeUpdate, getGeneratedKeys, etc.).


Converting between connection types

asAsyncConnection(): SqlAsyncDatabaseConnection (on SqlDatabaseConnection)

Returns an async view of the same underlying connection. Use when:

  • You have a blocking connection (e.g. from a Java or legacy API) and need to call suspending database APIs.
  • You want to avoid blocking a thread by wrapping once and then using suspending execute/update/buildStatement.

The returned connection is not thread-safe; use it from a single coroutine/thread or synchronize externally. Closing one view does not necessarily close the other; refer to implementation/docs for close behaviour.

asSyncConnection(): SqlDatabaseConnection (on SqlAsyncDatabaseConnection)

Returns a blocking view of the same underlying connection. Use when:

  • You have an async connection but must call code that expects a blocking API (e.g. Java libraries).
  • You need to run database calls from a non-coroutine context and are okay blocking.

Same thread-safety and lifecycle notes as above; the blocking view’s methods will block the caller until the operation completes.


Deprecated methods (e.g. buildStatement on blocking connection) are not documented here.