- Is this your first time deploying Airbyte?: No
- OS Version / Instance: Ubuntu 18.04.4 LTS
- Memory / Disk: 16 GB / 100 GB
- Deployment: Docker
- Airbyte Version: 0.42.0
- Source name/version: Postgres
- Destination name/version: Snowflake
- Step: During Sync
- Description: Missing Deletions on CDC
Source Characteristics
- Postgres Connector version : 1.0.51
- Database Version : Aurora 12.04
- replication_method: CDC
- replication_plugin: pgoutput
Destination Characteristics
- Snowflake Destination version: 0.4.53
Connection Characteristics
- sync_mode: Incremental Deduped
- streams: 1 Table
- Replication frequency:
- 1st set of attempts: every 6 hours
- 2nd set of attempts: every 3 hours
Source Table Characteristics
- Change Volume between syncs:
- 6-hours-syncs → [1.5GB, 2.5GB]
- 3-hours-syncs → [500MB, 1.5GB]
- total volume: 5.5 GB
Problem Definition
We have detected that the table we are syncing with the setup described above is missing some deletions.
This table receives many inserts, updates, and deletions every hour, which is why the change volume per sync can grow so large. One of our downstream dbt transformations that uses the deduped table failed uniqueness tests on a composite unique ID.
After comparing the deduped table in Snowflake with the source table in Postgres, we found that the former contained extra rows that no longer exist in the latter. We also checked the SCD table, and no deletion was tracked there either.
This behavior became more frequent in later syncs.
We want to understand whether this is expected behavior due to the heavy load of each sync, or whether the replication slot is failing to retrieve some of the changes.
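For anyone trying to reproduce the check, the comparison described above can be sketched roughly as follows. This is only an illustration, assuming the composite unique ID has already been exported from both sides as tuples; the function name `find_missed_deletions` and the sample keys are hypothetical and not part of our actual setup.

```python
from typing import Iterable, Set, Tuple

# A composite unique ID, represented as a tuple of column values.
Key = Tuple[object, ...]


def find_missed_deletions(
    source_keys: Iterable[Key], destination_keys: Iterable[Key]
) -> Set[Key]:
    """Return keys still present in the destination but gone from the source.

    These are candidate rows whose DELETE was never applied downstream.
    """
    return set(destination_keys) - set(source_keys)


if __name__ == "__main__":
    # Hypothetical sample data: row (3, "c") was deleted in Postgres
    # but still exists in the deduped Snowflake table.
    source = [(1, "a"), (2, "b")]
    destination = [(1, "a"), (2, "b"), (3, "c")]
    print(sorted(find_missed_deletions(source, destination)))
```

Running the same difference against the SCD table's keys would show whether a deletion record was ever written for the stale rows.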
Extra Hints
- We have faced this issue with the following Postgres Connector versions:
- v1.0.23
- v1.0.51
- We also tried the newer Postgres Connector version 2.0.3, but CDC was not working, so we rolled back to 1.0.51.
- After changing the sync frequency from 6 hours to 3 hours, we started facing the same problem much sooner than before.
- The only error we get, which does not interrupt the sync process, is:
ERROR i.d.r.TableSchemaBuilder(lambda$createValueGenerator$5):302 Failed to properly convert data value for 'column_1' of type _varchar for row [masked_value, masked_value,masked_value, masked_value, java.lang.Object@21ebb339, masked_value, 0.0000, masked_value, masked_value, masked_value, null, null, masked_value, masked_value, masked_value, masked_value, masked_value, masked_value, masked_value, masked_value, masked_value, masked_value, masked_value, masked_value, masked_value, masked_value, masked_value, masked_value, java.lang.Object@21ebb339, masked_value, masked_value, masked_value, masked_value, masked_value, masked_value, masked_value, masked_value, masked_value]: org.apache.kafka.connect.errors.DataException: Invalid Java object for schema with type ARRAY: class java.lang.String for field: "column_1"
at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:242) ~[connect-api-3.2.0.jar:?]
at org.apache.kafka.connect.data.Struct.put(Struct.java:216) ~[connect-api-3.2.0.jar:?]
at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$5(TableSchemaBuilder.java:298) ~[debezium-core-1.9.6.Final.jar:1.9.6.Final]
at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:141) ~[debezium-core-1.9.6.Final.jar:1.9.6.Final]
at io.debezium.relational.RelationalChangeRecordEmitter.emitUpdateRecord(RelationalChangeRecordEmitter.java:102) ~[debezium-core-1.9.6.Final.jar:1.9.6.Final]
at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:53) ~[debezium-core-1.9.6.Final.jar:1.9.6.Final]
at io.debezium.connector.postgresql.PostgresChangeRecordEmitter.emitChangeRecords(PostgresChangeRecordEmitter.java:96) ~[debezium-connector-postgres-1.9.6.Final.jar:1.9.6.Final]
at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:209) ~[debezium-core-1.9.6.Final.jar:1.9.6.Final]
at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.lambda$processMessages$0(PostgresStreamingChangeEventSource.java:275) ~[debezium-connector-postgres-1.9.6.Final.jar:1.9.6.Final]
at io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder.decodeUpdate(PgOutputMessageDecoder.java:465) ~[debezium-connector-postgres-1.9.6.Final.jar:1.9.6.Final]
at io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder.processNotEmptyMessage(PgOutputMessageDecoder.java:194) ~[debezium-connector-postgres-1.9.6.Final.jar:1.9.6.Final]
at io.debezium.connector.postgresql.connection.AbstractMessageDecoder.processMessage(AbstractMessageDecoder.java:33) ~[debezium-connector-postgres-1.9.6.Final.jar:1.9.6.Final]
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.deserializeMessages(PostgresReplicationConnection.java:510) ~[debezium-connector-postgres-1.9.6.Final.jar:1.9.6.Final]
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.readPending(PostgresReplicationConnection.java:502) ~[debezium-connector-postgres-1.9.6.Final.jar:1.9.6.Final]
at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.processMessages(PostgresStreamingChangeEventSource.java:215) ~[debezium-connector-postgres-1.9.6.Final.jar:1.9.6.Final]
at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:177) ~[debezium-connector-postgres-1.9.6.Final.jar:1.9.6.Final]
at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:41) ~[debezium-connector-postgres-1.9.6.Final.jar:1.9.6.Final]
at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:174) ~[debezium-core-1.9.6.Final.jar:1.9.6.Final]
at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:141) ~[debezium-core-1.9.6.Final.jar:1.9.6.Final]
at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:109) ~[debezium-core-1.9.6.Final.jar:1.9.6.Final]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
at java.lang.Thread.run(Thread.java:833) ~[?:?]
Stack Trace: org.apache.kafka.connect.errors.DataException: Invalid Java object for schema with type ARRAY: class java.lang.String for field: "column_1"
at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:242)
at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$5(TableSchemaBuilder.java:298)
at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:141)
at io.debezium.relational.RelationalChangeRecordEmitter.emitUpdateRecord(RelationalChangeRecordEmitter.java:102)
at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:53)
at io.debezium.connector.postgresql.PostgresChangeRecordEmitter.emitChangeRecords(PostgresChangeRecordEmitter.java:96)
at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:209)
at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.lambda$processMessages$0(PostgresStreamingChangeEventSource.java:275)
at io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder.decodeUpdate(PgOutputMessageDecoder.java:465)
at io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder.processNotEmptyMessage(PgOutputMessageDecoder.java:194)
at io.debezium.connector.postgresql.connection.AbstractMessageDecoder.processMessage(AbstractMessageDecoder.java:33)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.deserializeMessages(PostgresReplicationConnection.java:510)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.readPending(PostgresReplicationConnection.java:502)
at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.processMessages(PostgresStreamingChangeEventSource.java:215)
at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:177)
at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:41)
at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:174)
at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:141)
at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:109)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
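To see how often this conversion error occurs in a sync log and which columns it affects, something like the sketch below could be used. It assumes the log lines follow the same message format as the error above; the function name `count_conversion_errors` is hypothetical, and whether these errors relate to the missed deletions at all is not established.

```python
import re
from collections import Counter
from typing import Iterable

# Matches the Debezium message shown above and captures column name and type.
CONVERSION_ERROR = re.compile(
    r"Failed to properly convert data value for '([^']+)' of type (\S+)"
)


def count_conversion_errors(log_lines: Iterable[str]) -> Counter:
    """Count conversion errors per (column, type) pair found in the log lines."""
    counts: Counter = Counter()
    for line in log_lines:
        match = CONVERSION_ERROR.search(line)
        if match:
            counts[(match.group(1), match.group(2))] += 1
    return counts


if __name__ == "__main__":
    sample = [
        "ERROR i.d.r.TableSchemaBuilder(lambda$createValueGenerator$5):302 "
        "Failed to properly convert data value for 'column_1' of type _varchar "
        "for row [masked_value]",
        "INFO some unrelated log line",
    ]
    for (column, col_type), n in count_conversion_errors(sample).items():
        print(column, col_type, n)
```

Comparing the timestamps of these errors with the rows that were not deleted might help confirm or rule out a connection.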