feat(connectors): add generic JDBC source and sink connectors (#2500)#3576
feat(connectors): add generic JDBC source and sink connectors (#2500)#3576shbhmrzd wants to merge 1 commit into
Conversation
…#2500) Adds JDBC source and sink connectors that let Iggy read from and write to any JDBC-compliant database (PostgreSQL, MySQL, Oracle, SQL Server, H2) via an embedded JVM, instead of a bespoke client per database. Source: polls a SQL query and produces each row as a JSON message; supports bulk and incremental (offset-tracked) modes with persisted offset state. Sink: consumes messages and INSERTs each into a target table, with optional auto-create-table and Iggy metadata columns. Both share one embedded-JVM/JNI bridge so a single JDBC driver JAR works for either direction. Includes per-connector docs and example configs, unit tests, Dockerized Postgres integration tests, and a small TCP listener fix so the connectors test harness can read the written runtime config.
|
Thanks for the PR. It is labeled Slash commands (own line, regular comment) move it around the queue:
See CONTRIBUTING.md for details. |
|
agreed @Standing-Man. question @shbhmrzd - how much time time does it take for your tests to finish, on your machine? only the ones that you added. |
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## master #3576 +/- ##
============================================
- Coverage 74.07% 74.03% -0.04%
Complexity 937 937
============================================
Files 1249 1249
Lines 128248 128246 -2
Branches 104116 104159 +43
============================================
- Hits 94994 94952 -42
+ Misses 30219 30214 -5
- Partials 3035 3080 +45
🚀 New features to boost your workflow:
|
Hey @Standing-Man @hubcio For the tests, the first run on a fresh machine is a bit longer because it does a one-time pull of the Postgres image and a ~1MB JDBC driver download, but the subsequent runs finish in under a minute. |
Which issue does this PR address?
Closes #2500
Rationale
Iggy only shipped PostgreSQL-specific connectors, so every other database needed a purpose-built connector. A generic JDBC pair covers any JDBC-compliant database (PostgreSQL, MySQL, Oracle, SQL Server, H2, and others) through their existing JDBC drivers, instead of writing and maintaining a new connector per database.
What changed?
Before: reading from or writing to anything other than PostgreSQL required a bespoke connector. After: two new connectors talk to any JDBC database through an embedded JVM over JNI, reusing the standard
java.sqlAPI and the database's own driver JAR.The source polls a configured SQL query and produces each row as a JSON message, in bulk (full re-read) or incremental mode (tracks a
{last_offset}placeholder and persists the offset across restarts). The sink consumes messages and INSERTs each into a target table, with optional auto-create-table and Iggy metadata columns. Both share one embedded-JVM/JNI bridge, so a single driver JAR serves either direction.How to review
core/connectors/sources/jdbc_source/src/lib.rspoll()thenbuild_query(offset substitution viaquote_sql_literal) thenread_rows/extract_column_value(JDBC to JSON type mapping) thenbuild_message(offset state persistence)core/connectors/sinks/jdbc_sink/src/lib.rsopen()(JVM, connectivitySELECT 1, optional auto-create-table) thenconsume()thenwrite_messages/insert_batch(batchedPreparedStatement) thenbind_rowget_or_create_jvm, connection acquire/close in both cratespush_local_frame/pop_local_frame, pooled-connection return,isValidreconnect in the sourceextract_column_valueNUMERICandDECIMALmap to string (no f64 precision loss), binary maps to base64,BIGINTcaveat documentedis_transient_sql_stateand the classify helpers in both crates08,40,53,57,58are transient (retried), everything else is permanent. The sink skips and counts permanent batches, retries transient onesDebugimpls, the secret serde modules,sanitize_jdbc_urlSecretString, never logged or serializedcore/integration/tests/connectors/mod.rssetup_runtime, plussend_messages/get_messageshelpers used by the sink and source testscore/server/src/tcp/tcp_listener.rs(3 lines)PortReserver) and waits for the server to writecurrent_config.tomlto learn the bound address. The server previously wrote that file only when the port was dynamic (port == 0), so a fixed-port server hung. The fix always notifies the config writer once the listener binds. TheSO_REUSEPORTcross-shard broadcast stays gated onport == 0, so multi-shard behaviour is unchanged. Worth a check for cluster mode.Config reference and per-database examples live in each connector's
README.md,config.toml, and thecore/connectors/runtime/example_config/connectors/jdbc_*.tomlfiles.Local Execution
cargo fmt --all -- --check,cargo clippy --all-targets --all-features -- -D warnings,cargo check --all --all-features,cargo sort --check,cargo machete, license headers (hawkeye check), andmarkdownlint.iggy-serverplus separate source and sink connector runtimes against a Postgres container and pushed 150 messages through source then iggy then sink, confirming all 150 rows arrived intact in the destination table with contiguous offsets and exact decimal values.prekwas not run (not installed locally); the equivalent checks listed above (fmt, clippy, sort, machete, license headers, markdownlint, tests) were run manually instead.Limitations
AI Usage