How the Airbyte CDK read command works

Startinng by the the Entrypoint.py file in Airbyte CDK

def read(self, source_spec: ConnectorSpecification, config: TConfig, catalog: TCatalog, state: TState) -> Iterable[AirbyteMessage]:
        self.set_up_secret_filter(config, source_spec.connectionSpecification)
        if self.source.check_config_against_spec:
            self.validate_connection(source_spec, config)

        yield from self.source.read(self.logger, config, catalog, state)
  1. Similar to the check function first the read will mask any secrets
  2. Validate if the current config is compatible with the spec version of the connector
  3. Calls the read function from the AbstractSource class

The read function in the AbstractSource is quite big one.

def read(
    self,
    logger: logging.Logger,
    config: Mapping[str, Any],
    catalog: ConfiguredAirbyteCatalog,
    state: Union[List[AirbyteStateMessage], MutableMapping[str, Any]] = None,
) -> Iterator[AirbyteMessage]:
    """Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/."""
    logger.info(f"Starting syncing {self.name}")
		config, internal_config = split_config(config)
       
		stream_instances = {s.name: s for s in self.streams(config)}
    state_manager = ConnectorStateManager(stream_instance_map=stream_instances, state=state)
    self._stream_to_instance_map = stream_instances
		with create_timer(self.name) as timer:
	    for configured_stream in catalog.streams:
			    stream_instance = stream_instances.get(configured_stream.stream.name)
          if not stream_instance:
		          raise KeyError(
		              f"The requested stream {configured_stream.stream.name} was not found in the source."
                  f" Available streams: {stream_instances.keys()}"
              )
          stream_is_available, error = stream_instance.check_availability(logger, self)
          if not stream_is_available:
			        logger.warning(f"Skipped syncing stream '{stream_instance.name}' because it was unavailable. Error: {error}")
              continue
          try:
		          timer.start_event(f"Syncing stream {configured_stream.stream.name}")
              logger.info(f"Marking stream {configured_stream.stream.name} as STARTED")
              yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.STARTED)
              yield from self._read_stream(
									logger=logger,
                  stream_instance=stream_instance,
                  configured_stream=configured_stream,
                  state_manager=state_manager,
                  internal_config=internal_config,
               )
               logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED")
			         yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.COMPLETE)
           except AirbyteTracedException as e:
							 yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.INCOMPLETE)
               raise e
           except Exception as e:
               logger.exception(f"Encountered an exception while reading stream {configured_stream.stream.name}")
               logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED")
               yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.INCOMPLETE)
               display_message = stream_instance.get_error_display_message(e)
							 if display_message:
		               raise AirbyteTracedException.from_exception(e, message=display_message) from e
               raise e
           finally:
               timer.finish_event()
               logger.info(f"Finished syncing {configured_stream.stream.name}")
               logger.info(timer.report())

		logger.info(f"Finished syncing {self.name}")
  1. Similar to the discover function it will call the streams function to retrieve all streams but now created a dictionary to easily map the stream class from their name
  2. Start the ConnectorStateManager which will handle all state messages
  3. Start a timer to count how much time each stream takes to finish
  4. For each ConfiguredStream (the selected stream by the user in the UI during the connection creation)
  5. Check stream availability
    1. in most cases today it will use the HttpAvailabilityStrategy class which checks the first record
  6. Output the status STARTED
  7. Yield (print) the output from the iterator (list) from the function _read_stream
    1. The function first check if the configured stream is incremental or full_refresh for each case will call a different function to read the records.
