Stumped on building an incremental stream

I’ve been banging my head against the wall on this, and every time I think I have this figured out I run into a new problem. Hoping someone out there can help.

I’ve used the Python CDK to create a connector to a private API, and now I’d like to add incremental sync. I’ve created a class based on IncrementalMixin that generates records, but it emits only a small number of them (<100) when the source has ~20K. I think I’ve combined page tokens, stream state, and cursor fields incorrectly, which results in the following cycle:

  • Records are read for a page of data.
  • The stream state is updated to reflect the max cursor value from the cursor field (in my case, updated_at) for the page.
  • The next request gets created with the next_page_token and the cursor value as a filter variable (in my case, updated_at_min).
  • The cycle repeats. Given the above framework though, it only takes a few pages of data to get to the end of the dataset as the data is
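
For anyone following along, the cycle in the list above can be reproduced without the CDK. This is only a sketch under assumptions: the in-memory RECORDS list, the fetch helper, and the offset-based paging are all made up for illustration and stand in for neither the private API nor the Airbyte CDK. It compares a sync that moves the updated_at_min filter after every page with one that keeps the filter fixed for the whole sync.

```python
from typing import Dict, List, Optional, Tuple

# Hypothetical data set: 200 records whose updated_at values are not in page order.
RECORDS = [{"id": i, "updated_at": (i * 37) % 200} for i in range(200)]
PAGE_SIZE = 50


def fetch(updated_at_min: int, offset: int = 0) -> Tuple[List[Dict], Optional[int]]:
    """Fake API call: apply the filter, then return one page and the next offset."""
    matching = [r for r in RECORDS if r["updated_at"] >= updated_at_min]
    page = matching[offset:offset + PAGE_SIZE]
    next_offset = offset + PAGE_SIZE if offset + PAGE_SIZE < len(matching) else None
    return page, next_offset


def sync(move_filter_each_page: bool) -> int:
    """Return how many records one sync reads."""
    start = 0                    # cursor value saved by the previous run
    cursor = start               # running max of updated_at seen so far
    offset: Optional[int] = 0
    total = 0
    while offset is not None:
        # The problematic variant filters on the running cursor instead of the start value.
        filter_value = cursor if move_filter_each_page else start
        page, offset = fetch(filter_value, offset)
        total += len(page)
        cursor = max([cursor] + [r["updated_at"] for r in page])
    return total
```

With this made-up data, sync(True) reads 50 of the 200 records and sync(False) reads all 200. The first page already contains the largest updated_at value, so once the filter moves to it, the next page token points past the end of a much smaller result set.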

Previously, I had the cursor value correctly passing to the stream state but on subsequent runs it would run a full refresh as the request was not taking into account last run’s stream_state.

I’m sure I’m making a simple mistake, but I could use a little assistance in pointing it out.

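from __future__ import annotations  # lets BaseStream appear in annotations inside its own body

from abc import ABC
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Type

import pendulum
import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams import IncrementalMixin
from airbyte_cdk.sources.streams.http import HttpStream


class BaseStream(HttpStream, ABC):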
    url_base = "https://base.api.com/"
    primary_key = "id"
    limit = 50
    resource_name = ""

    def path(self, **kwargs) -> str:
        return self.resource_name

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        # The API returns the token for the next page under "next_cursor".
        next_cursor = response.json().get("next_cursor")
        return {"cursor": next_cursor} if next_cursor else None

    def request_params(
        self, next_page_token: Mapping[str, Any] = None, **kwargs
    ) -> MutableMapping[str, Any]:
        params = {"limit": self.limit}
        if next_page_token:
            params.update(next_page_token)
        return params

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        response_json = response.json()
        yield from response_json.get(self.resource_name, [])

    def read_slices_from_records(self, stream_class: Type[BaseStream], slice_field: str) -> Iterable[Optional[Mapping[str, Any]]]:
        stream = stream_class(authenticator=self.authenticator)
        stream_slices = stream.stream_slices(sync_mode=SyncMode.full_refresh)
        for stream_slice in stream_slices:
            for record in stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice):
                yield {slice_field: record["access_token"]}

class IncrementalBaseStream(BaseStream, IncrementalMixin):
    cursor_field = "updated_at"
    start_date_filter = "updated_at_min"
    end_date_filter = "updated_at_max"
    min_id = ""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._cursor_value = self.min_id
        
    @property
    def state_checkpoint_interval(self) -> int:
        return self.limit

    @property
    def state(self) -> Mapping[str, Any]:
        # Fall back to min_id until a cursor value has been recorded.
        return {self.cursor_field: self._cursor_value or self.min_id}

    @state.setter
    def state(self, value: Mapping[str, Any]):
        self._cursor_value = value.get(self.cursor_field)

    def request_params(self, stream_state: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, **kwargs):
        params = super().request_params(stream_state=stream_state, next_page_token=next_page_token, **kwargs)
        # Guard against a missing state so pendulum.parse is never called with None.
        latest_stream_state = (stream_state or {}).get(self.cursor_field)
        # Only the first request carries the date filter; later pages rely on the page token.
        if latest_stream_state and not next_page_token:
            params[self.start_date_filter] = int(pendulum.parse(latest_stream_state).timestamp()) * 1000
        return params
    
    def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]:
        for record in super().read_records(*args, **kwargs):
            if self._cursor_value:
                latest_record_date = record[self.cursor_field]
                self._cursor_value = max(self._cursor_value, latest_record_date)
            yield record


class Connections(BaseStream):
    resource_name = "connections"

class ConnectionsRelatedStream(BaseStream, ABC):    
    def stream_slices(
        self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        yield from self.read_slices_from_records(stream_class=Connections, slice_field="access_token")

    def parse_response(
        self, response: requests.Response, stream_slice: Mapping[str, Any] = None, stream_state: Mapping[str, Any] = None, **kwargs
    ) -> Iterable[Mapping]:
        response_json = response.json()
        data = response_json.get(self.resource_name, [])
        # Attach the parent connection's fields to every record, renaming its "id" to avoid a clash.
        connection = response_json["connection"]
        connection["connection_id"] = connection.pop("id")
        for record in data:
            record.update(connection)
        yield from data


class Orders(ConnectionsRelatedStream, IncrementalBaseStream):
    resource_name = "orders"
    min_id = "1900-01-01T00:00:00.00Z"

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, **kwargs
    ) -> MutableMapping[str, Any]:
        params = super().request_params(stream_state=self.state, **kwargs)
        stream_params = {"access_token": stream_slice["access_token"], "expand": "transactions"}
        params.update(stream_params)
        return params
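
One possible reading of the code above: Orders.request_params declares next_page_token as a named parameter, so it is not part of **kwargs and never reaches the parent implementation, and it passes self.state, which read_records advances as records are yielded. If so, every request is built as a first page filtered on the newest cursor seen so far. A minimal sketch of the alternative follows, written as a plain function so it runs without the CDK. The names build_params and to_epoch_ms are hypothetical, and it assumes the stream_state handed to request_params is the state saved by the previous run, and that timestamps are ISO-8601 strings that datetime.fromisoformat can parse once "Z" is replaced with "+00:00".

```python
from datetime import datetime
from typing import Any, Dict, Mapping, Optional


def to_epoch_ms(iso_timestamp: str) -> int:
    """Convert an ISO-8601 UTC timestamp to epoch milliseconds (hypothetical helper)."""
    parsed = datetime.fromisoformat(iso_timestamp.replace("Z", "+00:00"))
    return int(parsed.timestamp()) * 1000


def build_params(
    limit: int,
    start_state: Optional[Mapping[str, Any]],
    next_page_token: Optional[Mapping[str, Any]],
    cursor_field: str = "updated_at",
    start_filter: str = "updated_at_min",
) -> Dict[str, Any]:
    """Build query params from the state at the START of the sync plus the page token."""
    params: Dict[str, Any] = {"limit": limit}
    if next_page_token:
        # Later pages: forward the token and leave the filter alone, as the original code intends.
        params.update(next_page_token)
        return params
    cursor_value = (start_state or {}).get(cursor_field)
    if cursor_value:
        # First page only: filter on the cursor saved by the previous run.
        params[start_filter] = to_epoch_ms(cursor_value)
    return params
```

In the connector this would roughly correspond to calling super().request_params(stream_state=stream_state, next_page_token=next_page_token, **kwargs) from Orders instead of passing self.state. Whether the API also expects the filter to be repeated on later pages is something only its documentation can confirm.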
