Custom Source Connection Crashing After Switching to Airbyte v1

Summary

A custom source linked to several connections crashes with a “Checking source connection failed” message after switching to Airbyte v1. Inputs are no longer displayed when reconfiguring the source. Need help troubleshooting and finding a solution.


Question

Hello,
I’ve developed a custom source that is linked to several connections. Since I switched to Airbyte v1, some of those connections (but not all) crash regularly and randomly, every day, with a “Checking source connection failed” message. I then go to the source linked to the connection, supply the necessary inputs again (which are no longer displayed, by the way: why?), save and test the source. Everything is OK and I can restart the synchronization manually.
Are any of you experiencing this problem? Do you have any solutions?
Thanks in advance and have a nice day,



This topic has been created from a Slack thread to give it more visibility.
It will be in read-only mode here.


Have you created your custom source connector with the no-code Connector Builder, the low-code CDK, or the Python CDK?
Do you have more logs than this message?

Regarding inputs, if they are marked as airbyte_secret with value true, they won’t be displayed.
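For illustration, a minimal, hypothetical spec fragment is shown below as a Python dict (in a Python CDK connector this would typically live in spec.json or spec.yaml; the api_key property name is invented for the example):

```
# Hypothetical spec fragment: any property flagged "airbyte_secret": true is treated
# as write-only by the UI, so its stored value is masked and is not echoed back
# when you reopen the source settings.
connection_specification = {
    "type": "object",
    "required": ["api_key"],
    "properties": {
        "api_key": {
            "type": "string",
            "title": "API Key",
            "airbyte_secret": True,
        }
    },
}
```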

It’s a Python CDK connector.
Here are more logs about the failed source check:

```
2024-10-22 02:57:24 INFO i.a.w.l.p.s.m.Stage(apply):39 - APPLY Stage: CLAIM — (workloadId = fe66c1ea-5fdc-4bc3-8cab-8f39e6291030_16903_1_check) — (dataplaneId = local)
2024-10-22 02:57:37 INFO i.a.c.ConnectorWatcher(run):87 - Connector exited, processing output
2024-10-22 02:57:37 INFO i.a.c.i.LineGobbler(voidCall):166 - 
2024-10-22 02:57:37 INFO i.a.c.i.LineGobbler(voidCall):166 - ----- START CHECK -----
2024-10-22 02:57:37 INFO i.a.c.i.LineGobbler(voidCall):166 - 
2024-10-22 02:57:37 INFO i.a.c.ConnectorWatcher(run):90 - Output file jobOutput.json found
2024-10-22 02:57:37 INFO i.a.c.ConnectorWatcher(run):96 - Connector exited with 0
2024-10-22 02:57:37 INFO i.a.w.i.VersionedAirbyteStreamFactory(create):189 - Reading messages from protocol version 0.2.0
2024-10-22 02:57:37 WARN i.a.m.l.MetricClientFactory(getMetricClient):43 - MetricClient has not been initialized. Must call MetricClientFactory.CreateMetricClient before using MetricClient. Using a dummy client for now. Ignore this if Airbyte is configured to not publish any metrics.
2024-10-22 02:57:38 ERROR i.a.w.i.VersionedAirbyteStreamFactory(internalLog):304 - Check failed
2024-10-22 02:57:38 INFO i.a.c.ConnectorMessageProcessor(updateConfigFromControlMessage):231 - Checking for optional control message...
2024-10-22 02:57:38 INFO i.a.c.ConnectorWatcher(run):134 - Writing output of fe66c1ea-5fdc-4bc3-8cab-8f39e6291030_16903_1_check to the doc store
2024-10-22 02:57:38 INFO i.a.c.ConnectorWatcher(run):136 - Marking workload as successful
2024-10-22 02:57:38 INFO i.a.c.ConnectorWatcher(exitProperly):189 - Deliberately exiting process with code 0.
2024-10-22 02:57:38 INFO i.a.c.i.LineGobbler(voidCall):166 - 
2024-10-22 02:57:38 INFO i.a.c.i.LineGobbler(voidCall):166 - ----- END CHECK -----
2024-10-22 02:57:38 INFO i.a.c.i.LineGobbler(voidCall):166 - 
2024-10-22 02:57:06 platform > Backing off for: 10 seconds.
2024-10-22 02:57:41 platform > Retry State: RetryManager(completeFailureBackoffPolicy=BackoffPolicy(minInterval=PT10S, maxInterval=PT30M, base=3), partialFailureBackoffPolicy=null, successiveCompleteFailureLimit=5, totalCompleteFailureLimit=10, successivePartialFailureLimit=1000, totalPartialFailureLimit=20, successiveCompleteFailures=2, totalCompleteFailures=2, successivePartialFailures=0, totalPartialFailures=0)
 Backoff before next attempt: 30 seconds
2024-10-22 02:57:41 platform > Failing job: 16903, reason: Connection Check Failed 43a78959-46f1-4510-8440-db340b4cedaf
2024-10-22 02:57:24 INFO i.a.w.l.c.WorkloadApiClient(claim):75 - Claimed: true for fe66c1ea-5fdc-4bc3-8cab-8f39e6291030_16903_1_check via API for local
2024-10-22 02:57:24 INFO i.a.w.l.p.s.m.Stage(apply):39 - APPLY Stage: CHECK_STATUS — (workloadId = fe66c1ea-5fdc-4bc3-8cab-8f39e6291030_16903_1_check) — (dataplaneId = local)
2024-10-22 02:57:24 INFO i.a.w.l.p.s.CheckStatusStage(applyStage):59 - No pod found running for workload fe66c1ea-5fdc-4bc3-8cab-8f39e6291030_16903_1_check
2024-10-22 02:57:24 INFO i.a.w.l.p.s.m.Stage(apply):39 - APPLY Stage: BUILD — (workloadId = fe66c1ea-5fdc-4bc3-8cab-8f39e6291030_16903_1_check) — (dataplaneId = local)
2024-10-22 02:57:24 INFO i.a.w.l.p.s.m.Stage(apply):39 - APPLY Stage: MUTEX — (workloadId = fe66c1ea-5fdc-4bc3-8cab-8f39e6291030_16903_1_check) — (dataplaneId = local)
2024-10-22 02:57:24 INFO i.a.w.l.p.s.EnforceMutexStage(applyStage):50 - No mutex key specified for workload: fe66c1ea-5fdc-4bc3-8cab-8f39e6291030_16903_1_check. Continuing...
2024-10-22 02:57:24 INFO i.a.w.l.p.s.m.Stage(apply):39 - APPLY Stage: LAUNCH — (workloadId = fe66c1ea-5fdc-4bc3-8cab-8f39e6291030_16903_1_check) — (dataplaneId = local)
2024-10-22 02:57:32 INFO i.a.w.l.c.WorkloadApiClient(updateStatusToLaunched):60 - Attempting to update workload: fe66c1ea-5fdc-4bc3-8cab-8f39e6291030_16903_1_check to LAUNCHED.
2024-10-22 02:57:32 INFO i.a.w.l.p.h.SuccessHandler(accept):60 - Pipeline completed for workload: fe66c1ea-5fdc-4bc3-8cab-8f39e6291030_16903_1_check.
```
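For context on the backoff lines above: the RetryManager parameters in the log (minInterval=PT10S, base=3, maxInterval=PT30M) suggest a geometric backoff between successive complete failures, which matches the 10-second and then 30-second waits shown. A rough sketch of that progression, as a reading of the logged values rather than Airbyte’s actual code:

```
# Interpretation of the logged policy: minInterval=PT10S, base=3, maxInterval=PT30M.
MIN_INTERVAL_S = 10
BASE = 3
MAX_INTERVAL_S = 30 * 60

def backoff_seconds(successive_complete_failures: int) -> int:
    """Wait before the next attempt after N successive complete check failures."""
    return min(MIN_INTERVAL_S * BASE ** (successive_complete_failures - 1), MAX_INTERVAL_S)

# 1 failure -> 10 s, 2 -> 30 s (as in the log above), 3 -> 90 s, ..., capped at 1800 s.
print([backoff_seconds(n) for n in range(1, 7)])  # [10, 30, 90, 270, 810, 1800]
```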

To be clear, only certain connections fail to synchronize, whereas on previous days there were no problems, and of course there were no changes to these connectors!

I’d suggest adding some logging to the check_connection method.
What exactly is done during the check? Is a request sent to an API, a connection made to a database, etc.? Are there any retries there?

The other thing is that I’d check whether there are any issues with spawning new pods. I don’t know if you’re using abctl or the Helm chart on Kubernetes.

My check_connection function:

```
from airbyte_cdk.models import SyncMode

def check_connection(self, logger, config):
    stream = mySpecialStream(config)
    stream.records_limit = 1
    try:
        # Read a single record; if that succeeds, the connection is considered healthy.
        next(stream.read_records(sync_mode=SyncMode.full_refresh), None)
        return True, None
    except Exception as e:
        return False, e
```
There is no retry. OK for adding a logger, it’s always a good idea!
I’m using Airbyte with Kubernetes.
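As a minimal sketch, adding logging to the check_connection above could look like this (mySpecialStream and records_limit come from the snippet above; the log messages themselves are illustrative):

```
import logging

from airbyte_cdk.models import SyncMode

def check_connection(self, logger: logging.Logger, config):
    stream = mySpecialStream(config)
    stream.records_limit = 1
    try:
        logger.info("check_connection: reading one record from mySpecialStream")
        record = next(stream.read_records(sync_mode=SyncMode.full_refresh), None)
        logger.info("check_connection succeeded, got a record: %s", record is not None)
        return True, None
    except Exception as e:
        # logger.exception also records the traceback, so transient failures
        # (timeouts, DNS errors, 5xx responses) are visible in the check logs.
        logger.exception("check_connection failed: %s", e)
        return False, e
```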

On further investigation, I discovered that the “check connection” error always occurs after a connector crash, with the error message “Warning from replication: Something went wrong during replication”, message=‘activity ScheduleToClose timeout’, timeoutType=TIMEOUT_TYPE_SCHEDULE_TO_CLOSE, and with the log line

2024-10-22 04:51:11 platform > Workload 2cf86d41-5186-42f2-bbc6-dada499ac9cd_16900_0_sync is pending

repeating for several hours. Another suggestion, perhaps? A Kubernetes pod problem? We’re currently running 40 Airbyte connectors simultaneously, via Airflow.

I hope that stream.read_records(sync_mode=SyncMode.full_refresh) (in next(stream.read_records(sync_mode=SyncMode.full_refresh), None)) is implemented in a way that doesn’t need to read all the records to return a single one :wink:
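As a toy illustration of that point (not the actual mySpecialStream code): when read_records is a plain generator, next() pulls exactly one record and nothing more; if it builds the full record list before yielding, the check costs as much as a sync.

```
# Toy example: next() on a generator evaluates only the first item.
def lazy_read_records():
    for i in range(1_000_000):
        # In a real stream this would be an API call or a database fetch per page.
        yield {"id": i}

first = next(lazy_read_records(), None)  # touches only the first record
print(first)  # {'id': 0}
```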

Yeah, it might be a problem with scheduling pods in Kubernetes. You need to check what is happening there.
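One way to check, assuming kubectl access to the cluster: kubectl describe pod and kubectl get events on the stuck check/sync pods usually show why they stay Pending (insufficient CPU/memory, unschedulable nodes, image pull problems). The same can be scripted with the official kubernetes Python client; a rough sketch, assuming the namespace is called airbyte (adjust to your deployment):

```
# Sketch: list Pending pods and recent events in the Airbyte namespace.
# Assumes `pip install kubernetes` and a working kubeconfig; "airbyte" is an
# assumed namespace name.
from kubernetes import client, config

config.load_kube_config()
core = client.CoreV1Api()
namespace = "airbyte"

for pod in core.list_namespaced_pod(namespace).items:
    if pod.status.phase == "Pending":
        print("Pending pod:", pod.metadata.name)

# Events usually explain why pods stay Pending.
for event in core.list_namespaced_event(namespace).items:
    print(event.last_timestamp, event.reason, event.message)
```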