def _read_stream(
        self,
        logger: logging.Logger,
        stream_instance: Stream,
        configured_stream: ConfiguredAirbyteStream,
        state_manager: ConnectorStateManager,
        internal_config: InternalConfig,
    ) -> Iterator[AirbyteMessage]:
        self._apply_log_level_to_stream_logger(logger, stream_instance)
        if internal_config.page_size and isinstance(stream_instance, HttpStream):
            logger.info(f"Setting page size for {stream_instance.name} to {internal_config.page_size}")
            stream_instance.page_size = internal_config.page_size
        logger.debug(
            f"Syncing configured stream: {configured_stream.stream.name}",
            extra={
                "sync_mode": configured_stream.sync_mode,
                "primary_key": configured_stream.primary_key,
                "cursor_field": configured_stream.cursor_field,
            },
        )
        logger.debug(
            f"Syncing stream instance: {stream_instance.name}",
            extra={
                "primary_key": stream_instance.primary_key,
                "cursor_field": stream_instance.cursor_field,
            },
        )

        use_incremental = configured_stream.sync_mode == SyncMode.incremental and stream_instance.supports_incremental
        if use_incremental:
            record_iterator = self._read_incremental(
                logger,
                stream_instance,
                configured_stream,
                state_manager,
                internal_config,
            )
        else:
            record_iterator = self._read_full_refresh(logger, stream_instance, configured_stream, internal_config)

        record_counter = 0
        stream_name = configured_stream.stream.name
        logger.info(f"Syncing stream: {stream_name} ")
        for record in record_iterator:
            if record.type == MessageType.RECORD:
                record_counter += 1
                if record_counter == 1:
                    logger.info(f"Marking stream {stream_name} as RUNNING")
                    # If we just read the first record of the stream, emit the transition to the RUNNING state
                    yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.RUNNING)
            yield record

        logger.info(f"Read {record_counter} records from {stream_name} stream")

Full Refresh

Breaking down the full_refresh method

record_iterator = self._read_full_refresh(
		logger, 
		stream_instance, 
		configured_stream, 
		internal_config
)
def _read_full_refresh(
        self,
        logger: logging.Logger,
        stream_instance: Stream,
        configured_stream: ConfiguredAirbyteStream,
        internal_config: InternalConfig,
    ) -> Iterator[AirbyteMessage]:
        slices = stream_instance.stream_slices(sync_mode=SyncMode.full_refresh, cursor_field=configured_stream.cursor_field)
        logger.debug(
            f"Processing stream slices for {configured_stream.stream.name} (sync_mode: full_refresh)", extra={"stream_slices": slices}
        )
        total_records_counter = 0
        for _slice in slices:
            if self.should_log_slice_message(logger):
                yield AirbyteMessage(
                    type=MessageType.LOG,
                    log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice, default=str)}"),
                )
            record_data_or_messages = stream_instance.read_records(
                stream_slice=_slice,
                sync_mode=SyncMode.full_refresh,
                cursor_field=configured_stream.cursor_field,
            )
            for record_data_or_message in record_data_or_messages:
                message = self._get_message(record_data_or_message, stream_instance)
                yield message
                if message.type == MessageType.RECORD:
                    total_records_counter += 1
                    if self._limit_reached(internal_config, total_records_counter):
                        return

The first step is to call the stream_slices. This concept is somewhat complex and can mislead users. Let’s use one example to make it easier:

In our example our service has two endpoints Orders and OrderDetails

The Orders return only the list of orders without any details


"data" : [
	{"order_id": 1, "path": "/order_details/1"},
	{"order_id": 2, "path": "/order_details/2"},
	{"order_id": 3, "path": "/order_details/3"}
]

And whatever we access the OrderDetails with the order_details/2

"data": {
	"order_id": 2,
	"created_at": "2023-01-01",
  "total_value": 1000
}

This can be translated as

class Orders(Stream):

	use_cache = True
	
	def path(self, stream_slice: Mapping[str, Any], **kwargs) -> str:
			return "orders/"

class OrderDetails(Stream):

	def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
			all_orders = Orders(config).read_records(SyncMode.full_refresh)
			for record in all_orders:
					yield {"order_id": record["order_id"]}

	def path(self, stream_slice: Mapping[str, Any], **kwargs) -> str:
			return f"order_details/{stream_slice['order_id']}"

Another situations where you can use stream_slices are:

  • You can create a spec parameter to iterate or filter the records you want to retrieve
    • Let’s say the API has a path to each status canceled completed pending so you can create a parameter where the user has the option to retrieve records from each of them.
  • Sometimes the API allow you to request using date parameters and using the stream slice you can break the request into smaller chunks.

