Incremental sync failing for Postgres source connector on M1 Mac Docker deployment

  • Is this your first time deploying Airbyte?: Yes
  • OS Version / Instance: MacOS M1
  • Memory / Disk: 10 GB / 1 TB
  • Deployment: Docker
  • Airbyte Version: latest
  • Source name/version: Postgres - latest
  • Destination name/version: Postgres - latest
  • Step: Sync
  • Description:

I am evaluating Airbyte for a new DW. For a POC I built a Postgres-to-Postgres connection for 5 tables: local Docker deployment, basic normalization, and incremental with dedup history replication. The initial load works great, but every subsequent sync fails when it tries to make a second JDBC connection back to the source DB. It seems like a race condition, maybe because both DB instances are on localhost with different ports? It is not an intermittent problem. This is all on an M1 Mac, so I had to build everything from latest to run native ARM. On the source Postgres DB I set max_wal_senders to 1 because there is only one replication slot; increasing this value just causes a different connection error saying the replication slot is already in use.
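To check whether the single WAL sender was still occupied between syncs, the standard Postgres system views can be inspected (a sketch; run against the source DB):

```sql
-- Inspect logical replication slots: 'active' shows whether a walsender
-- process currently holds the slot, and 'active_pid' identifies it.
SELECT slot_name, plugin, active, active_pid
FROM pg_replication_slots;

-- Inspect connected WAL sender processes (counted against max_wal_senders).
SELECT pid, application_name, state
FROM pg_stat_replication;

-- The setting the error message refers to.
SHOW max_wal_senders;
```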
Here is the error from log:


2022-04-26 13:44:42 destination > 2022-04-26 13:44:42 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):126 - Preparing tmp table in destination started for stream django_celery_beat_crontabschedule. schema: public, tmp table name: _airbyte_tmp_khf_django_celery_beat_crontabschedule
2022-04-26 13:44:42 source > 2022-04-26 13:44:42 INFO i.d.c.p.PostgresSchema(printReplicaIdentityInfo):116 - REPLICA IDENTITY for 'public.django_celery_beat_periodictask' is 'DEFAULT'; UPDATE and DELETE events will contain previous values only for PK columns
2022-04-26 13:44:42 source > 2022-04-26 13:44:42 INFO i.d.c.p.PostgresSchema(printReplicaIdentityInfo):116 - REPLICA IDENTITY for 'public.django_celery_beat_crontabschedule' is 'DEFAULT'; UPDATE and DELETE events will contain previous values only for PK columns
2022-04-26 13:44:42 source > 2022-04-26 13:44:42 INFO i.d.c.p.PostgresSchema(printReplicaIdentityInfo):116 - REPLICA IDENTITY for 'public.django_celery_beat_intervalschedule' is 'DEFAULT'; UPDATE and DELETE events will contain previous values only for PK columns
2022-04-26 13:44:42 source > 2022-04-26 13:44:42 INFO i.d.c.p.PostgresSchema(printReplicaIdentityInfo):116 - REPLICA IDENTITY for 'public.django_celery_beat_clockedschedule' is 'DEFAULT'; UPDATE and DELETE events will contain previous values only for PK columns
2022-04-26 13:44:42 source > 2022-04-26 13:44:42 INFO i.d.c.p.PostgresStreamingChangeEventSource(searchWalPosition):268 - Searching for WAL resume position
2022-04-26 13:44:42 destination > 2022-04-26 13:44:42 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):126 - Preparing tmp table in destination started for stream django_celery_beat_intervalschedule. schema: public, tmp table name: _airbyte_tmp_pdu_django_celery_beat_intervalschedule
2022-04-26 13:44:42 destination > 2022-04-26 13:44:42 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):126 - Preparing tmp table in destination started for stream django_celery_beat_periodictask. schema: public, tmp table name: _airbyte_tmp_gth_django_celery_beat_periodictask
2022-04-26 13:44:42 destination > 2022-04-26 13:44:42 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):132 - Preparing tables in destination completed.
2022-04-26 13:44:43 source > 2022-04-26 13:44:42 INFO i.d.c.p.c.WalPositionLocator(resumeFromLsn):60 - First LSN 'LSN{0/241F10B8}' received
2022-04-26 13:44:43 source > 2022-04-26 13:44:42 INFO i.d.c.p.PostgresStreamingChangeEventSource(searchWalPosition):287 - WAL resume position 'LSN{0/241F10B8}' discovered
2022-04-26 13:44:43 source > 2022-04-26 13:44:43 INFO i.d.j.JdbcConnection(lambda$doClose$3):945 - Connection gracefully closed
2022-04-26 13:44:43 source > 2022-04-26 13:44:43 ERROR i.d.p.ErrorHandler(setProducerThrowable):31 - Producer failure
2022-04-26 13:44:43 source > org.postgresql.util.PSQLException: FATAL: number of requested standby connections exceeds max_wal_senders (currently 1)
2022-04-26 13:44:43 source > 	at org.postgresql.core.v3.ConnectionFactoryImpl.doAuthentication(ConnectionFactoryImpl.java:613) ~[postgresql-42.2.18.jar:42.2.18]
2022-04-26 13:44:43 source > 	at org.postgresql.core.v3.ConnectionFactoryImpl.tryConnect(ConnectionFactoryImpl.java:161) ~[postgresql-42.2.18.jar:42.2.18]
2022-04-26 13:44:43 source > 	at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:213) ~[postgresql-42.2.18.jar:42.2.18]
2022-04-26 13:44:43 source > 	at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:51) ~[postgresql-42.2.18.jar:42.2.18]
2022-04-26 13:44:43 source > 	at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:225) ~[postgresql-42.2.18.jar:42.2.18]
2022-04-26 13:44:43 source > 	at org.postgresql.Driver.makeConnection(Driver.java:465) ~[postgresql-42.2.18.jar:42.2.18]
2022-04-26 13:44:43 source > 	at org.postgresql.Driver.connect(Driver.java:264) ~[postgresql-42.2.18.jar:42.2.18]
2022-04-26 13:44:43 source > 	at io.debezium.jdbc.JdbcConnection.lambda$patternBasedFactory$1(JdbcConnection.java:230) ~[debezium-core-1.4.2.Final.jar:1.4.2.Final]
2022-04-26 13:44:43 source > 	at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:871) ~[debezium-core-1.4.2.Final.jar:1.4.2.Final]
2022-04-26 13:44:43 source > 	at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.reconnect(PostgresReplicationConnection.java:613) ~[debezium-connector-postgres-1.4.2.Final.jar:1.4.2.Final]
2022-04-26 13:44:43 source > 	at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:142) ~[debezium-connector-postgres-1.4.2.Final.jar:1.4.2.Final]
2022-04-26 13:44:43 source > 	at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:140) ~[debezium-core-1.4.2.Final.jar:1.4.2.Final]
2022-04-26 13:44:43 source > 	at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:113) ~[debezium-core-1.4.2.Final.jar:1.4.2.Final]
2022-04-26 13:44:43 source > 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) [?:?]
2022-04-26 13:44:43 source > 	at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
2022-04-26 13:44:43 source > 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
2022-04-26 13:44:43 source > 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
2022-04-26 13:44:43 source > 	at java.lang.Thread.run(Thread.java:833) [?:?]

Hey @robngray,
I think this might be related to your local Postgres config rather than Airbyte itself. Did you try tweaking other CDC-related parameters?
We have some resources about CDC and Airbyte, in case you haven't found them already:

Let me know if the resources helped; if not, I'll reach out to our connector team.

Yeah, I did see that documentation:

  1. Copy postgresql.conf from the Postgres DB Docker volume and edit these settings:
     wal_level = logical
     max_wal_senders = 1
     max_replication_slots = 1
  2. In the docker-compose YAML, mount the edited file into the container and set Postgres to use it:
     command: postgres -c config_file=/etc/postgresql.conf
     volumes:
       - ./replication_postgresql.conf:/etc/postgresql.conf
  3. Run these in the DB:
       SELECT pg_create_logical_replication_slot('airbyte_slot', 'pgoutput');
       CREATE PUBLICATION airbyte_publication FOR TABLE django_celery_beat_periodictask;
       ALTER TABLE django_celery_beat_periodictask REPLICA IDENTITY DEFAULT;
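Putting steps 1 and 2 together, the relevant docker-compose fragment looks roughly like this (service name, image tag, port mapping, and password are placeholders for my local setup):

```yaml
# Sketch of the source Postgres service with the edited config mounted in.
services:
  source-db:
    image: postgres:13
    # Tell Postgres to use the mounted config file instead of the default.
    command: postgres -c config_file=/etc/postgresql.conf
    environment:
      POSTGRES_PASSWORD: example   # required by the official image
    ports:
      - "5432:5432"
    volumes:
      - ./replication_postgresql.conf:/etc/postgresql.conf
```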

In the Airbyte source connector I configured replication accordingly (CDC, using the slot and publication above):

Thanks for taking a look at this!

Thanks, I’ll reach out to our connector team to get additional support on this.

Hello,
Looking at the source for the Postgres connector, I may have found the potential failure point.
From the logs, after the WAL position is found the JDBC connection is closed, then a new connection is immediately attempted; that attempt fails because a process is still connected.
This occurs on line 141 in the io.debezium.connector.postgresql.PostgresStreamingChangeEventSource class from the Debezium 1.4.2 jar:

The stream.stopKeepAlive() method causes the connection to close by stopping the thread:

[screenshot: stopKeepAlive() implementation]

However, shutdownNow() doesn't wait for any tasks to complete before returning, so a race can occur when the stream attempts to reconnect.

From Oracle documentation on the ExecutorService,

https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/ExecutorService.html

They recommend calling shutdown() first and waiting for any tasks to finish before trying shutdownNow():
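The pattern the javadoc describes can be sketched like this (class and method names are mine, for illustration): disable new submissions, wait for in-flight tasks, and only then force-cancel.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class GracefulShutdown {
    /**
     * Shuts down the pool the way the ExecutorService javadoc recommends:
     * shutdown() first (no new tasks, running tasks finish), then
     * shutdownNow() only if the pool doesn't terminate in time.
     * Returns true if the pool terminated cleanly.
     */
    public static boolean shutdownAndAwaitTermination(ExecutorService pool, long timeoutSeconds) {
        pool.shutdown(); // stop accepting new tasks, let running ones finish
        try {
            if (!pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // force-cancel currently executing tasks
                // wait again for tasks to respond to interruption
                return pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
            }
            return true;
        } catch (InterruptedException ie) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.submit(() -> { /* stand-in for the keep-alive task */ });
        System.out.println("terminated=" + shutdownAndAwaitTermination(pool, 5));
    }
}
```

With this pattern, by the time the method returns true the keep-alive thread has actually exited, so a reconnect attempted afterwards would not race with the old connection.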

Could this be the issue? As I said, the first full refresh works fine; it's every subsequent sync that fails on this connection, consistently. I've pretty much got everything else working Postgres-to-Postgres, with CDC and a custom dbt transform; I'd just like to demonstrate the incremental syncs.

Hey @robngray,
Thank you very much for digging in! I think this deserves an issue on our repo explaining your problem and your findings.
I'll make sure to escalate it to our DB connectors team to get their feedback on this topic. Improving our DB connectors is one of this quarter's priorities.
Please link the issue in this thread.

OK, I opened an issue on the repo here: CDC incremental sync failing with Postgres source connector on M1 Docker local installation · Issue #12771 · airbytehq/airbyte · GitHub

Thank you! Our team will keep you updated on GitHub. Feel free to try patching the Postgres connector with the fix you suggested and let us know if it resolves the issue.