Summary
User reports issues with a custom Python destination connector used with a PostgreSQL CDC source connector, encountering retry errors and specific exceptions related to the Airbyte platform.
Question
Hello,
I created a Python destination connector
and use it with the PostgreSQL CDC source connector.
It writes data, but the data transfer retries and is then cancelled with an error.
First error (attempts 1-2):
message='io.temporal.serviceclient.CheckedExceptionWrapper: io.airbyte.workers.exception.WorkerException: Something went wrong within the airbyte platform', type='java.lang.RuntimeException', nonRetryable=false
Second error (attempts 3-4):
Cannot invoke "io.airbyte.protocol.models.AirbyteGlobalState.getStreamStates()" because the return value of "io.airbyte.protocol.models.AirbyteStateMessage.getGlobal()" is null
What could be causing this?
['python-connector', 'postgresql-cdc', 'data-transfer-error', 'airbyte-platform']
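The destination connector code looks like this:
from typing import Any, Iterable, Mapping

from airbyte_cdk.destinations import Destination
from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog, Type


class Destination(Destination):
    def write(
        self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
    ) -> Iterable[AirbyteMessage]:
        ...
        for message in input_messages:
            if message.type == Type.STATE:
                # Echo the state message back so the platform can checkpoint.
                yield message
            elif message.type == Type.RECORD:
                # The payload is on message.record; a bare `record` is undefined here.
                writer.write(str(message.record.data), timeout=90)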
First error:
message='io.temporal.serviceclient.CheckedExceptionWrapper: io.airbyte.workers.exception.WorkerException: Something went wrong within the airbyte platform', type='java.lang.RuntimeException', nonRetryable=false
The log shows:
2024-11-06 16:01:12 replication-orchestrator > readFromDestination: done. (writeToDestFailed:false, dest.isFinished:false)
2024-11-06 16:01:12 replication-orchestrator > thread status... timeout thread: false , replication thread: true
2024-11-06 16:01:17 replication-orchestrator > Retry attempt 1 of 5. Last response: null
2024-11-06 16:01:18 replication-orchestrator > Retry attempt 2 of 5. Last response: null
2024-11-06 16:01:20 replication-orchestrator > Retry attempt 3 of 5. Last response: null
2024-11-06 16:01:22 replication-orchestrator > Retry attempt 4 of 5. Last response: null
2024-11-06 16:01:24 replication-orchestrator > Retry attempt 5 of 5. Last response: null
2024-11-06 16:01:24 replication-orchestrator > Retry attempts exceeded.
java.net.ConnectException: Failed to connect to airbyte-abctl-airbyte-server-svc/10.96.164.215:8001
Second error (attempt 2 or 3):
Cannot invoke "io.airbyte.protocol.models.AirbyteGlobalState.getStreamStates()" because the return value of "io.airbyte.protocol.models.AirbyteStateMessage.getGlobal()" is null
The log shows:
2024-11-06 16:32:46 destination > {'type': <type.state:>, 'log': None, 'spec': None, 'connectionStatus': None, 'catalog': None, 'record': None, 'state': {'type': <airbytestatetype.global:>, 'stream': None, 'global_': {'shared_state': {'state': {'["replication_test",{"server":"replication_test"}]': '{"transaction_id":null,"lsn":31618008,"txId":922,"ts_usec":1730910477053028}'}}, 'stream_states': [{'stream_descriptor': {'name': 'source_table', 'namespace': 'airbyte_source'}, 'stream_state': {'stream_name': 'source_table', 'cursor_field': [], 'stream_namespace': 'airbyte_source'}}]}, 'data': None, 'sourceStats': {'recordCount': 2.0}, 'destinationStats': None, 'id': 2}, 'trace': None, 'control': None}
2024-11-06 16:32:46 replication-orchestrator > readFromDestination: exception caught
java.lang.NullPointerException: Cannot invoke "io.airbyte.protocol.models.AirbyteGlobalState.getStreamStates()" because the return value of "io.airbyte.protocol.models.AirbyteStateMessage.getGlobal()" is null
at io.airbyte.workers.internal.bookkeeping.ParallelStreamStatsTracker.updateDestinationStateStats(ParallelStreamStatsTracker.kt:130) ~[io.airbyte-airbyte-commons-worker-1.1.0.jar:?]
at io.airbyte.workers.internal.syncpersistence.SyncPersistenceImpl.updateDestinationStateStats(SyncPersistence.kt:322) ~[io.airbyte-airbyte-commons-worker-1.1.0.jar:?]
at io.airbyte.workers.internal.bookkeeping.AirbyteMessageTracker.acceptFromDestination(AirbyteMessageTracker.kt:65) ~[io.airbyte-airbyte-commons-worker-1.1.0.jar:?]
at io.airbyte.workers.general.ReplicationWorkerHelper.internalProcessMessageFromDestination(ReplicationWorkerHelper.kt:443) ~[io.airbyte-airbyte-commons-worker-1.1.0.jar:?]
at io.airbyte.workers.general.ReplicationWorkerHelper.processMessageFromDestination(ReplicationWorkerHelper.kt:317) ~[io.airbyte-airbyte-commons-worker-1.1.0.jar:?]
at io.airbyte.workers.general.BufferedReplicationWorker.readFromDestination(BufferedReplicationWorker.java:488) ~[io.airbyte-airbyte-commons-worker-1.1.0.jar:?]
at io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsync$2(BufferedReplicationWorker.java:215) ~[io.airbyte-airbyte-commons-worker-1.1.0.jar:?]
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
2024-11-06 16:32:46 replication-orchestrator > readFromDestination: done. (writeToDestFailed:false, dest.isFinished:false)
2024-11-06 16:33:46 replication-orchestrator > Failed to wait for exit value file /dest/exitCode.txt to be found.
java.util.concurrent.TimeoutException: null
at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1960) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2095) ~[?:?]
at io.airbyte.workers.internal.ContainerIOHandle.terminate(ContainerIOHandle.kt:120) ~[io.airbyte-airbyte-commons-worker-1.1.0.jar:?]
at io.airbyte.workers.internal.ContainerIOHandle.terminate$default(ContainerIOHandle.kt:109) ~[io.airbyte-airbyte-commons-worker-1.1.0.jar:?]
at io.airbyte.workers.internal.LocalContainerAirbyteDestination.close(LocalContainerAirbyteDestination.kt:59) ~[io.airbyte-airbyte-commons-worker-1.1.0.jar:?]
at io.airbyte.workers.general.BufferedReplicationWorker$CloseableWithTimeout.lambda$close$0(BufferedReplicationWorker.java:543) ~[io.airbyte-airbyte-commons-worker-1.1.0.jar:?]
at io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsyncWithTimeout$5(BufferedReplicationWorker.java:243) ~[io.airbyte-airbyte-commons-worker-1.1.0.jar:?]
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
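One detail in the destination log line for the second error may be worth checking: the state payload is printed under the key `global_`, while the NullPointerException says that `getGlobal()` returned null. A possible reading, which is an assumption and not confirmed in this thread, is that the state message reached the platform with the Python field name `global_` instead of the protocol key `global`. The sketch below uses only the standard library; `read_global_state` and `rename_global_key` are hypothetical helpers, and the payload is a trimmed copy of the logged message. It only illustrates how a consumer that looks up `global` finds nothing when the key is `global_`.

```python
import json
from typing import Any, Optional


def read_global_state(raw_line: str) -> Optional[dict[str, Any]]:
    """Hypothetical stand-in for a consumer that expects the key `global`."""
    message = json.loads(raw_line)
    return message.get("state", {}).get("global")


def rename_global_key(message: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of the message with state `global_` renamed to `global`."""
    state = dict(message.get("state") or {})
    if "global_" in state:
        state["global"] = state.pop("global_")
    return {**message, "state": state}


# Trimmed payload modeled on the destination log line (values shortened).
emitted = {
    "type": "STATE",
    "state": {
        "type": "GLOBAL",
        "global_": {
            "shared_state": {"state": {}},
            "stream_states": [
                {
                    "stream_descriptor": {"name": "source_table", "namespace": "airbyte_source"},
                    "stream_state": {"stream_name": "source_table"},
                }
            ],
        },
    },
}

as_emitted = read_global_state(json.dumps(emitted))
as_renamed = read_global_state(json.dumps(rename_global_key(emitted)))

print(as_emitted)  # None
print(as_renamed["stream_states"][0]["stream_descriptor"]["name"])  # source_table
```

If the connector builds its messages from pydantic models, serializing by alias (pydantic's `by_alias=True` option) is probably a cleaner fix than renaming keys by hand; which serialization helper applies depends on the CDK version in use.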