How do you correctly modify a connector to adhere to rate limits?

I’m working with a partner API that has a very strict limit of about 120 requests per minute, and I’m trying to build our connector to adhere to that.

The reason is that letting it hit exponential backoff repeatedly increases run time so much that the connector becomes very difficult to test.

My current strategy is to override read_records() and simply insert a sleep() call to force it to pause before sending a request out. Code below:

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: List[str] = None,
        stream_slice: Mapping[str, Any] = None,
        stream_state: Mapping[str, Any] = None,
    ) -> Iterable[Mapping[str, Any]]:
        stream_state = stream_state or {}
        pagination_complete = False

        next_page_token = None
        with AirbyteSentry.start_transaction("read_records", self.name), AirbyteSentry.start_transaction_span("read_records"):
            while not pagination_complete:
                request_headers = self.request_headers(
                    stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
                )
                request = self._create_prepared_request(
                    path=self.path(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
                    headers=dict(request_headers, **self.authenticator.get_auth_header()),
                    params=self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
                    json=self.request_body_json(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
                    data=self.request_body_data(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
                )
                request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)

                sleep(1.75)  # <---This line right here

                if self.use_cache:
                    # use context manager to handle and store cassette metadata
                    with self.cache_file as cass:
                        self.cassete = cass
                        # vcr tries to find records based on the request, if such records exist, return from cache file
                        # else make a request and save record in cache file
                        response = self._send_request(request, request_kwargs)

                else:
                    response = self._send_request(request, request_kwargs)
                yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)

                next_page_token = self.next_page_token(response)
                if not next_page_token:
                    pagination_complete = True

            # Always return an empty generator just in case no records were ever yielded
            yield from []

When testing locally via python read ... the connector never receives a 429 or hits exponential backoff, but when run from a locally-hosted Airbyte instance, it hits backoffs regularly. Was my solution too naive? Are there other factors to consider when trying to slow down a connector?

Hey! You are right that with the CDK's current approach to rate limiting, the connector first consumes the source API without any limit on requests per second and only starts pacing its requests after it first receives a 429 error code.
Your approach is not too naive, but I’d suggest overriding _send instead of read_records. In your _send override you could call super()._send() and then call sleep. That will probably involve fewer moving parts than what you implemented in read_records.
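
For illustration, here is a minimal sketch of that suggestion. It is a variation, not the CDK's own mechanism: it waits before each request instead of sleeping after it, and only for whatever part of the interval has not already elapsed. `MinIntervalThrottle`, `BaseStream` and `ThrottledStream` are hypothetical names; `BaseStream` is only a stand-in for the CDK stream class, and the `_send(request, request_kwargs)` signature is assumed, so check it against the CDK version you have installed.

```python
import time
from typing import Any, Callable, Mapping, Optional


class MinIntervalThrottle:
    """Keeps consecutive calls at least 60 / requests_per_minute seconds apart."""

    def __init__(
        self,
        requests_per_minute: float,
        clock: Callable[[], float] = time.monotonic,
        sleep: Callable[[float], None] = time.sleep,
    ) -> None:
        # 120 requests per minute -> at most one request every 0.5 seconds.
        self.min_interval = 60.0 / requests_per_minute
        self._clock = clock
        self._sleep = sleep
        self._last_call: Optional[float] = None

    def wait(self) -> None:
        now = self._clock()
        if self._last_call is not None:
            remaining = self.min_interval - (now - self._last_call)
            if remaining > 0:
                # Sleep only for the part of the interval that has not passed yet.
                self._sleep(remaining)
                now = self._clock()
        self._last_call = now


class BaseStream:
    """Hypothetical stand-in for the CDK stream class (assumption, not the real API)."""

    def _send(self, request: Any, request_kwargs: Mapping[str, Any]) -> Any:
        return f"response to {request}"


class ThrottledStream(BaseStream):
    # Class-level throttle: shared by every instance of this stream class.
    throttle = MinIntervalThrottle(requests_per_minute=120)

    def _send(self, request: Any, request_kwargs: Mapping[str, Any]) -> Any:
        self.throttle.wait()  # pace the request before it goes out
        return super()._send(request, request_kwargs)
```

Because the throttle measures elapsed time, slow responses or slow record parsing count toward the interval, so the connector does not pause longer than the limit requires. Picking a rate somewhat below the documented 120 per minute leaves headroom if other processes share the same API quota.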

Ok, that sounds better than what I’ve got - thank you! I’ll give it a try and see how it goes.