Stumped with building an Incremental stream

I’ve been banging my head against the wall on this, and every time I think I have this figured out I run into a new problem. Hoping someone out there can help.

I’ve used the Python CDK to create a connector to a private API, and now I’d like to implement incremental functionality. I’ve created an IncrementalMixIn class that is generating records, but I’m running into a weird issue. It’ll generate a very small (<100) number of records when in fact the source has ~20K. I think I’ve incorrectly combined page tokens, stream states, and cursor fields which is resulting in:

  • Records are read for a page of data.
  • The stream state is updated to reflect the max cursor value from the cursor field (in my case, updated_at) for the page.
  • The next request gets created with the next_page_token and the cursor value as a filter variable (in my case, updated_at_min).
  • The cycle repeats. Given the above framework though, it only takes a few pages of data to get to the end of the dataset as the data is

Previously, I had the cursor value correctly passing to the stream state but on subsequent runs it would run a full refresh as the request was not taking into account last run’s stream_state.

I’m sure I’m making a simple mistake, but I could use a little assistance in pointing it out.

class BaseStream(HttpStream, ABC):
    url_base = ""
    primary_key = "id"
    limit = 50
    resource_name = ""

    def path(self, **kwargs) -> str:
        return self.resource_name

    def next_page_token(response: requests.Response) -> Optional[Mapping[str, Any]]:
        next_page_token = response.json().get("next_cursor")
        if next_page_token:
            return {"cursor":next_page_token}
            return None

    def request_params(
        self, next_page_token: Mapping[str, Any] = None, **kwargs
    ) -> MutableMapping[str, Any]:
        params = {"limit": self.limit}
        if next_page_token:
        return params

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        response_json = response.json()
        yield from response_json.get(self.resource_name, [])

    def read_slices_from_records(self, stream_class: Type[BaseStream], slice_field: str) -> Iterable[Optional[Mapping[str, Any]]]:
        stream = stream_class(authenticator=self.authenticator)
        stream_slices = stream.stream_slices(sync_mode=SyncMode.full_refresh)
        for stream_slice in stream_slices:
            for record in stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice):
                yield {slice_field: record["access_token"]}

class IncrementalBaseStream(BaseStream, IncrementalMixin):
    cursor_field = "updated_at"
    start_date_filter = "updated_at_min"
    end_date_filter = "updated_at_max"
    min_id = ""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._cursor_value = self.min_id
    def state_checkpoint_interval(self) -> int:
        return super().limit

    def state(self) -> Mapping[str, Any]:
#        return {self.cursor_field: self._cursor_value}
        if self._cursor_value:
            return {self.cursor_field: self._cursor_value}
            return {self.cursor_field: self.min_id}

    def state(self, value: Mapping[str, Any]):
        self._cursor_value = value.get(self.cursor_field)

    def request_params(self, stream_state: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, **kwargs):
        params = super().request_params(stream_state=stream_state, next_page_token=next_page_token, **kwargs)
        latest_stream_state = stream_state.get(self.cursor_field)
        unix_latest_stream_state = int(pendulum.parse(latest_stream_state).timestamp()) * 1000
        if not next_page_token:
            if latest_stream_state:
                params[self.start_date_filter] = unix_latest_stream_state
        return params
    def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]:
        for record in super().read_records(*args, **kwargs):
            if self._cursor_value:
                latest_record_date = record[self.cursor_field]
                self._cursor_value = max(self._cursor_value, latest_record_date)
            yield record
class Connections(BaseStream):
    resource_name = "connections"

class ConnectionsRelatedStream(BaseStream, ABC):    
    def stream_slices(
        self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        yield from self.read_slices_from_records(stream_class=Connections, slice_field="access_token")

    def parse_response(self, response: requests.Response, stream_slice: Mapping[str, any] = None, stream_state: Mapping[str, any] = None, **kwargs) -> Iterable[Mapping]:
        response_json = response.json()
        data = response_json.get(self.resource_name, [])
        connection = response_json['connection']
        connection['connection_id'] = connection['id']
        del connection['id']
        for id in data:
        yield from data
class Orders(ConnectionsRelatedStream, IncrementalBaseStream):
    resource_name = "orders"
    min_id = "1900-01-01T00:00:00.00Z"

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None, **kwargs
    ) -> MutableMapping[str, Any]:
        params = super().request_params(stream_state=self.state, **kwargs)
        stream_params = {"access_token":stream_slice["access_token"], "expand":"transactions"}
        return params

