Missing Deletions on Postgres CDC

  • Is this your first time deploying Airbyte?: No
  • OS Version / Instance: Ubuntu 18.04.4 LTS
  • Memory / Disk: 16 GB / 100 GB
  • Deployment: Docker
  • Airbyte Version: 0.42.0
  • Source name/version: Postgres
  • Destination name/version: Snowflake
  • Step: During Sync
  • Description: Missing Deletions on CDC

Source Characteristics

  • Postgres Connector version: 1.0.51
  • Database Version: Aurora 12.04
  • replication_method: CDC
  • replication_plugin: pgoutput
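
As a sanity check for this pgoutput setup, one can confirm on the source that the slot exists and that the synced table is included in the publication the slot decodes; the slot and publication names below are hypothetical and depend on how the connector was configured:

  -- The logical slot should exist, use pgoutput, and be active during a sync
  SELECT slot_name, plugin, active
  FROM pg_replication_slots
  WHERE slot_name = 'airbyte_slot';

  -- The synced table should be listed in the publication
  SELECT *
  FROM pg_publication_tables
  WHERE pubname = 'airbyte_publication';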

Destination Characteristics

  • Snowflake Destination version: 0.4.53

Connection Characteristics

  • sync_mode: Incremental Deduped
  • streams: 1 Table
  • Replication frequency:
    • 1st set of attempts: every 6 hours
    • 2nd set of attempts: every 3 hours

Source Table Characteristics

  • Change volume between syncs:
    • 6-hour syncs → 1.5 GB to 2.5 GB
    • 3-hour syncs → 500 MB to 1.5 GB
  • Total volume: 5.5 GB

Problem Definition
We have detected that the table we are syncing with the above-described setup and characteristics is missing some deletions.

This table receives plenty of inserts, updates, and deletions every hour, which is why the change volume of each sync can grow that much. One of our downstream dbt transformations that uses the deduped table failed some uniqueness tests over a combined unique ID.
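
The failing check is equivalent to a uniqueness test like the following (table and column names are hypothetical placeholders):

  -- Combined-key duplicates in the deduped table; dbt's unique test
  -- on the combined id fails as soon as this returns any rows
  SELECT id_part_1, id_part_2, COUNT(*) AS duplicate_count
  FROM analytics.my_deduped_table
  GROUP BY id_part_1, id_part_2
  HAVING COUNT(*) > 1;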

After comparing the deduped table in Snowflake with the source table in Postgres, we identified that the former contained extra rows that no longer exist in the latter. Furthermore, we double-checked against the SCD table, and there was no tracked deletion on it either.
This behavior became more pronounced in the later syncs.
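
For reference, both checks can be expressed roughly as follows, assuming a snapshot of the source primary keys has been staged into Snowflake; all table and column names here are hypothetical placeholders:

  -- Rows still present in the deduped table but gone from the source
  SELECT d.id
  FROM analytics.my_deduped_table d
  LEFT JOIN analytics.source_pk_snapshot s ON s.id = d.id
  WHERE s.id IS NULL;

  -- Deletions tracked by CDC should appear in the SCD table with a
  -- non-null _ab_cdc_deleted_at; for the missing rows they never do
  SELECT id, _ab_cdc_deleted_at
  FROM analytics.my_deduped_table_scd
  WHERE _ab_cdc_deleted_at IS NOT NULL;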

We want to understand whether this is expected behavior due to the heavy load of every sync, or whether the replication slot is failing to retrieve some of the changes.
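
Regarding the second hypothesis, one observable signal is whether the slot's confirmed position keeps up with the WAL between syncs. A minimal sketch for measuring this on the source, assuming the slot created for Airbyte is named airbyte_slot (the actual name depends on the connector configuration):

  -- How far the slot's confirmed position lags behind the current WAL position
  SELECT slot_name,
         pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes,
         pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS retained_wal_bytes
  FROM pg_replication_slots
  WHERE slot_name = 'airbyte_slot';

If lag_bytes drops back to near zero after every sync while deletions still go missing, the changes were consumed from the slot and lost further downstream, rather than never retrieved from it.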

Extra Hints

  • We have faced this issue with the following Postgres connector versions:
    • v1.0.23
    • v1.0.51
  • We also tried the new Postgres connector version 2.0.3, but CDC was not working, so we rolled back to 1.0.51.
  • After changing the sync frequency from 6 hours to 3, we started facing the same problem much faster than before.
  • The only error we are getting back, and it does not interrupt the sync process, is the following (see the note after the stack trace):
ERROR i.d.r.TableSchemaBuilder(lambda$createValueGenerator$5):302 Failed to properly convert data value for 'column_1' of type _varchar for row [masked_value, masked_value,masked_value, masked_value, java.lang.Object@21ebb339, masked_value, 0.0000, masked_value, masked_value, masked_value, null, null, masked_value, masked_value, masked_value, masked_value, masked_value, masked_value, masked_value, masked_value, masked_value, masked_value, masked_value, masked_value, masked_value, masked_value, masked_value, masked_value, java.lang.Object@21ebb339, masked_value, masked_value, masked_value, masked_value, masked_value, masked_value, masked_value, masked_value, masked_value]: org.apache.kafka.connect.errors.DataException: Invalid Java object for schema with type ARRAY: class java.lang.String for field: "column_1"
	at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:242) ~[connect-api-3.2.0.jar:?]
	at org.apache.kafka.connect.data.Struct.put(Struct.java:216) ~[connect-api-3.2.0.jar:?]
	at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$5(TableSchemaBuilder.java:298) ~[debezium-core-1.9.6.Final.jar:1.9.6.Final]
	at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:141) ~[debezium-core-1.9.6.Final.jar:1.9.6.Final]
	at io.debezium.relational.RelationalChangeRecordEmitter.emitUpdateRecord(RelationalChangeRecordEmitter.java:102) ~[debezium-core-1.9.6.Final.jar:1.9.6.Final]
	at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:53) ~[debezium-core-1.9.6.Final.jar:1.9.6.Final]
	at io.debezium.connector.postgresql.PostgresChangeRecordEmitter.emitChangeRecords(PostgresChangeRecordEmitter.java:96) ~[debezium-connector-postgres-1.9.6.Final.jar:1.9.6.Final]
	at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:209) ~[debezium-core-1.9.6.Final.jar:1.9.6.Final]
	at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.lambda$processMessages$0(PostgresStreamingChangeEventSource.java:275) ~[debezium-connector-postgres-1.9.6.Final.jar:1.9.6.Final]
	at io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder.decodeUpdate(PgOutputMessageDecoder.java:465) ~[debezium-connector-postgres-1.9.6.Final.jar:1.9.6.Final]
	at io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder.processNotEmptyMessage(PgOutputMessageDecoder.java:194) ~[debezium-connector-postgres-1.9.6.Final.jar:1.9.6.Final]
	at io.debezium.connector.postgresql.connection.AbstractMessageDecoder.processMessage(AbstractMessageDecoder.java:33) ~[debezium-connector-postgres-1.9.6.Final.jar:1.9.6.Final]
	at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.deserializeMessages(PostgresReplicationConnection.java:510) ~[debezium-connector-postgres-1.9.6.Final.jar:1.9.6.Final]
	at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.readPending(PostgresReplicationConnection.java:502) ~[debezium-connector-postgres-1.9.6.Final.jar:1.9.6.Final]
	at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.processMessages(PostgresStreamingChangeEventSource.java:215) ~[debezium-connector-postgres-1.9.6.Final.jar:1.9.6.Final]
	at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:177) ~[debezium-connector-postgres-1.9.6.Final.jar:1.9.6.Final]
	at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:41) ~[debezium-connector-postgres-1.9.6.Final.jar:1.9.6.Final]
	at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:174) ~[debezium-core-1.9.6.Final.jar:1.9.6.Final]
	at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:141) ~[debezium-core-1.9.6.Final.jar:1.9.6.Final]
	at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:109) ~[debezium-core-1.9.6.Final.jar:1.9.6.Final]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	at java.lang.Thread.run(Thread.java:833) ~[?:?]
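
In the log above, _varchar is Postgres's internal name for varchar[], so column_1 appears to be an array column that Debezium fails to convert on some update events; it is worth checking whether those events reach the destination at all. A quick way to list the array-typed columns of the affected table (table name hypothetical):

  -- Array-typed columns on the source table
  SELECT a.attname AS column_name,
         format_type(a.atttypid, a.atttypmod) AS data_type
  FROM pg_attribute a
  WHERE a.attrelid = 'public.my_table'::regclass
    AND a.attnum > 0
    AND NOT a.attisdropped
    AND format_type(a.atttypid, a.atttypmod) LIKE '%[]';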

Is there anyone who could help us?
We have faced the same issue with another connection as well.