Incremental Streams

Hello Respected,

I have used HttpSubStream to get IDs from the parent stream in the child stream. The problem is that the data I receive has its dates in decreasing order. If I want to implement the incremental mixin, how should I do it, given that the state would never pick that up?

If somebody could point me in the right direction, it would be really helpful.

Kind Regards,
Zawar Khan.

Hello @Zawar92, could you share some of your code so I can see how your HttpSubStream class is structured?

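from abc import ABC
from typing import Any, Iterable, List, Mapping, Optional

import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.http import HttpSubStream


# XYZStream is the poster's own base stream class (not shown here).
class Reports(XYZStream, HttpSubStream):
    def __init__(self, authenticator, config, parent: ABC, **kwargs):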
        super().__init__(parent=parent, **kwargs)
        self.config = config
        self._authenticator = authenticator
        self._session = requests.sessions.Session()

    def stream_slices(
        self, sync_mode: SyncMode,
        cursor_field: List[str] = None,
        stream_state: Mapping[str, Any] = None) -> Iterable[Optional[Mapping[str, Any]]]:

        parent_stream_slices = self.parent.stream_slices(
            sync_mode=SyncMode.incremental,
            cursor_field=cursor_field,
            stream_state=stream_state
        )

        for stream_slice in parent_stream_slices:
            parent_records = self.parent.read_records(
                sync_mode=SyncMode.incremental, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state
            )

            for record in parent_records:
                yield {"id": record.get('id')}

    def parse_response(
            self,
            response: requests.Response,
            stream_state: Mapping[str, Any],
            stream_slice: Mapping[str, Any] = None,
            next_page_token: Mapping[str, Any] = None,
        ) -> Iterable[Mapping]:
        # Parse the body once; a missing or empty 'Results' key yields nothing.
        data = response.json()
        if data:
            for record in data.get('Results') or []:
                yield record

    def path(
            self,
            stream_state: Mapping[str, Any] = None,
            stream_slice: Mapping[str, Any] = None,
            next_page_token: Mapping[str, Any] = None
        ) -> str:
        return f"reports/{stream_slice['id']}/ABC"

I don’t think there is any issue in the code, as it is working fine. The problem lies in the data I am fetching from the URL: the received data has dates in decreasing order. Is there any solution we can follow?

Which API are you using to develop this connector?

I am using Outbrain-Amplify API.

I see! I’m still brainstorming/researching on how best you could do this, but perhaps a good starting point would be the parse_response function? Maybe you could sort the data there in reverse order?
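
Here is a minimal sketch of that sorting idea. It assumes each record carries an ISO-formatted date under a key named `date`, which is a hypothetical field name, so substitute whatever the API actually returns. It reorders one page of results from oldest to newest before they are yielded.

```python
from datetime import date
from typing import Any, Iterable, List, Mapping

# Hypothetical field name; replace with the date field the API returns.
CURSOR_FIELD = "date"


def sort_ascending(
    records: Iterable[Mapping[str, Any]], cursor_field: str = CURSOR_FIELD
) -> List[Mapping[str, Any]]:
    """Return the records ordered from oldest to newest by their date field."""
    return sorted(records, key=lambda record: date.fromisoformat(record[cursor_field]))


# Made-up example page, newest first, as the API is described to return it.
page = [
    {"date": "2022-03-03", "clicks": 7},
    {"date": "2022-03-02", "clicks": 5},
    {"date": "2022-03-01", "clicks": 9},
]

for record in sort_ascending(page):
    print(record["date"])
```

Inside parse_response, the same call could wrap the list taken from the response body. Note that this only orders records within a single response; if the API paginates, later pages would still hold older records than earlier ones.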

FYI this is how Meltano deals with “unsorted” streams - I’m looking for a solution to this in Airbyte as well after moving from Meltano for other reasons:

https://sdk.meltano.com/en/latest/implementation/state.html#dealing-with-unsorted-streams
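
As I understand it, the general idea is to keep track of the highest cursor value seen while reading and only treat it as the new state once the whole stream has been read. Below is a rough, framework-independent sketch of that pattern. The class and method names are made up for illustration and are not part of the Meltano SDK or the Airbyte CDK.

```python
from typing import Any, Dict, Iterable, Iterator, Mapping, Optional


class UnsortedCursorTracker:
    """Tracks the highest cursor value seen and commits it only after a full pass."""

    def __init__(self, cursor_field: str, committed: Optional[str] = None):
        self.cursor_field = cursor_field
        self.committed = committed  # value that is safe to persist as state
        self._pending = committed   # highest value seen during the current pass

    def observe(self, records: Iterable[Mapping[str, Any]]) -> Iterator[Mapping[str, Any]]:
        """Yield records unchanged while remembering the largest cursor value."""
        for record in records:
            value = record[self.cursor_field]
            # ISO dates (YYYY-MM-DD) compare correctly as plain strings.
            if self._pending is None or value > self._pending:
                self._pending = value
            yield record

    def finalize(self) -> Dict[str, Any]:
        """Promote the pending value to committed state once the read is complete."""
        self.committed = self._pending
        return {self.cursor_field: self.committed}


# Made-up records arriving in no particular order.
tracker = UnsortedCursorTracker("date")
for _ in tracker.observe([{"date": "2022-03-03"}, {"date": "2022-03-01"}]):
    pass  # emit each record downstream here
print(tracker.finalize())
```

The trade-off is that an interrupted sync cannot resume from a partial checkpoint, because nothing is committed until every record has been read.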

@Zawar92 FWIW I found a pretty good implementation that’s about 2 weeks old that does just this. I’m folding this into my connector now.

class SendgridStreamIncrementalMixin(HttpStream, ABC):
    cursor_field = "created"

    def __init__(self, start_time: int, **kwargs):
        super().__init__(**kwargs)
        self._start_time = start_time

    def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Return the latest state by comparing the cursor value in the latest record with the stream's most recent state object
        and returning an updated state object.
        """
        latest_benchmark = latest_record[self.cursor_field]
        if current_stream_state.get(self.cursor_field):
            return {self.cursor_field: max(latest_benchmark, current_stream_state[self.cursor_field])}
        return {self.cursor_field: latest_benchmark}

    def request_params(self, stream_state: Mapping[str, Any], **kwargs) -> MutableMapping[str, Any]:
        params = super().request_params(stream_state=stream_state)
        start_time = self._start_time
        if stream_state.get(self.cursor_field):
            start_time = stream_state[self.cursor_field]
        params.update({"start_time": start_time, "end_time": pendulum.now().int_timestamp})
        return params

I think you just need to install the pendulum dependency and then replace the cursor_field value with whatever field indicates the recency of the record you’re using in your state.

Once I try this, I will give you an update.

Thanks for the tip, @pjatx, and looking forward to your update, @Zawar92!

Hi @pjatx. You are using created as the cursor_field because it indicates the recency of the record. In my case I don’t have anything to check recency except for the date in the stream records. The problem is that the dates in the streams are not in increasing order; they are in descending order.

I already tried implementing the example you gave above, but there isn’t any field for checking the recency of a record except for the dates.
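
For what it is worth, a max()-based state update does not depend on the order in which records arrive, so a plain date field can act as the cursor even when the data is descending. The standalone sketch below illustrates this with made-up records. The field name `date` and the helper `newer_than_state` are assumptions for illustration, not part of the Airbyte CDK or the Outbrain API.

```python
from typing import Any, Dict, Iterable, Iterator, Mapping

CURSOR_FIELD = "date"  # hypothetical; use the date field present in your records


def get_updated_state(
    current_stream_state: Mapping[str, Any], latest_record: Mapping[str, Any]
) -> Dict[str, Any]:
    """Keep the larger of the stored cursor and the record's cursor value."""
    latest = latest_record[CURSOR_FIELD]
    previous = current_stream_state.get(CURSOR_FIELD)
    # ISO dates (YYYY-MM-DD) compare correctly as plain strings.
    return {CURSOR_FIELD: max(latest, previous) if previous else latest}


def newer_than_state(
    records: Iterable[Mapping[str, Any]], stream_state: Mapping[str, Any]
) -> Iterator[Mapping[str, Any]]:
    """Drop records that are not newer than the saved cursor (client-side filter)."""
    previous = stream_state.get(CURSOR_FIELD)
    for record in records:
        if previous is None or record[CURSOR_FIELD] > previous:
            yield record


# First sync: records arrive newest first, yet the state still ends at the maximum.
state: Dict[str, Any] = {}
for record in [{"date": "2022-03-03"}, {"date": "2022-03-02"}, {"date": "2022-03-01"}]:
    state = get_updated_state(state, record)

# Second sync: only records newer than the saved state are kept.
second_batch = [{"date": "2022-03-04"}, {"date": "2022-03-03"}, {"date": "2022-03-02"}]
fresh = list(newer_than_state(second_batch, state))
print(state, fresh)
```

One caveat: because the newest record arrives first, the state reaches its maximum right away, so a checkpoint saved from an interrupted sync could skip older records that had not been read yet.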