I’m working with a partner API that has a very strict limit of about 120 requests per minute, and I’m trying to build our connector to stay under it. The reason: letting it hit exponential backoff repeatedly increases run time so much that the connector becomes very difficult to test.
My current strategy is to override read_records() and simply insert a sleep() call to force a pause before each request goes out. Code below:
def read_records(
    self,
    sync_mode: SyncMode,
    cursor_field: List[str] = None,
    stream_slice: Mapping[str, Any] = None,
    stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
    stream_state = stream_state or {}
    pagination_complete = False
    next_page_token = None
    with AirbyteSentry.start_transaction("read_records", self.name), AirbyteSentry.start_transaction_span("read_records"):
        while not pagination_complete:
            request_headers = self.request_headers(
                stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
            )
            request = self._create_prepared_request(
                path=self.path(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
                headers=dict(request_headers, **self.authenticator.get_auth_header()),
                params=self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
                json=self.request_body_json(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
                data=self.request_body_data(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
            )
            request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
            sleep(1.75)  # <--- This line right here (requires `from time import sleep`)
            if self.use_cache:
                # use context manager to handle and store cassette metadata
                with self.cache_file as cass:
                    self.cassette = cass
                    # vcr tries to find records based on the request; if such records exist, return from the cache file,
                    # else make a request and save the record in the cache file
                    response = self._send_request(request, request_kwargs)
            else:
                response = self._send_request(request, request_kwargs)
            yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)
            next_page_token = self.next_page_token(response)
            if not next_page_token:
                pagination_complete = True

    # Always return an empty generator just in case no records were ever yielded
    yield from []
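For context on the pacing the sleep() call is meant to enforce: 120 requests per minute works out to at most one request every 0.5 s, so sleep(1.75) is already well below the limit for a single stream. Below is a minimal sketch (using only the standard library; the class name and API are hypothetical, not part of the Airbyte CDK) of a shared minimum-interval limiter. Unlike a fixed sleep in one stream's read_records(), a limiter shared across streams would bound the connector's combined request rate even when several streams run at once.

```python
import threading
import time


class MinIntervalLimiter:
    """Hypothetical helper: ensure successive acquire() calls are at least
    `interval` seconds apart, even across threads sharing one instance."""

    def __init__(self, requests_per_minute: float):
        # 120 requests/minute -> at most one request every 0.5 s
        self.interval = 60.0 / requests_per_minute
        self._lock = threading.Lock()
        self._next_allowed = 0.0  # monotonic timestamp of the next free slot

    def acquire(self) -> None:
        # Reserve the next slot under the lock, then sleep outside the lock
        # so other threads can queue up their own slots in the meantime.
        with self._lock:
            now = time.monotonic()
            wait = self._next_allowed - now
            self._next_allowed = max(now, self._next_allowed) + self.interval
        if wait > 0:
            time.sleep(wait)


limiter = MinIntervalLimiter(requests_per_minute=120)
limiter.acquire()                 # first call returns immediately
start = time.monotonic()
limiter.acquire()                 # second call waits until the 0.5 s slot opens
elapsed = time.monotonic() - start
print(f"second request delayed by ~{elapsed:.2f}s")
```

The `acquire()` call would replace the bare sleep(1.75) before `_send_request`; every stream instance would need to share the same limiter object for the combined rate to stay bounded.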
When testing locally via python main.py read ..., the connector never receives a 429 or hits exponential backoff, but when run from a locally hosted Airbyte instance, it hits backoffs regularly. Was my solution too naive? Are there other factors to consider when trying to slow down a connector?