Postgres CDC not working - no deleted at and incremental append seems to add all rows and not updates

  • Is this your first time deploying Airbyte?: Yes
  • OS Version / Instance: Ubuntu
  • Memory / Disk: 32GB
  • Deployment: OSS docker compose on EC2
  • Airbyte Version: 0.39.5
  • Source name/version: Postgres 0.4.28
  • Destination name/version: Postgres 0.3.20
  • Step: Sync
  • Description:

I have set up a test CDC connection between one postgres database and another to assess the CDC capabilities. I have created a test table in the source db and want to check replication to the destination db via logical replication for the incremental append mode.

The output looks incorrect and isn’t replicating deletes as I expect.
My expectation is that only updated rows should be added to the destination table, and deleted rows should have a deleted at value. This is based on the behaviour seen in the demo video: Change Data Capture for Postgres - YouTube

Instead I am seeing:

  • Appended rows are simply the ones which already existed in the source table but were unchanged between syncs.
  • There is no deleted at value for deleted rows, instead they’re simply not being included in the appended rows on syncing.
    It’s possible this is a bug, as I have seen several posts and git issues mentioning issues with deletes, however the overall behaviour (appending of unchanged rows) doesn’t even match what I’d expect to see for incremental append, so it’s possible I’ve just done something wrong.
    Removing normalisation and examining the raw json output has the same result.

This is the same for both pgoutput and wal2json replication slot plugins. I’ve tried recreating and resetting the sources, connectors and source tables and seen the same behaviours.

Source db.schema.table: airbyte_cdc_test_source_history.history.hist_table1
Destination db.schema.table: airbyte_cdc_test_destination.history.hist_table1

Source table:

create table history.hist_table1 (id INT primary key, column1 VARCHAR);
ALTER TABLE history.hist_table1 REPLICA IDENTITY default;
INSERT into history.hist_table1 values (1, 'val1');
INSERT into history.hist_table1 values (2, 'val2');
INSERT into history.hist_table1 values (3, 'val3');
INSERT into history.hist_table1 values (4, 'val4');

-- run sync

delete from history.hist_table1 where id = 4;
delete from history.hist_table1 where column1 = 'val3';

I expect to see in the destination the rows for 1 and 2 once, then two copies each of rows 3 and 4: one with the original values and another with a deleted at value. Instead I see the attached.
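The expectation above can be modelled with a small Python sketch. It is a simplified model of incremental append under CDC, not Airbyte's implementation. The `_ab_cdc_deleted_at` column name, the event shape, and the timestamps are assumptions made for illustration, as is the choice to have delete events carry only the primary key (which is what REPLICA IDENTITY DEFAULT records for the old row).

```python
def append_changes(destination, events):
    """Append one row per change event; existing rows are never updated or removed."""
    for event in events:
        row = dict(event["row"])
        # Only delete events get a deletion timestamp; everything else gets None.
        row["_ab_cdc_deleted_at"] = event["ts"] if event["op"] == "delete" else None
        destination.append(row)
    return destination


destination = []

# Sync 1: the initial snapshot emits the four existing rows.
snapshot = [
    {"op": "insert", "row": {"id": i, "column1": f"val{i}"}, "ts": "2022-06-30T15:00:00Z"}
    for i in range(1, 5)
]
append_changes(destination, snapshot)

# Sync 2: only the two deletes are read from the replication log.
# Assumption: with REPLICA IDENTITY DEFAULT a delete carries just the primary key.
deletes = [
    {"op": "delete", "row": {"id": 4}, "ts": "2022-06-30T15:20:00Z"},
    {"op": "delete", "row": {"id": 3}, "ts": "2022-06-30T15:20:01Z"},
]
append_changes(destination, deletes)

for row in destination:
    print(row)
```

Under these assumptions the destination ends up with six rows: ids 1 and 2 once each, and ids 3 and 4 twice each, with the second copy carrying a deletion timestamp.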

Is this a bug or am I doing something wrong?

Possible leads:

  • Initially the source table did not have a primary key
  • I had initially set up the connection using full refresh mode (incremental was unavailable because there was no primary key)

I have since changed both (recreated the source table with a primary key, and changed the replication mode to incremental append). I also deleted and recreated all the connections and tables, and reset the sync many times to check the update to the method had happened properly, but without success. It’s possible that the changes I made somewhere along the way are not getting made under the hood, and so airbyte is still using full refresh mode somehow even though the replication config is changed.

Some logs for your reading pleasure.
logs-1616.txt (63.2 KB)
logs-1615.txt (63.4 KB)

From the logs:
2022-06-30 15:25:33 source > 2022-06-30 15:25:33 INFO i.d.c.p.c.PostgresConnection(readReplicationSlotInfo):251 - Obtained valid replication slot ReplicationSlot [active=false, latestFlushedLsn=LSN{B4D/60EEA8C0}, catalogXmin=239486]
2022-06-30 15:25:33 source > 2022-06-30 15:25:33 INFO i.d.c.p.PostgresConnectorTask(start):117 - No previous offset found
2022-06-30 15:25:33 source > 2022-06-30 15:25:33 INFO i.d.c.p.s.InitialSnapshotter(shouldSnapshot):34 - Taking initial snapshot for new datasource

So it looks like it isn’t recognising that there’s an offset from the last run and is taking a full snapshot at each sync. That would explain why it grabs all rows instead of only the changed rows.
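To check several sync logs for the same symptom, a short Python sketch like the following may help. The two marker strings are copied from the log lines quoted in this post. The helper names and the colour-code pattern are assumptions for illustration; the pattern also tolerates pasted logs where the escape byte shows up as a literal "e".

```python
import re

# Colour codes such as ESC[32m, or the degraded "e[32m" form seen in pasted logs.
COLOUR_CODE = re.compile(r"(?:\x1b|e)\[[\d;]*m")

NO_OFFSET = "No previous offset found"
SNAPSHOT = "Taking initial snapshot"


def clean(line):
    """Remove colour-code residue from one log line."""
    return COLOUR_CODE.sub("", line)


def resnapshots(log_lines):
    """Return True if the log shows a missing offset followed by an initial snapshot."""
    cleaned = [clean(line) for line in log_lines]
    saw_no_offset = any(NO_OFFSET in line for line in cleaned)
    saw_snapshot = any(SNAPSHOT in line for line in cleaned)
    return saw_no_offset and saw_snapshot


sample = [
    "2022-06-30 15:25:33 e[44msourcee[0m > 2022-06-30 15:25:33 e[32mINFOe[m "
    "i.d.c.p.PostgresConnectorTask(start):117 - No previous offset found",
    "2022-06-30 15:25:33 e[44msourcee[0m > 2022-06-30 15:25:33 e[32mINFOe[m "
    "i.d.c.p.s.InitialSnapshotter(shouldSnapshot):34 - Taking initial snapshot for new datasource",
]
print(resnapshots(sample))
```

If this reports a snapshot on the second and later syncs of a connection, the offset from the previous run was not available to the connector.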

Any suggestions on why this might be happening or how to fix it?

Hi @kyle-mackenzie-indee,
Thank you for the thorough investigation and clear explanation.
It does look like Debezium is running a snapshot on every sync. I think this is due to the following error:

errors: $ is not defined in the schema and the schema does not allow additional properties, $ is not defined in the schema and the schema does not allow additional properties

Airbyte stores the Debezium offset in its state object, and this error occurred at the state checkpointing step, so the offset was probably not saved.
I think this error comes from an incompatibility between the Postgres source connector and your Airbyte version. The shared_state property was introduced recently: source-postgres 0.4.28 generates a state containing this new property, but state validation, which happens at the platform level, fails because Airbyte 0.39.5 does not support it. Could you please upgrade Airbyte to the latest version and try again?
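As a rough illustration of this kind of failure, here is a minimal Python sketch of strict validation that rejects properties the schema does not know about. It is not Airbyte's validator. The `data` key and the state contents are hypothetical, and only the `shared_state` name comes from the explanation above.

```python
def undefined_properties(state, allowed_properties):
    """Return one error per top-level key that a strict schema does not define."""
    return [
        f"{key} is not defined in the schema and the schema does not allow additional properties"
        for key in state
        if key not in allowed_properties
    ]


# Hypothetical: what an older platform version accepts at the top level.
older_platform_allowed = {"data"}

# Hypothetical state emitted by a newer connector, with the extra property.
newer_connector_state = {"data": {"offset": "..."}, "shared_state": {"offset": "..."}}

errors = undefined_properties(newer_connector_state, older_platform_allowed)
for error in errors:
    print(error)

# A state without the new property passes the same check.
print(undefined_properties({"data": {"offset": "..."}}, older_platform_allowed))
```

When validation fails like this the state is rejected as a whole, which would be consistent with the connector finding no previous offset on the next sync.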

I’m going to reach out to our platform team to make sure my intuition is correct.

I think your problem is related to this issue. If you are not able to upgrade Airbyte, please downgrade the source postgres connector to 0.4.26.

Working great now! Many thanks!

Using versions:
Airbyte v0.39.32-alpha
Postgres Source 0.4.30
Postgres Destination 0.3.20

I had some problems initially with airbyte crashing after upgrading, but I switched off all running connectors and it became stable. I’m guessing since those connectors were created on the older airbyte version there may be some mismatches there too. I’ll recreate them under the new version but that would be a separate issue anyway.
(Edit - recreating the connectors under the new version worked fine)