Well, returning to the full refresh reading function…

It will iterate over all stream slices and call for each one of them the read_records function.

This function is implemented in the HttpStream class (for most cases)

def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: List[str] = None,
        stream_slice: Mapping[str, Any] = None,
        stream_state: Mapping[str, Any] = None,
    ) -> Iterable[StreamData]:
        yield from self._read_pages(
            lambda req, res, state, _slice: self.parse_response(res, stream_slice=_slice, stream_state=state), stream_slice, stream_state
        )

Stay with me! I know this one is not the most basic function :worried:

Trying to translate it’s generating an anonymous function which calls the parse_response

The parse_response is one of the function you must implement during the connector creation.

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
	res = response.json()
	yield from res.get("data", [])

And calls the _read_pages function which iterates over all pages from our API

def _read_pages(
        self,
        records_generator_fn: Callable[
            [requests.PreparedRequest, requests.Response, Mapping[str, Any], Mapping[str, Any]], Iterable[StreamData]
        ],
        stream_slice: Mapping[str, Any] = None,
        stream_state: Mapping[str, Any] = None,
    ) -> Iterable[StreamData]:
        stream_state = stream_state or {}
        pagination_complete = False
        next_page_token = None
        while not pagination_complete:
            request, response = self._fetch_next_page(stream_slice, stream_state, next_page_token)
            yield from records_generator_fn(request, response, stream_state, stream_slice)

            next_page_token = self.next_page_token(response)
            if not next_page_token:
                pagination_complete = True

        # Always return an empty generator just in case no records were ever yielded
        yield from []
  1. Every new read with start with pagination and without the next page token
  2. The _fetch_next_page creates and sends the actual request to the API
    1. There are a few parameters we can override sometimes: body_json body_data headers and the request_parameter itself.
