Incremental Sync for basic Python source connector

I am trying to create a custom source connector in Python. I am not using the HTTP API; this is a basic Python source connector. I am having a hard time understanding how to handle incremental syncs. I have defined two sync modes and added a cursor field, but I think I am missing the state handling, and the documentation does not make clear how to deal with it. I have multiple data streams that I loop through in the read_records method, yielding records back. For most streams, the updated_at field is the cursor field.

It would be nice if someone could help me figure out how to develop incremental sync. I saw a few examples for the HTTP API, but I am not using that.

Thanks.

Hello @yzislin, is the code for your connector on GitHub where we can review it?

Hi. No, it is not on GitHub.
I don't really have a mechanism for incremental sync yet. That's what I am trying to learn.

I am just running a script, taking its JSON output, and returning each record in a for loop:
record = AirbyteRecordMessage(stream=stream, data=one_record, emitted_at=int(datetime.now().timestamp() * 1000))
yield AirbyteMessage(type=Type.RECORD, record=record)

I see that read_records has a state parameter. Do I need to attach a state object (i.e. some field from the data, like “updated_at”) to the AirbyteRecordMessage?

I just can't seem to find any documentation that talks about it. Everything I see is either broad or specific to HTTP API streams. Mine is a basic Python source connector.

Take a look at how the Source Faker is built: https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-faker/source_faker/source.py
If you want to learn about the incremental method, see our docs: https://docs.airbyte.com/connector-development/cdk-python/incremental-stream
You can also search the code base to see how other sources implement their incremental step.

Let me know if you need further assistance
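
On the question of whether state gets attached to each record: in the Airbyte protocol, state travels as its own STATE message next to the RECORD messages, not as a field on AirbyteRecordMessage. Below is a minimal sketch of that idea. It uses plain dicts as stand-ins for the AirbyteMessage objects so it runs without the CDK; the function name read_stream and the sample data are hypothetical, and the flat state "data" layout is the legacy state shape, so check it against the protocol version you target.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict, Iterable, Iterator


def read_stream(
    stream: str, records: Iterable[Dict[str, Any]], cursor_field: str
) -> Iterator[Dict[str, Any]]:
    """Yield one RECORD message per record, then one STATE message (hypothetical helper)."""
    latest = None
    for one_record in records:
        emitted_at = int(datetime.now(timezone.utc).timestamp() * 1000)
        yield {
            "type": "RECORD",
            "record": {"stream": stream, "data": one_record, "emitted_at": emitted_at},
        }
        # Track the highest cursor value seen. ISO-8601 strings in the same
        # format and timezone compare correctly as plain strings.
        value = one_record[cursor_field]
        if latest is None or value > latest:
            latest = value
    if latest is not None:
        # State is a separate message; records themselves carry no state.
        yield {"type": "STATE", "state": {"data": {stream: {cursor_field: latest}}}}


sample = [
    {"order_id": 1, "updated_at": "2021-01-01T00:00:00Z"},
    {"order_id": 2, "updated_at": "2021-10-01T00:00:00Z"},
]
messages = list(read_stream("orders", sample, "updated_at"))
for message in messages:
    print(json.dumps(message))
```

With the real CDK, the same two yields would use AirbyteMessage with Type.RECORD and Type.STATE, as the Source Faker code linked above does.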

Great example (source faker). Very useful for understanding how it works. One more question: my stream of data is not sorted. It does have a timestamp or an ID, but it is not sorted. If I understood correctly, the state message only works if the cursor field is sorted in ascending order. Is my assumption correct?

I did look through the cdk-python incremental-stream guide. It seems my assumption is correct, as the state seems to require an ordered list of events.

Since I cannot get sorted data, I was thinking of creating a hash field (the md5sum of the record) that would be unique for each record. Then I would create a destination connector that writes the data to disk (i.e. JSON lines) and makes sure these files do not already contain entries with the incoming hashes. What do you think about this approach?

If I understood correctly, the state message only works if the cursor field is sorted in ascending order. Is my assumption correct?

You’re correct.

Since I cannot get sorted data, I was thinking of creating a hash field (the md5sum of the record) that would be unique for each record. Then I would create a destination connector that writes the data to disk (i.e. JSON lines) and makes sure these files do not already contain entries with the incoming hashes. What do you think about this approach?

Do you have a lot of data to sync? Is that why you want incremental? To be honest, I don't think checking in the destination whether each record already exists is the best approach, given the cost.

Let’s do a little exercise.
You’re trying to sync orders from the API:

[{"order_id": 1, "amount": 100, "created_at": "2021-01-01T00:00:00Z", "updated_at": "2021-01-01T00:00:00Z"}, {"order_id": 2, "amount": 30, "created_at": "2021-10-01T00:00:00Z", "updated_at": "2021-10-01T00:00:00Z"}]

The first sync will execute, and you will save the state of the stream as updated_at = 2021-10-01T00:00:00Z.
When the second sync runs, your data at the source is:

[{"order_id": 1, "amount": 150, "created_at": "2021-01-01T00:00:00Z", "updated_at": "2021-11-01T00:00:00Z"}, {"order_id": 2, "amount": 30, "created_at": "2021-10-01T00:00:00Z", "updated_at": "2021-10-01T00:00:00Z"}]

So the second sync will capture the update to the first record, because the call to the API will be something like:
api-example.com/orders?updated_at>=2021-10-01T00:00:00Z
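
The exercise can be replayed locally as a rough sketch. The function incremental_read is a hypothetical stand-in for the API call with the updated_at filter; it is not part of Airbyte or of any real API.

```python
from typing import Any, Dict, List

Record = Dict[str, Any]


def incremental_read(source: List[Record], cursor: str) -> List[Record]:
    """Hypothetical stand-in for `orders?updated_at>=<cursor>`."""
    return [r for r in source if r["updated_at"] >= cursor]


first_sync_source = [
    {"order_id": 1, "amount": 100, "updated_at": "2021-01-01T00:00:00Z"},
    {"order_id": 2, "amount": 30, "updated_at": "2021-10-01T00:00:00Z"},
]
# After the first sync, the saved state is the highest updated_at seen.
saved_cursor = max(r["updated_at"] for r in first_sync_source)

second_sync_source = [
    {"order_id": 1, "amount": 150, "updated_at": "2021-11-01T00:00:00Z"},
    {"order_id": 2, "amount": 30, "updated_at": "2021-10-01T00:00:00Z"},
]
changed = incremental_read(second_sync_source, saved_cursor)
new_cursor = max(r["updated_at"] for r in changed)

print(saved_cursor)                        # 2021-10-01T00:00:00Z
print([r["order_id"] for r in changed])    # [1, 2]
print(new_cursor)                          # 2021-11-01T00:00:00Z
```

Because the comparison is inclusive (>=), order 2 is returned again even though it did not change. A strict > would skip it, but could also miss other records that share exactly the cursor timestamp.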

I don't think you can reproduce this behavior with an md5 hash.
Hope this helps. For your information, some streams don't support incremental sync because of their API design, and for those Airbyte only does full refresh.
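
To make the concern about hashing concrete, here is a small sketch under stated assumptions: record_hash is a hypothetical helper that hashes the canonical JSON form of a record. A content hash can tell you that an identical record was already written, but an updated order produces a different hash, so on its own the hash cannot tell an update apart from a brand-new record.

```python
import hashlib
import json
from typing import Any, Dict


def record_hash(record: Dict[str, Any]) -> str:
    """md5 of the record's canonical JSON (sorted keys), a hypothetical helper."""
    payload = json.dumps(record, sort_keys=True, separators=(",", ":"))
    return hashlib.md5(payload.encode("utf-8")).hexdigest()


before = {"order_id": 1, "amount": 100, "updated_at": "2021-01-01T00:00:00Z"}
after = {"order_id": 1, "amount": 150, "updated_at": "2021-11-01T00:00:00Z"}

# Hashes already stored in the destination after the first sync.
seen_hashes = {record_hash(before)}

# The same content hashes the same, regardless of key order.
same_again = record_hash(dict(reversed(list(before.items())))) in seen_hashes

# The updated order hashes differently, so it looks like a new record.
looks_new = record_hash(after) not in seen_hashes

print(same_again, looks_new)
```

Both flags come out true: the unchanged record is recognized, while the updated order 1 would be appended next to its old version unless the destination also compares order_id.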

So the source-faker example gave me a clear picture of how state works in a source connector, and I am implementing that. I want an incremental stream so that I don't get duplicates in the destination. I have figured out a way to sort my incoming data, so a state field will work for me. The logic for reading a stream is:

  • get cursor value from state of the stream
  • get data and sort it by cursor_field
  • only yield data that is greater than the cursor value.
  • and finally yield state with the latest cursor value.

I have figured out how to extract the selected sync mode from the catalog, so I'm good on that front.
The logic above is for incremental mode. If I choose full refresh, I just have to set the cursor to 0, so the logic will get all events again.
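
The steps above, including the full refresh case, might look roughly like the sketch below. Everything in it is illustrative: read_sorted, the "events" stream and the plain-dict messages are hypothetical stand-ins for the connector's own code and the CDK message classes, and None plays the role of the "0" cursor so the same code works for string timestamps.

```python
import time
from typing import Any, Dict, Iterator, List, Optional


def read_sorted(
    stream: str,
    records: List[Dict[str, Any]],
    cursor_field: str,
    sync_mode: str,
    state: Optional[Dict[str, Any]],
) -> Iterator[Dict[str, Any]]:
    """Sort by cursor, skip what was already synced, then emit state (hypothetical)."""
    cursor = None  # full refresh: no lower bound, every record is emitted
    if sync_mode == "incremental" and state:
        cursor = state.get(stream, {}).get(cursor_field)
    latest = cursor
    for rec in sorted(records, key=lambda r: r[cursor_field]):
        value = rec[cursor_field]
        if cursor is not None and value <= cursor:
            continue  # already covered by the saved state
        yield {
            "type": "RECORD",
            "record": {"stream": stream, "data": rec, "emitted_at": int(time.time() * 1000)},
        }
        latest = value  # records are sorted, so the last one emitted is the max
    if latest is not None:
        yield {"type": "STATE", "state": {"data": {stream: {cursor_field: latest}}}}


events = [
    {"id": 3, "updated_at": "2022-03-01T00:00:00Z"},
    {"id": 1, "updated_at": "2022-01-01T00:00:00Z"},
    {"id": 2, "updated_at": "2022-02-01T00:00:00Z"},
]
saved = {"events": {"updated_at": "2022-01-01T00:00:00Z"}}

incremental = list(read_sorted("events", events, "updated_at", "incremental", saved))
full = list(read_sorted("events", events, "updated_at", "full_refresh", saved))
```

Here the incremental run emits ids 2 and 3 followed by a state of 2022-03-01T00:00:00Z, while the full refresh run ignores the saved state and emits all three records in sorted order.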

Sorry I keep jumping around with my ideas. Thanks for the help.

Oh, and is there only one AirbyteStateMessage per sync? So if I have multiple streams, I cannot maintain separate state, and I can only have one state object for the whole sync? Is that right?

No, you can emit multiple state messages and keep state for each stream. When you emit a state message, the Airbyte worker saves it internally.

For some reason the state for the last stream was being used for all the others, but I got it worked out. I also needed sub-states (i.e. multiple states within one stream), so I created one big JSON object with all the states. It seems to work just fine.
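
One possible shape for such a combined state object is sketched below. It is an assumption, not the poster's actual code: update_state, the stream names and the "org/repo-a" style sub-state keys are all made up for illustration. The idea is that every STATE message carries the whole object, so advancing one stream does not drop or overwrite the others.

```python
import copy
from typing import Any, Dict


def update_state(
    state: Dict[str, Any], stream: str, partition: str, cursor_field: str, value: str
) -> Dict[str, Any]:
    """Return a copy of state with one sub-state advanced (hypothetical helper).

    A cursor is never moved backwards.
    """
    new_state = copy.deepcopy(state)
    sub_state = new_state.setdefault(stream, {}).setdefault(partition, {})
    current = sub_state.get(cursor_field)
    if current is None or value > current:
        sub_state[cursor_field] = value
    return new_state


state: Dict[str, Any] = {}
state = update_state(state, "issues", "org/repo-a", "updated_at", "2022-01-05T00:00:00Z")
state = update_state(state, "issues", "org/repo-b", "updated_at", "2022-01-03T00:00:00Z")
state = update_state(state, "pull_requests", "org/repo-a", "updated_at", "2022-01-02T00:00:00Z")
# An older value for an existing sub-state is ignored.
state = update_state(state, "issues", "org/repo-a", "updated_at", "2022-01-01T00:00:00Z")

# Plain-dict stand-in for the STATE message holding the full object.
state_message = {"type": "STATE", "state": {"data": state}}
```

On the next sync, each stream would read only its own entry, for example state["issues"]["org/repo-b"], instead of sharing a single cursor.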

Thanks for your help BTW.

P.S. I made a GitHub App source connector. Unfortunately it is not pure Python: Python calls a Node.js script to retrieve the data, since the GitHub API has a well-known Node.js module that does everything for you.
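
For readers curious about the Python-calls-a-script pattern, a minimal sketch follows. It is not the poster's code: run_json_script is a hypothetical helper, and the demo uses the Python interpreter itself as the child process so the example runs anywhere. In a setup like the one described, the command would instead be something like node plus the path of the script.

```python
import json
import subprocess
import sys
from typing import Any, Dict, List, Sequence


def run_json_script(command: Sequence[str], timeout: float = 60.0) -> List[Dict[str, Any]]:
    """Run an external command and parse its stdout as a JSON array of records."""
    completed = subprocess.run(
        list(command),
        capture_output=True,  # collect stdout and stderr instead of inheriting them
        text=True,            # decode output to str
        check=True,           # raise CalledProcessError on a non-zero exit code
        timeout=timeout,
    )
    parsed = json.loads(completed.stdout)
    if not isinstance(parsed, list):
        raise ValueError("expected the script to print a JSON array of records")
    return parsed


# Stand-in child process; a real connector might run a Node.js script here.
demo_command = [
    sys.executable,
    "-c",
    "import json; print(json.dumps([{'id': 1}, {'id': 2}]))",
]
records = run_json_script(demo_command)
print(records)
```

Each returned dict can then be wrapped in a record message and yielded from the read loop.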

Amazing, yzislin! If possible, share the code here so other people can use it in the future too!
