JSON not being copied on incremental sync

Environment

  • Airbyte version: 0.40.6
  • OS Version / Instance: Amazon Linux 2 / AWS EC2
  • Deployment: Docker
  • Source Connector and version: Postgres 1.0.7
  • Destination Connector and version: Snowflake 0.4.35
  • Step where error happened: After initial sync

Current Behavior

We have a column that contains JSON data but is of type text in Postgres. We’re using the wal2json plugin for logical replication.

On the INITIAL sync, colA (the JSON column) is captured and loaded into Snowflake without issue. However, on subsequent incremental syncs this column arrives as NULL. All other columns are captured without issue. Should we switch to pgoutput, even though the documentation recommends wal2json when large JSON is involved?

-[ RECORD 1 ]----------------------------------------------------------------
id   | 210000000
colA | {"x":"","approvalInfo":{"y":"FULL","amount":"0","tAmount":"0","tBaseAmount":"0"},"response":"FAIL","duration":"177","funded":"true","RequestSent":"true","balances":{"x":"0","y":"0","z":"0","limit":"0","auth":"0","promo":"0"},"match":"NOT_PRESENT","duration":"123","lowrisk":"","securityLevel":"","ucf":"","collection":"NOT_PRESENT"}

Table definition:
                             Table "public.sometable"
         Column          |            Type             | Collation | Nullable | Default
-------------------------+-----------------------------+-----------+----------+---------
 id                      | bigint                      |           | not null |
 colA                    | text                        |           |          |

Steps to Reproduce

  1. Perform an initial sync.
  2. Insert and then update rows in the table (a minimal SQL sketch follows the list).
  3. Run a CDC incremental sync and observe that the column arrives as NULL.
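
To make the pattern concrete, here is a minimal SQL sketch of the sequence that triggers the problem. The table matches the definition above; the values are illustrative, not our production data, and the real JSON payload is much larger.

  -- table shape, matching public.sometable above
  CREATE TABLE public.sometable (
      id   bigint NOT NULL PRIMARY KEY,
      colA text
  );

  -- 1. initial insert: this change IS captured by the incremental sync
  INSERT INTO public.sometable (id, colA)
  VALUES (210000000, '{"response":"PENDING"}');

  -- 2. update shortly afterwards, writing a large JSON string into colA:
  --    this is the change that shows up as NULL in Snowflake
  UPDATE public.sometable
  SET colA = '{"x":"","response":"FAIL","duration":"177"}'
  WHERE id = 210000000;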


@rocky, this sounds less like an output plugin issue and more like a replica identity issue.

When you say changes above, are the changes to colA or to another column in the table? Does the table have a primary key or an explicit replica identity configured?
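
If it helps, this is a quick way to check from psql (a sketch; 'public.sometable' taken from your post):

  -- replica identity: 'd' = default (primary key), 'f' = full,
  -- 'n' = nothing, 'i' = index
  SELECT relreplident
  FROM pg_class
  WHERE oid = 'public.sometable'::regclass;

  -- the table's primary key constraint, if one exists
  SELECT conname
  FROM pg_constraint
  WHERE conrelid = 'public.sometable'::regclass
    AND contype = 'p';

An explicit replica identity can, if needed, be set with ALTER TABLE public.sometable REPLICA IDENTITY FULL; which makes UPDATE and DELETE events carry previous values for all columns rather than just the PK.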

The table is using the default replica identity, which is its primary key. We have other tables being synced the same way without issue, but this is a larger table.

@adam: I switched to the pgoutput plugin; the problem still exists.

“sometable” has a primary key and the default replica identity.

Let me know what additional information you need from us at this time.

2022-10-03 15:09:00 source > REPLICA IDENTITY for 'public.sometable' is 'DEFAULT'; UPDATE and DELETE events will contain previous values only for PK columns
2022-10-03 15:09:00 source > Searching for WAL resume position
2022-10-03 15:09:00 source > First LSN 'LSN{17E/92DBD370}' received
2022-10-03 15:09:00 source > LSN after last stored change LSN 'LSN{17E/92DBE0D0}' received
2022-10-03 15:09:00 source > WAL resume position 'LSN{17E/92DBE0D0}' discovered
2022-10-03 15:09:00 source > Connection gracefully closed
2022-10-03 15:09:00 source > Connection gracefully closed
2022-10-03 15:09:00 source > Initializing PgOutput logical decoder publication
2022-10-03 15:09:00 source > Requested thread factory for connector PostgresConnector, id = journal_processor named = keep-alive
2022-10-03 15:09:00 source > Creating thread debezium-postgresconnector-journal_processor-keep-alive
2022-10-03 15:09:00 source > Processing messages
2022-10-03 15:09:00 source > Streaming requested from LSN LSN{17E/92DBDC70}, received LSN LSN{17E/92DBD370} identified as already processed
2022-10-03 15:09:00 source > Streaming requested from LSN LSN{17E/92DBDC70}, received LSN LSN{0/0} identified as already processed
2022-10-03 15:09:00 source > Streaming requested from LSN LSN{17E/92DBDC70}, received LSN LSN{17E/92DBD370} identified as already processed
2022-10-03 15:09:00 source > Streaming requested from LSN LSN{17E/92DBDC70}, received LSN LSN{17E/92DBDC70} identified as already processed

Note: debugging this further, the initial INSERT is captured by Airbyte, but the subsequent UPDATE, which modifies the value in colA, is not.

Hey, thanks for providing so much info. It’s interesting that the behaviour differs on the initial sync, because that initialization is in effect a full refresh sync; only after initialization does data come from the Postgres WAL.

Could you check that all tables you want synced have been added to the Airbyte publication? This should display all tables added to all configured publications: select * from pg_publication_tables;
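
For example (a sketch; 'airbyte_publication' is a placeholder for whatever publication name your connection is configured with):

  -- every table in every configured publication
  SELECT * FROM pg_publication_tables;

  -- or scoped to the publication this connection uses
  SELECT schemaname, tablename
  FROM pg_publication_tables
  WHERE pubname = 'airbyte_publication';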

Yes, all tables are in the right Airbyte publication.

Note: the table does have multiple publications on it, but this connection is using its own (and a unique replication slot as well).
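
For context, the dedicated setup looks roughly like this (names are placeholders, not our actual publication/slot names):

  -- a publication used only by this Airbyte connection
  CREATE PUBLICATION airbyte_publication FOR TABLE public.sometable;

  -- a replication slot used only by this connection
  SELECT pg_create_logical_replication_slot('airbyte_slot', 'pgoutput');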

A few other things of interest:

  • one of the columns is defined as text in Postgres but stores JSON data
  • INSERTs after the initial sync are captured in the destination correctly, but UPDATEs to the row are missed; the update usually contains a large JSON value stored as text in that column
  • the insert and update happen very close together in time
  • I peeked at the logical replication slot and can see both the insert and the update (sketch below)
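
This is roughly how I peeked at the slot without consuming its changes ('airbyte_slot' is a placeholder for our slot name; this form works for a wal2json slot, while a pgoutput slot emits binary data and needs pg_logical_slot_peek_binary_changes instead):

  -- peek at pending changes without advancing the slot
  SELECT lsn, xid, data
  FROM pg_logical_slot_peek_changes('airbyte_slot', NULL, NULL,
                                    'pretty-print', '1');

Both the INSERT and the UPDATE for the row are visible in the output.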

@sh4sh: any update here? We’re using this system in a production environment, and this is causing issues for an important table.

Hello, thanks for your replies. Would you be able to try a newer connector version? There have been some recent changes to the Postgres connector; I’m not sure if you’re still on 1.0.7, but the latest version is 1.0.13: https://docs.airbyte.com/integrations/sources/postgres#changelog

Upgrading now, will run a test and let you know

Upgrading to the latest Postgres connector didn’t fix the issue. Please let me know the next steps, @sh4sh!

Thanks for giving the latest connector a try.

To confirm: the incremental syncs are marked as successful in Airbyte despite the data not being copied from colA?

Could you share the full logs from when this issue occurred, especially the incremental sync, and possibly the first full sync as well?
I did find a very similar issue, but unfortunately the person who reported it closed it without posting their resolution: https://github.com/airbytehq/airbyte/issues/12844

Yes, confirmed: the initial sync and incremental syncs are successful. Incremental syncs contain the rows for INSERTs, but not UPDATEs to the record.

Logs sent privately over Slack

I wonder if this is related to the JDBC fix outlined in Version 3.13.21, i.e.: “Fixed an issue with missing data in the JDBC chunk downloader.”

Notes here → Snowflake Community

Hey, just to provide an update: I’ve been working on reproducing this. So far I have set up a local Postgres 14 with wal2json and added some dummy data, and it seemed to sync OK, including updates and deletes. Maybe my JSON is too small? I’ll keep trying, and I’ve raised the issue with my team in case they have any input.

If you are using normalization, did you check the Airbyte raw tables to see if they contain the column? Also, if you could test your data with a different destination (maybe Local JSON), that would be very helpful for narrowing down the scope of the issue.

Hey Rocky, I see here that the JDBC fix was added into the Snowflake connector 0.4.37 release, could you give that one a try if you haven’t yet? https://github.com/airbytehq/airbyte/pull/16839

I’ve created a Github issue to track this: https://github.com/airbytehq/airbyte/issues/17955

I have also raised a ticket with our on call engineers, I’ll let you know when there are updates.

We have upgraded to Airbyte 0.40.13, using Postgres source connector 1.0.14 and Snowflake destination connector 0.4.38. However, we’re still experiencing the problem.


Sunny, thanks for the recommendation to check the raw table on the Snowflake side. I was focusing primarily on the Postgres side, which doesn’t appear to be the issue.

The first row is the insert; the second row is the update.

>select _airbyte_emitted_at, _airbyte_data:transactionid from _AIRBYTE_RAW_TRANSACTIONLOG where _airbyte_data:id = 220945937 order by _airbyte_emitted_at;
+-------------------------------+-----------------------------+
| _AIRBYTE_EMITTED_AT           | _AIRBYTE_DATA:TRANSACTIONID |
|-------------------------------+-----------------------------|
| 2022-10-13 19:14:13.075 +0000 | null                        |
| 2022-10-13 19:14:13.075 +0000 | 116771931                   |
+-------------------------------+-----------------------------+

Notice the update isn’t reflected:

>select _airbyte_emitted_at, transactionid from transactionlog where id = 220945937;
+-------------------------------+---------------+
| _AIRBYTE_EMITTED_AT           | TRANSACTIONID |
|-------------------------------+---------------|
| 2022-10-13 19:14:13.075 +0000 |          NULL |
+-------------------------------+---------------+