def _fetch_next_page(
        self, stream_slice: Mapping[str, Any] = None, stream_state: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> Tuple[requests.PreparedRequest, requests.Response]:
        request_headers = self.request_headers(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
        request = self._create_prepared_request(
            path=self.path(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
            headers=dict(request_headers, **self.authenticator.get_auth_header()),
            params=self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
            json=self.request_body_json(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
            data=self.request_body_data(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
        )
        request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)

        response = self._send_request(request, request_kwargs)
        return request, response
  1. The read_pages receives the response and pass to the parse_response function to yield the records.
  2. After will try to get the next page token. This is also a function you must implement.
    1. Returning the next_page_token dict it will send the data to next request and keep the pagination_complete = False
    2. If you return None it will stop the iteration and complete the reading process.
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
	body = json.loads(response.request.body)
  result = response.json()
  if body and result:
		  page_number = body.get("page_number")
      return {"page_number": page_number + 1, "page_size": 200}
	else:
      return None

Incremental Reading

The incremental reading is quite similar to the full refresh. The difference is when it starts will retrieve the previous stream_state and build the first HTTP request using this data.

And for each record read it will try to update the state which has a specific logic to get updated.

def _read_incremental(
        self,
        logger: logging.Logger,
        stream_instance: Stream,
        configured_stream: ConfiguredAirbyteStream,
        state_manager: ConnectorStateManager,
        internal_config: InternalConfig,
    ) -> Iterator[AirbyteMessage]:
        """Read stream using incremental algorithm

        :param logger:
        :param stream_instance:
        :param configured_stream:
        :param state_manager:
        :param internal_config:
        :return:
        """
        stream_name = configured_stream.stream.name
        stream_state = state_manager.get_stream_state(stream_name, stream_instance.namespace)

        if stream_state and "state" in dir(stream_instance):
            stream_instance.state = stream_state
            logger.info(f"Setting state of {stream_name} stream to {stream_state}")

        slices = stream_instance.stream_slices(
            cursor_field=configured_stream.cursor_field,
            sync_mode=SyncMode.incremental,
            stream_state=stream_state,
        )
        logger.debug(f"Processing stream slices for {stream_name} (sync_mode: incremental)", extra={"stream_slices": slices})

        total_records_counter = 0
        has_slices = False
        for _slice in slices:
            has_slices = True
            if self.should_log_slice_message(logger):
                yield AirbyteMessage(
                    type=MessageType.LOG,
                    log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice, default=str)}"),
                )
            records = stream_instance.read_records(
                sync_mode=SyncMode.incremental,
                stream_slice=_slice,
                stream_state=stream_state,
                cursor_field=configured_stream.cursor_field or None,
            )
            record_counter = 0
            for message_counter, record_data_or_message in enumerate(records, start=1):
                message = self._get_message(record_data_or_message, stream_instance)
                yield message
                if message.type == MessageType.RECORD:
                    record = message.record
                    stream_state = stream_instance.get_updated_state(stream_state, record.data)
                    checkpoint_interval = stream_instance.state_checkpoint_interval
                    record_counter += 1
                    if checkpoint_interval and record_counter % checkpoint_interval == 0:
                        yield self._checkpoint_state(stream_instance, stream_state, state_manager)

                    total_records_counter += 1
                    # This functionality should ideally live outside of this method
                    # but since state is managed inside this method, we keep track
                    # of it here.
                    if self._limit_reached(internal_config, total_records_counter):
                        # Break from slice loop to save state and exit from _read_incremental function.
                        break

            yield self._checkpoint_state(stream_instance, stream_state, state_manager)
            if self._limit_reached(internal_config, total_records_counter):
                return

        if not has_slices:
            # Safety net to ensure we always emit at least one state message even if there are no slices
            checkpoint = self._checkpoint_state(stream_instance, stream_state, state_manager)
            yield checkpoint

We can check here we update the counter and try to checkpoint the state

stream_state = stream_instance.get_updated_state(stream_state, record.data)
checkpoint_interval = stream_instance.state_checkpoint_interval
record_counter += 1
if checkpoint_interval and record_counter % checkpoint_interval == 0:
		yield self._checkpoint_state(stream_instance, stream_state, state_manager)

:warning: The checkpoint ONLY will get update when the Source and the Destination emits the same STATE. For more information please check https://airbyte.com/blog/checkpointing

What customization you can add to your connector?

  • Request Headers
  • Request Parameters
  • Backoff Policy
  • Retry Strategy
def request_headers(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> Mapping[str, Any]:
        return {"Accept": "application/json"}
def request_params(self, stream_slice: Mapping[str, Any], **kwargs) -> MutableMapping[str, Any]:
        params = self.base_params
        params["start"] = pendulum.parse(stream_slice["start"]).strftime(self.date_template)
        params["end"] = pendulum.parse(stream_slice["end"]).strftime(self.date_template)
        return params
@property
    def retry_factor(self) -> int:
        return 20
def should_retry(self, response: requests.Response) -> bool:
        # Gitlab API returns a 403 response in case a feature is disabled in a project (pipelines/jobs for instance).
        if response.status_code == 403:
            setattr(self, "raise_on_http_errors", False)
            self.logger.warning(
                f"Got 403 error when accessing URL {response.request.url}."
                f" Very likely the feature is disabled for this project and/or group. Please double check it, or report a bug otherwise."
            )
            return False
        return super().should_retry(response)
def backoff_time(self, response: requests.Response) -> Optional[float]:
        if self.is_raw_data_reports_reached_limit(response):
            now = pendulum.now("UTC")
            midnight = pendulum.tomorrow("UTC")
            wait_time = (midnight - now).seconds
        elif self.is_aggregate_reports_reached_limit(response):
            wait_time = 60
        else:
            return super().backoff_time(response)

        AirbyteLogger().log("INFO", f"Rate limit exceded. Retry in {wait_time} seconds.")
        return wait_time

@marcosmarxm Hi, I try to understand the def _read_pages
It contains

yield from records_generator_fn(request, response, stream_state, stream_slice)

I cannot find where this records_generator_fn resides or it is kind of special fucntion in Python. Can you help me to out?
And also, in CDK, and in the

def _send_request:
user_backoff_handler = user_defined_backoff_handler(max_tries=max_tries)(self._send)

I dont get what and why we use (self._send) – is this a way to call function _send() ?

Hope to info from you