Handling of state in Python CDK connectors in different Airbyte versions

Summary

The user is having trouble with state handling in Python CDK connectors across Airbyte versions. An older connector that works on a newer Airbyte server does not output ‘state’ on Airbyte 0.40.32, which breaks incremental loads.


Question

Does anyone know whether there are major differences in how state or state messages are handled by Python CDK connectors in the newest vs. older Airbyte versions? I have an older Python connector that works OK on my newest Airbyte server, but the production server is on 0.40.32, and I find that it can’t do incremental loads because “state” is never output from the connector.

I think it’s caused by this change I made to streams.py, which I originally made to fix an error I was getting on the newer Airbyte, but it seems to have somehow broken state on the older Airbyte. Unless it’s something totally unrelated.

Open to ideas.

streams.py code change



This topic has been created from a Slack thread to give it more visibility.
It is in read-only mode here.


Testing from VS Code at least, this seems to straighten out the behavior. I’m testing today on our ‘real’ Airbyte server, though, which is on 0.40.32.

Without the else branch in the state setter, the sync just crashed. Curiously, that else branch was never there before, and its absence only seems to be a problem on Airbyte 0.50+: the connector ran fine without it for a couple of years on 0.40.32.
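The streams.py change was only shared as a screenshot, so its exact code is unknown. The pattern being described is the `state` getter/setter pair that the CDK’s `IncrementalMixin` requires. A minimal sketch, assuming dict-based state keyed by the cursor field `last_modified` and using a hypothetical class name `FTPStreamStateSketch` (the Airbyte CDK base classes are left out so it runs standalone):

```python
from typing import Any, Mapping, MutableMapping, Optional


class FTPStreamStateSketch:
    # Hypothetical stand-in for FTPStream(Stream, IncrementalMixin);
    # the CDK base classes are omitted so this runs on its own.
    cursor_field = "last_modified"

    def __init__(self) -> None:
        self._state: MutableMapping[str, Any] = {}

    @property
    def state(self) -> MutableMapping[str, Any]:
        # Whatever is returned here is what would be checkpointed as stream state.
        return self._state

    @state.setter
    def state(self, value: Optional[Mapping[str, Any]]) -> None:
        if value and value.get(self.cursor_field) is not None:
            # Resume from the cursor saved by a previous sync.
            self._state = {self.cursor_field: value[self.cursor_field]}
        else:
            # The "else" branch: no usable incoming state (first sync, or an
            # empty/None value), so reset to an empty state instead of failing.
            self._state = {}


stream = FTPStreamStateSketch()
stream.state = None  # first sync: falls through to the else branch
stream.state = {"last_modified": "2024-05-30T20:37:24"}  # resume from a saved cursor
```

Whether the crash on 0.50+ came from a `None` or empty incoming state is an assumption here; the sketch simply treats both the same way.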

Nope, it still does not output state on 0.40.32. But I see ERROR messages in the logs that are helping me zero in on the issue.

2024-05-30 20:37:24 INFO i.a.v.j.JsonSchemaValidator(test):130 - JSON schema validation failed.
errors: $.state.stream.stream_descriptor.namespace: null found, string expected

This seems to have worked (see VS Code screenshot). I can’t find the previous versions of the protocol schema, but the version running on this older server must have required namespace to be a non-null string. After running the sync, I now see state in the connector’s advanced settings.

Basically, I just set namespace manually to a string value in my Stream class, like this:

from airbyte_cdk.sources.streams import IncrementalMixin, Stream


class FTPStream(Stream, IncrementalMixin):
    primary_key = None
    cursor_field = "last_modified"
    # Ensure namespace is not null; appears required for backwards compatibility
    namespace = "sftp-source"
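The earlier validation error (“null found, string expected”) matches how JSON Schema treats a property declared as `"type": "string"`: an explicit `null` fails the type check, while a property that is not listed in `required` and is simply absent passes. Whether the 0.40.32 schema also accepts an omitted namespace is an assumption the thread does not confirm. A minimal sketch that mimics that rule by hand; `check_stream_descriptor` and `make_descriptor` are hypothetical helpers, not Airbyte’s validator:

```python
from typing import Any, Dict, List, Optional


def check_stream_descriptor(descriptor: Dict[str, Any]) -> List[str]:
    # Hand-written stand-in for the platform's schema check (not Airbyte's code):
    # an absent namespace passes, but a namespace that is present must be a string.
    errors: List[str] = []
    if "namespace" in descriptor and not isinstance(descriptor["namespace"], str):
        value = descriptor["namespace"]
        found = "null" if value is None else type(value).__name__
        errors.append(
            f"$.state.stream.stream_descriptor.namespace: {found} found, string expected"
        )
    return errors


def make_descriptor(name: str, namespace: Optional[str]) -> Dict[str, Any]:
    # Hypothetical helper: leave namespace out entirely when it is None
    # instead of serializing it as null.
    descriptor: Dict[str, Any] = {"name": name}
    if namespace is not None:
        descriptor["namespace"] = namespace
    return descriptor


# Namespace explicitly null, as in the failing message.
print(check_stream_descriptor({"name": "ejpress", "namespace": None}))
# ['$.state.stream.stream_descriptor.namespace: null found, string expected']

# Namespace set on the stream, as in FTPStream.
print(check_stream_descriptor(make_descriptor("ejpress", "sftp-source")))
# []

# Namespace key omitted altogether (assumed to be accepted).
print(check_stream_descriptor(make_descriptor("ejpress", None)))
# []
```

Setting a class-level `namespace`, as in the fix, satisfies the rule either way; omitting the key is only an alternative under the stated assumption.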

Strangely though, I still see this in the logs. Yet it works.

2024-05-30 21:47:54 ERROR i.a.w.i.DefaultAirbyteStreamFactory(validate):134 - Validation failed: {"type":"TRACE","trace":{"type":"STREAM_STATUS","emitted_at":1.71710567456201E12,"stream_status":{"stream_descriptor":{"name":"ejpress","namespace":null},"status":"STARTED"}}}
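This STREAM_STATUS trace message still carries `"namespace": null` in its stream descriptor even though the stream class sets `namespace`. One possible reading, not confirmed in the thread, is that this trace message is built from a descriptor that does not pick up that attribute. When comparing logged messages against an older protocol schema, a small helper can list every null field. A minimal sketch; `find_null_paths` is a hypothetical helper, not part of the Airbyte CDK:

```python
import json
from typing import Any, List


def find_null_paths(node: Any, path: str = "$") -> List[str]:
    # Walk a parsed message and collect the JSONPath-style location of every null value.
    paths: List[str] = []
    if node is None:
        paths.append(path)
    elif isinstance(node, dict):
        for key, value in node.items():
            paths.extend(find_null_paths(value, f"{path}.{key}"))
    elif isinstance(node, list):
        for index, value in enumerate(node):
            paths.extend(find_null_paths(value, f"{path}[{index}]"))
    return paths


# Message body copied from the ERROR log line.
raw = (
    '{"type":"TRACE","trace":{"type":"STREAM_STATUS","emitted_at":1.71710567456201E12,'
    '"stream_status":{"stream_descriptor":{"name":"ejpress","namespace":null},"status":"STARTED"}}}'
)
print(find_null_paths(json.loads(raw)))
# ['$.trace.stream_status.stream_descriptor.namespace']
```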