I’ve been banging my head against the wall on this, and every time I think I have this figured out I run into a new problem. Hoping someone out there can help.
I’ve used the Python CDK to create a connector to a private API, and now I’d like to implement incremental functionality. I’ve created an `IncrementalMixin` class that is generating records, but I’m running into a weird issue: it generates a very small number of records (<100) when in fact the source has ~20K. I think I’ve incorrectly combined page tokens, stream states, and cursor fields, which results in the following cycle:
- Records are read for a page of data.
- The stream state is updated to reflect the max cursor value from the cursor field (in my case, `updated_at`) for the page.
- The next request gets created with the `next_page_token` and the cursor value as a filter variable (in my case, `updated_at_min`).
- The cycle repeats.

Given the above framework, though, it only takes a few pages of data to get to the end of the dataset as the data is …
Previously, I had the cursor value correctly passing to the stream state, but on subsequent runs it would run a full refresh because the request was not taking into account the last run’s `stream_state`.
I’m sure I’m making a simple mistake, but I could use a little assistance in pointing it out.
class BaseStream(HttpStream, ABC):
    """Base HTTP stream: cursor-paginated reads of a single API resource.

    Subclasses set ``resource_name``; it is used both as the request path and
    as the key under which the record list is looked up in the JSON response.
    """

    url_base = "https://base.api.com/"
    primary_key = "id"
    # Page size, sent as the ``limit`` query parameter on every request.
    limit = 50
    resource_name = ""

    def path(self, **kwargs) -> str:
        """Return the request path, which is simply the resource name."""
        return self.resource_name

    @staticmethod
    def next_page_token(response: requests.Response) -> Optional[Mapping[str, Any]]:
        """Return ``{"cursor": <next_cursor>}`` for the next page, or ``None``.

        ``None`` is returned when the response body has no (or an empty)
        ``next_cursor`` value.
        """
        next_cursor = response.json().get("next_cursor")
        return {"cursor": next_cursor} if next_cursor else None

    def request_params(
        self, next_page_token: Optional[Mapping[str, Any]] = None, **kwargs
    ) -> MutableMapping[str, Any]:
        """Build the query parameters: the page size plus the page cursor, if any."""
        params: MutableMapping[str, Any] = {"limit": self.limit}
        if next_page_token:
            params.update(next_page_token)
        return params

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yield the records stored under ``resource_name`` in the response body."""
        # ``or []`` also covers a JSON ``null`` value, which ``.get(key, [])``
        # would pass straight to ``yield from`` and crash on.
        yield from response.json().get(self.resource_name) or []

    def read_slices_from_records(
        self, stream_class: Type["BaseStream"], slice_field: str
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        """Read ``stream_class`` in full and yield one slice per record.

        :param stream_class: stream to instantiate (with this stream's
            authenticator) and read in full-refresh mode.
        :param slice_field: key under which the value is placed in each slice.
        :return: an iterable of ``{slice_field: record["access_token"]}``.
        """
        # The annotation above is a quoted forward reference: ``BaseStream`` is
        # not bound yet while its own class body is being executed.
        stream = stream_class(authenticator=self.authenticator)
        for stream_slice in stream.stream_slices(sync_mode=SyncMode.full_refresh):
            for record in stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice):
                # NOTE(review): the record key is hard-coded to "access_token"
                # regardless of ``slice_field`` — confirm this is intended.
                yield {slice_field: record["access_token"]}
class IncrementalBaseStream(BaseStream, IncrementalMixin):
    """Incremental variant of ``BaseStream`` that tracks an ``updated_at`` cursor.

    The cursor is kept in ``self._cursor_value`` and exposed through the
    ``state`` property as ``{cursor_field: value}``.
    """

    cursor_field = "updated_at"
    # Query-parameter names used to filter by the cursor field.
    start_date_filter = "updated_at_min"
    end_date_filter = "updated_at_max"
    # Cursor value reported when nothing has been read yet; subclasses override.
    min_id = ""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._cursor_value = self.min_id

    @property
    def state_checkpoint_interval(self) -> int:
        """Checkpoint interval, equal to the page size ``limit``."""
        return self.limit

    @property
    def state(self) -> Mapping[str, Any]:
        """Return the current cursor, falling back to ``min_id`` when unset."""
        return {self.cursor_field: self._cursor_value or self.min_id}

    @state.setter
    def state(self, value: Mapping[str, Any]):
        # Tolerate an empty/None state; the getter falls back to ``min_id``.
        self._cursor_value = (value or {}).get(self.cursor_field)

    def request_params(
        self,
        stream_state: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
        **kwargs,
    ) -> MutableMapping[str, Any]:
        """Build query parameters, adding the start filter on the first page only.

        On the first request (no ``next_page_token``), if ``stream_state``
        carries a cursor value, it is parsed and sent as ``start_date_filter``
        (whole-second epoch timestamp multiplied by 1000). Follow-up pages rely
        on the page cursor alone.
        """
        params = super().request_params(stream_state=stream_state, next_page_token=next_page_token, **kwargs)
        if next_page_token:
            return params

        # ``stream_state`` may be None or lack the cursor; only parse a real value.
        latest_stream_state = (stream_state or {}).get(self.cursor_field)
        if latest_stream_state:
            params[self.start_date_filter] = int(pendulum.parse(latest_stream_state).timestamp()) * 1000
        return params

    def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]:
        """Yield records from the parent, advancing the cursor to the max seen.

        Cursor values are compared with ``max`` as-is, so they are assumed to
        be in a format whose ordering matches chronological order — TODO confirm.
        """
        for record in super().read_records(*args, **kwargs):
            latest_record_date = record.get(self.cursor_field)
            if latest_record_date:
                if self._cursor_value:
                    self._cursor_value = max(self._cursor_value, latest_record_date)
                else:
                    # No cursor yet (e.g. empty ``min_id``): start from this
                    # record instead of never advancing the state.
                    self._cursor_value = latest_record_date
            yield record
class Connections(BaseStream):
    """Stream for the ``connections`` resource."""

    resource_name = "connections"
class ConnectionsRelatedStream(BaseStream, ABC):
    """Base for streams that are read once per record of ``Connections``.

    Each slice carries an ``access_token`` taken from a ``Connections`` record,
    and every parsed record is enriched with the response's ``connection``
    object.
    """

    def stream_slices(
        self,
        sync_mode: SyncMode,
        cursor_field: List[str] = None,
        stream_state: Mapping[str, Any] = None,
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        """Yield one ``{"access_token": ...}`` slice per ``Connections`` record."""
        yield from self.read_slices_from_records(stream_class=Connections, slice_field="access_token")

    def parse_response(
        self,
        response: requests.Response,
        stream_slice: Mapping[str, Any] = None,
        stream_state: Mapping[str, Any] = None,
        **kwargs,
    ) -> Iterable[Mapping]:
        """Yield the resource records, each merged with the connection fields.

        The response's ``connection`` object has its ``id`` renamed to
        ``connection_id`` before it is merged into every record.

        :raises KeyError: if the response has no ``connection`` object or the
            connection has no ``id``.
        """
        response_json = response.json()
        records = response_json.get(self.resource_name, [])
        connection = response_json["connection"]
        # Rename so the connection's id is not merged in under the "id" key.
        connection["connection_id"] = connection.pop("id")
        for record in records:
            record.update(connection)
            yield record
class Orders(ConnectionsRelatedStream, IncrementalBaseStream):
    """Incremental ``orders`` stream, read once per connection access token."""

    resource_name = "orders"
    min_id = "1900-01-01T00:00:00.00Z"

    def request_params(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
        **kwargs,
    ) -> MutableMapping[str, Any]:
        """Build query parameters for an orders request.

        Adds the slice's ``access_token`` and ``expand=transactions`` to the
        parameters produced by the parent classes.
        """
        # ``next_page_token`` and ``stream_slice`` are captured as named
        # parameters here, so they are NOT part of ``kwargs`` and must be
        # forwarded explicitly. Without the page token the parent treats every
        # request as a first page and re-applies the start filter using the
        # cursor value already advanced by the records read so far.
        #
        # NOTE(review): ``self.state`` is shared by all connection slices, so
        # the first request of a later slice is filtered by a cursor advanced
        # while reading earlier slices — this may skip records; consider
        # tracking the cursor per connection.
        params = super().request_params(
            stream_state=self.state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
            **kwargs,
        )
        params.update({"access_token": stream_slice["access_token"], "expand": "transactions"})
        return params