Error with Python Destination Connector and PostgreSQL CDC Source Connector

Summary

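A custom Python destination connector, used with the PostgreSQL CDC source connector, writes data, but the sync keeps retrying and is eventually cancelled. The errors include ‘io.airbyte.workers.exception.WorkerException’ and a NullPointerException raised when ‘io.airbyte.protocol.models.AirbyteGlobalState.getStreamStates()’ is called because ‘AirbyteStateMessage.getGlobal()’ returns null.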


Question

Hello,

I created a Python destination connector and use it with the PostgreSQL CDC source connector.

It writes data, but the data transfer retries and is then cancelled with an error.

First error (attempts 1-2):
message='io.temporal.serviceclient.CheckedExceptionWrapper: io.airbyte.workers.exception.WorkerException: Something went wrong within the airbyte platform', type='java.lang.RuntimeException', nonRetryable=false

Second error (attempts 3-4):
Cannot invoke "io.airbyte.protocol.models.AirbyteGlobalState.getStreamStates()" because the return value of "io.airbyte.protocol.models.AirbyteStateMessage.getGlobal()" is null

What could be causing this?



This topic has been created from a Slack thread to give it more visibility.
It will be in read-only mode here.



The code for the destination connector looks like this:

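from typing import Any, Iterable, Mapping

from airbyte_cdk.destinations import Destination
from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog, Type


class DestinationCustom(Destination):  # hypothetical name; "Destination" would shadow the CDK base class
    def write(
        self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
    ) -> Iterable[AirbyteMessage]:
        ...  # setup elided in the original post (this is where `writer` is created)
        for message in input_messages:
            if message.type == Type.STATE:
                # Echo the state message back so the platform can checkpoint the sync
                yield message
            elif message.type == Type.RECORD:
                # The record payload lives on message.record, not on a bare `record` name
                writer.write(str(message.record.data), timeout=90)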
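The loop above hands every yielded message back to the CDK for serialization. The Airbyte protocol expects a destination to emit a state message only after the records that preceded it have been persisted. The following is a stdlib-only sketch of that pattern, assuming the input arrives as raw JSON lines; `write_messages` and the `flush` callback (standing in for `writer.write`) are hypothetical names. Echoing each state line exactly as received, instead of re-serializing a parsed object, keeps protocol keys such as `global` intact.

```python
import json
from typing import Callable, Iterable, Iterator, List


def write_messages(
    lines: Iterable[str], flush: Callable[[List[dict]], None]
) -> Iterator[str]:
    """Hypothetical helper: buffer RECORD data, flush it, then echo STATE lines.

    `flush` stands in for the connector's own writer (an assumption).
    """
    buffer: List[dict] = []
    for line in lines:
        message = json.loads(line)
        if message.get("type") == "RECORD":
            buffer.append(message["record"]["data"])
        elif message.get("type") == "STATE":
            # Persist every record received before this state message first.
            if buffer:
                flush(buffer)
                buffer = []
            # Echo the original JSON text so keys such as "global" stay intact.
            yield line
    # Persist any records that arrived after the last state message.
    if buffer:
        flush(buffer)
```

For instance, `list(write_messages(lines, persisted.extend))` appends each record's `data` to `persisted` and returns only the state lines, each one released after the records before it were flushed.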

First error:

message='io.temporal.serviceclient.CheckedExceptionWrapper: io.airbyte.workers.exception.WorkerException: Something went wrong within the airbyte platform', type='java.lang.RuntimeException', nonRetryable=false

In the log:

2024-11-06 16:01:12 replication-orchestrator > readFromDestination: done. (writeToDestFailed:false, dest.isFinished:false)
2024-11-06 16:01:12 replication-orchestrator > thread status... timeout thread: false , replication thread: true
2024-11-06 16:01:17 replication-orchestrator > Retry attempt 1 of 5. Last response: null
2024-11-06 16:01:18 replication-orchestrator > Retry attempt 2 of 5. Last response: null
2024-11-06 16:01:20 replication-orchestrator > Retry attempt 3 of 5. Last response: null
2024-11-06 16:01:22 replication-orchestrator > Retry attempt 4 of 5. Last response: null
2024-11-06 16:01:24 replication-orchestrator > Retry attempt 5 of 5. Last response: null
2024-11-06 16:01:24 replication-orchestrator > Retry attempts exceeded.
java.net.ConnectException: Failed to connect to airbyte-abctl-airbyte-server-svc/10.96.164.215:8001
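All five retries in this log fail with `java.net.ConnectException` against `airbyte-abctl-airbyte-server-svc:8001`, which suggests the replication orchestrator could not reach the Airbyte server at that moment, rather than a fault in the connector itself. A minimal reachability check, assuming you can run Python somewhere on the same cluster network; `can_connect` is a hypothetical helper, not part of Airbyte:

```python
import socket


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds.

    Note: `timeout` bounds the connect attempt; DNS resolution may take longer.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unresolvable host names.
        return False
```

For example, `can_connect("airbyte-abctl-airbyte-server-svc", 8001)` returning `False` from inside the cluster would point at the server pod or its service rather than at the destination.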

Second error (attempts 2 or 3):

Cannot invoke "io.airbyte.protocol.models.AirbyteGlobalState.getStreamStates()" because the return value of "io.airbyte.protocol.models.AirbyteStateMessage.getGlobal()" is null

In the log:

2024-11-06 16:32:46 destination > {'type': <Type.STATE: 'STATE'>, 'log': None, 'spec': None, 'connectionStatus': None, 'catalog': None, 'record': None, 'state': {'type': <AirbyteStateType.GLOBAL: 'GLOBAL'>, 'stream': None, 'global_': {'shared_state': {'state': {'["replication_test",{"server":"replication_test"}]': '{"transaction_id":null,"lsn":31618008,"txId":922,"ts_usec":1730910477053028}'}}, 'stream_states': [{'stream_descriptor': {'name': 'source_table', 'namespace': 'airbyte_source'}, 'stream_state': {'stream_name': 'source_table', 'cursor_field': [], 'stream_namespace': 'airbyte_source'}}]}, 'data': None, 'sourceStats': {'recordCount': 2.0}, 'destinationStats': None, 'id': 2}, 'trace': None, 'control': None}
2024-11-06 16:32:46 replication-orchestrator > readFromDestination: exception caught
java.lang.NullPointerException: Cannot invoke "io.airbyte.protocol.models.AirbyteGlobalState.getStreamStates()" because the return value of "io.airbyte.protocol.models.AirbyteStateMessage.getGlobal()" is null
        at io.airbyte.workers.internal.bookkeeping.ParallelStreamStatsTracker.updateDestinationStateStats(ParallelStreamStatsTracker.kt:130) ~[io.airbyte-airbyte-commons-worker-1.1.0.jar:?]
        at io.airbyte.workers.internal.syncpersistence.SyncPersistenceImpl.updateDestinationStateStats(SyncPersistence.kt:322) ~[io.airbyte-airbyte-commons-worker-1.1.0.jar:?]
        at io.airbyte.workers.internal.bookkeeping.AirbyteMessageTracker.acceptFromDestination(AirbyteMessageTracker.kt:65) ~[io.airbyte-airbyte-commons-worker-1.1.0.jar:?]
        at io.airbyte.workers.general.ReplicationWorkerHelper.internalProcessMessageFromDestination(ReplicationWorkerHelper.kt:443) ~[io.airbyte-airbyte-commons-worker-1.1.0.jar:?]
        at io.airbyte.workers.general.ReplicationWorkerHelper.processMessageFromDestination(ReplicationWorkerHelper.kt:317) ~[io.airbyte-airbyte-commons-worker-1.1.0.jar:?]
        at io.airbyte.workers.general.BufferedReplicationWorker.readFromDestination(BufferedReplicationWorker.java:488) ~[io.airbyte-airbyte-commons-worker-1.1.0.jar:?]
        at io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsync$2(BufferedReplicationWorker.java:215) ~[io.airbyte-airbyte-commons-worker-1.1.0.jar:?]
        at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
        at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
2024-11-06 16:32:46 replication-orchestrator > readFromDestination: done. (writeToDestFailed:false, dest.isFinished:false)
2024-11-06 16:33:46 replication-orchestrator > Failed to wait for exit value file /dest/exitCode.txt to be found.
java.util.concurrent.TimeoutException: null
        at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1960) ~[?:?]
        at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2095) ~[?:?]
        at io.airbyte.workers.internal.ContainerIOHandle.terminate(ContainerIOHandle.kt:120) ~[io.airbyte-airbyte-commons-worker-1.1.0.jar:?]
        at io.airbyte.workers.internal.ContainerIOHandle.terminate$default(ContainerIOHandle.kt:109) ~[io.airbyte-airbyte-commons-worker-1.1.0.jar:?]
        at io.airbyte.workers.internal.LocalContainerAirbyteDestination.close(LocalContainerAirbyteDestination.kt:59) ~[io.airbyte-airbyte-commons-worker-1.1.0.jar:?]
        at io.airbyte.workers.general.BufferedReplicationWorker$CloseableWithTimeout.lambda$close$0(BufferedReplicationWorker.java:543) ~[io.airbyte-airbyte-commons-worker-1.1.0.jar:?]
        at io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsyncWithTimeout$5(BufferedReplicationWorker.java:243) ~[io.airbyte-airbyte-commons-worker-1.1.0.jar:?]
        at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
        at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
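One plausible reading of the second failure: the `destination >` log line shows the echoed state with the key `global_`, while the Airbyte protocol names this field `global` (the Python model exposes it as `global_` because `global` is a reserved word in Python). If the message reaches the platform with the `global_` key, `AirbyteStateMessage.getGlobal()` would return null, and calling `getStreamStates()` on it would throw exactly this `NullPointerException`. The sketch below is a hypothetical stdlib helper, not part of the CDK, that operates on an already JSON-decoded message dict: it renames the key and fails early if a `GLOBAL` state has no `global` object. Upgrading the CDK or serializing with field aliases may be the cleaner fix; the GitHub comment linked below is what resolved it for the poster.

```python
import json
from typing import Any, Dict


def normalize_state_message(message: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: return a copy whose state uses the protocol key "global".

    Assumes `message` contains only JSON-compatible values.
    """
    message = json.loads(json.dumps(message))  # deep copy via a JSON round-trip
    state = message.get("state") or {}
    if "global_" in state:
        # Rename the Python-side field name to the key the platform reads.
        state["global"] = state.pop("global_")
    if state.get("type") == "GLOBAL" and not state.get("global"):
        # This is the condition that makes getGlobal() return null on the Java side.
        raise ValueError("GLOBAL state message has no 'global' object")
    return message
```

Passing a trimmed copy of the logged state through `normalize_state_message` yields a `state` dict keyed by `type` and `global`, with the `stream_states` list unchanged.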

This comment helped me: https://github.com/airbytehq/airbyte/issues/38749#issuecomment-2460368973