Source: Incremental sync using Sub Streams

Hi all,

I am currently working on a source connector using the Python API CDK. I have Full Sync working perfectly, but now want to implement Incremental Sync with an endpoint that relies on sub-streams (via HttpSubStream). For the purposes of this question, I want to get tasks, which have to be requested per project, using the project_id of the project they belong to.

The logic I need is as follows: the state (which holds the last_updated value) has to be used to query the tasks within each project, because any project could have new tasks since the last sync and there is no way to query across projects. Then, once all projects have been iterated through, the state needs to be updated with the most recent task update.

Logic in pseudocode:

# get the last updated state
current_state = get_last_updated(s)

# go through all projects and get the tasks
get_tasks(project1, last_updated=current_state)
get_tasks(project2, last_updated=current_state)
...
get_tasks(projectN, last_updated=current_state)

# update the state with the most recent updated task time
set_state(most_recent_updated)
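
The pseudocode above can be turned into a minimal runnable sketch. The names `sync_tasks`, `fake_fetch` and `FAKE_API` are hypothetical stand-ins for illustration and are not part of the CDK. The sketch also assumes that last_updated values are ISO-8601 strings in a single format, so that string comparison matches chronological order.

```python
from typing import Callable, Dict, Iterable, List, Tuple

Task = Dict[str, str]

# Hypothetical in-memory data standing in for the real API.
FAKE_API: Dict[str, List[Task]] = {
    "project1": [{"id": "a", "last_updated": "2022-03-10T00:00:00Z"}],
    "project2": [{"id": "b", "last_updated": "2022-03-05T00:00:00Z"}],
}


def fake_fetch(project_id: str, updated_after: str) -> List[Task]:
    """Return the tasks of one project updated after the given cursor."""
    return [t for t in FAKE_API[project_id] if t["last_updated"] > updated_after]


def sync_tasks(
    project_ids: Iterable[str],
    last_updated: str,
    fetch_tasks: Callable[[str, str], List[Task]],
) -> Tuple[List[Task], str]:
    """Query every project with the same cursor, then return the new cursor once."""
    tasks: List[Task] = []
    most_recent = last_updated
    for project_id in project_ids:
        # Each project is queried with the cursor from the previous sync,
        # never with a value advanced by an earlier project in this run.
        for task in fetch_tasks(project_id, last_updated):
            tasks.append(task)
            most_recent = max(most_recent, task["last_updated"])
    return tasks, most_recent


tasks, new_state = sync_tasks(["project1", "project2"], "2022-03-01T00:00:00Z", fake_fetch)
```

Here task "b" in project2 is still returned even though project1 contains a newer task, and `new_state` only takes the newest value once both projects have been read.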

I currently have it working using the HttpSubStream class and an Incremental mixin, but I am not sure where to update the final state. Doing this in read_records updates the state after each project (stream), rather than once all projects have been read, so any new tasks in project1 might cause “older” tasks in project2 to be missed.

Please point me in the direction of some documentation or an example of a source connector that does this.

Thank you in advance!

OllieF

Hi @OllieF, let me look into this a bit more, but for now I thought this doc might be useful:
https://airbytehq.github.io/connector-development/cdk-python/incremental-stream/#checkpointing-state

Are you currently using stream slices?

Hi @natalyjazzviolin, thank you for picking this up!

Yes, I am currently using stream slicing, primarily in the path function as shown below:

    def path(self, *, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None) -> str:
        data_obj = f"get_tasks/{stream_slice['parent']['project_id']}&updated_after={self.state.get(self.cursor_field)}"
        if next_page_token:
            data_obj += f"&limit={next_page_token.get('limit')}&offset={next_page_token.get('offset')}"
        return f"{self.base_url}{data_obj}"

This works, and it is able to go through all project streams. The issue is identifying where the state should be updated. Updating the state as shown below means there is a new state for each slice, whereas I want to use the original state for every slice and update the state only once, right at the end.

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: List[str] = None,
        stream_slice: Mapping[str, Any] = None,
        stream_state: Mapping[str, Any] = None,
    ) -> Iterable[Mapping[str, Any]]:
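        for record in super().read_records(
            sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state
        ):
            # Track the maximum inside the loop so every record is considered
            # and an empty slice does not reference an undefined record.
            self._cursor_value = max(record[self.cursor_field], self._cursor_value)
            yield record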
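
One possible way to get the behaviour described above is to keep two values: a request cursor that is fixed when the saved state is loaded, and a running maximum that is reported as the new state. The sketch below is plain Python and does not import the CDK. The class `FrozenCursorTasks`, its `api` argument and the `_request_cursor` attribute are hypothetical, and it assumes the state setter is called once with the saved state before any slice is read. It also assumes ISO-8601 timestamps in a single format.

```python
from typing import Any, Dict, Iterator, List, Mapping


class FrozenCursorTasks:
    """Plain-Python stand-in for the tasks stream (hypothetical, no CDK import)."""

    cursor_field = "last_updated"

    def __init__(self, api: Mapping[str, List[Dict[str, Any]]]) -> None:
        self._api = api  # in-memory replacement for the HTTP endpoint
        self._request_cursor = ""  # sent as updated_after; fixed for the whole sync
        self._cursor_value = ""  # running maximum seen during this sync

    @property
    def state(self) -> Dict[str, str]:
        # Reports the running maximum as the state to save.
        return {self.cursor_field: self._cursor_value}

    @state.setter
    def state(self, value: Mapping[str, str]) -> None:
        # Assumed to run once before reading: freeze the cursor used for requests.
        self._request_cursor = value.get(self.cursor_field, "")
        self._cursor_value = self._request_cursor

    def read_records(self, stream_slice: Mapping[str, Any]) -> Iterator[Dict[str, Any]]:
        project_id = stream_slice["parent"]["project_id"]
        for record in self._api[project_id]:
            # Filter with the frozen cursor, not the running maximum.
            if record[self.cursor_field] > self._request_cursor:
                self._cursor_value = max(self._cursor_value, record[self.cursor_field])
                yield record
```

In a real connector, the path function would then build updated_after from the frozen value instead of from self.state. One caveat: if the framework checkpoints state between slices, a sync that fails part-way could save a cursor that is ahead of projects not yet read, so this sketch only covers syncs that complete.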

Thank you again,

OllieF

Hi @natalyjazzviolin, have you been able to look into this any further?

Thank you,

OllieF

Hi Oliver, I apologize for the wait. We are a small team and have been getting a larger-than-normal volume of messages over the past few weeks. This is on my radar, but I have not been able to look into it yet!