Starting with the entrypoint.py file in the Airbyte CDK:
def read(self, source_spec: ConnectorSpecification, config: TConfig, catalog: TCatalog, state: TState) -> Iterable[AirbyteMessage]:
    self.set_up_secret_filter(config, source_spec.connectionSpecification)
    if self.source.check_config_against_spec:
        self.validate_connection(source_spec, config)
    yield from self.source.read(self.logger, config, catalog, state)
- Similar to the check function, read first masks any secrets
- Validates that the current config is compatible with the spec version of the connector (see the sketch below)
- Calls the read function from the AbstractSource class
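The config-vs-spec validation is essentially a JSON Schema check: the user-provided config has to satisfy the connectionSpecification declared by the connector. Here is a minimal sketch of the idea using the jsonschema library; the spec and config below are hypothetical, not taken from a real connector.

from jsonschema import ValidationError, validate

# Hypothetical connection specification; a real one comes from the
# connector's spec.json / spec.yaml.
connection_specification = {
    "type": "object",
    "required": ["api_key", "start_date"],
    "properties": {
        "api_key": {"type": "string", "airbyte_secret": True},
        "start_date": {"type": "string", "format": "date"},
    },
}

config = {"api_key": "very-secret-key", "start_date": "2023-01-01"}

try:
    # Conceptually what validate_connection does: the config must
    # satisfy the JSON Schema declared by the connector.
    validate(instance=config, schema=connection_specification)
except ValidationError as error:
    raise Exception(f"Config validation error: {error.message}") from error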
The read function in the AbstractSource class is quite a big one.
def read(
    self,
    logger: logging.Logger,
    config: Mapping[str, Any],
    catalog: ConfiguredAirbyteCatalog,
    state: Union[List[AirbyteStateMessage], MutableMapping[str, Any]] = None,
) -> Iterator[AirbyteMessage]:
    """Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/."""
    logger.info(f"Starting syncing {self.name}")
    config, internal_config = split_config(config)
    stream_instances = {s.name: s for s in self.streams(config)}
    state_manager = ConnectorStateManager(stream_instance_map=stream_instances, state=state)
    self._stream_to_instance_map = stream_instances
    with create_timer(self.name) as timer:
        for configured_stream in catalog.streams:
            stream_instance = stream_instances.get(configured_stream.stream.name)
            if not stream_instance:
                raise KeyError(
                    f"The requested stream {configured_stream.stream.name} was not found in the source."
                    f" Available streams: {stream_instances.keys()}"
                )
            stream_is_available, error = stream_instance.check_availability(logger, self)
            if not stream_is_available:
                logger.warning(f"Skipped syncing stream '{stream_instance.name}' because it was unavailable. Error: {error}")
                continue
            try:
                timer.start_event(f"Syncing stream {configured_stream.stream.name}")
                logger.info(f"Marking stream {configured_stream.stream.name} as STARTED")
                yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.STARTED)
                yield from self._read_stream(
                    logger=logger,
                    stream_instance=stream_instance,
                    configured_stream=configured_stream,
                    state_manager=state_manager,
                    internal_config=internal_config,
                )
                logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED")
                yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.COMPLETE)
            except AirbyteTracedException as e:
                yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.INCOMPLETE)
                raise e
            except Exception as e:
                logger.exception(f"Encountered an exception while reading stream {configured_stream.stream.name}")
                logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED")
                yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.INCOMPLETE)
                display_message = stream_instance.get_error_display_message(e)
                if display_message:
                    raise AirbyteTracedException.from_exception(e, message=display_message) from e
                raise e
            finally:
                timer.finish_event()
                logger.info(f"Finished syncing {configured_stream.stream.name}")
                logger.info(timer.report())
    logger.info(f"Finished syncing {self.name}")
- Similar to the discover function, it calls the streams function to retrieve all streams, but now builds a dictionary to easily map each stream class to its name
- Starts the ConnectorStateManager, which handles all state messages
- Starts a timer to count how much time each stream takes to finish
- For each ConfiguredStream (the streams selected by the user in the UI during connection creation):
  - Checks stream availability; in most cases today this uses the HttpAvailabilityStrategy class, which checks whether the first record can be read (see the sketch after this list)
  - Outputs the STARTED status
  - Yields the output from the _read_stream iterator
    - _read_stream first checks whether the configured stream is incremental or full_refresh and calls a different function to read the records for each case
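A side note on the availability check: if a stream cannot be probed this way (for example, the endpoint rejects empty test reads), connectors typically override the availability_strategy property to opt out. The sketch below uses a hypothetical Employees stream and assumes the import paths of a 2023-era Python CDK; treat it as an illustration, not the canonical way.

from typing import Any, Iterable, Mapping, Optional

import requests
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
from airbyte_cdk.sources.streams.http import HttpStream


class Employees(HttpStream):
    """Hypothetical stream, shown only to illustrate opting out of the check."""

    url_base = "https://api.example.com/v1/"
    primary_key = "id"

    @property
    def availability_strategy(self) -> Optional[AvailabilityStrategy]:
        # Returning None disables the availability strategy, so the
        # check_availability call shown above reports the stream as available.
        return None

    def path(self, **kwargs) -> str:
        return "employees"

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        return None  # single page, just for the sketch

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        yield from response.json().get("data", [])

With that noted, here is _read_stream: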
def _read_stream(
    self,
    logger: logging.Logger,
    stream_instance: Stream,
    configured_stream: ConfiguredAirbyteStream,
    state_manager: ConnectorStateManager,
    internal_config: InternalConfig,
) -> Iterator[AirbyteMessage]:
    self._apply_log_level_to_stream_logger(logger, stream_instance)
    if internal_config.page_size and isinstance(stream_instance, HttpStream):
        logger.info(f"Setting page size for {stream_instance.name} to {internal_config.page_size}")
        stream_instance.page_size = internal_config.page_size
    logger.debug(
        f"Syncing configured stream: {configured_stream.stream.name}",
        extra={
            "sync_mode": configured_stream.sync_mode,
            "primary_key": configured_stream.primary_key,
            "cursor_field": configured_stream.cursor_field,
        },
    )
    logger.debug(
        f"Syncing stream instance: {stream_instance.name}",
        extra={
            "primary_key": stream_instance.primary_key,
            "cursor_field": stream_instance.cursor_field,
        },
    )
    use_incremental = configured_stream.sync_mode == SyncMode.incremental and stream_instance.supports_incremental
    if use_incremental:
        record_iterator = self._read_incremental(
            logger,
            stream_instance,
            configured_stream,
            state_manager,
            internal_config,
        )
    else:
        record_iterator = self._read_full_refresh(logger, stream_instance, configured_stream, internal_config)
    record_counter = 0
    stream_name = configured_stream.stream.name
    logger.info(f"Syncing stream: {stream_name} ")
    for record in record_iterator:
        if record.type == MessageType.RECORD:
            record_counter += 1
            if record_counter == 1:
                logger.info(f"Marking stream {stream_name} as RUNNING")
                # If we just read the first record of the stream, emit the transition to the RUNNING state
                yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.RUNNING)
        yield record
    logger.info(f"Read {record_counter} records from {stream_name} stream")
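To make the sync-mode branch concrete, here is a rough sketch of what a configured stream looks like when it reaches _read_stream. The model names come from the Airbyte protocol classes in airbyte_cdk.models; the stream name, cursor field, and sync modes are hypothetical.

from airbyte_cdk.models import (
    AirbyteStream,
    ConfiguredAirbyteStream,
    DestinationSyncMode,
    SyncMode,
)

# Hypothetical "orders" stream selected in the UI with incremental sync.
configured_stream = ConfiguredAirbyteStream(
    stream=AirbyteStream(
        name="orders",
        json_schema={},
        supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental],
    ),
    sync_mode=SyncMode.incremental,
    destination_sync_mode=DestinationSyncMode.append,
    cursor_field=["updated_at"],
)

# _read_stream only takes the incremental branch when the catalog asks for it
# AND the stream itself supports a cursor (supports_incremental).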
Full Refresh
Breaking down the _read_full_refresh method:
record_iterator = self._read_full_refresh(
    logger,
    stream_instance,
    configured_stream,
    internal_config
)
def _read_full_refresh(
    self,
    logger: logging.Logger,
    stream_instance: Stream,
    configured_stream: ConfiguredAirbyteStream,
    internal_config: InternalConfig,
) -> Iterator[AirbyteMessage]:
    slices = stream_instance.stream_slices(sync_mode=SyncMode.full_refresh, cursor_field=configured_stream.cursor_field)
    logger.debug(
        f"Processing stream slices for {configured_stream.stream.name} (sync_mode: full_refresh)", extra={"stream_slices": slices}
    )
    total_records_counter = 0
    for _slice in slices:
        if self.should_log_slice_message(logger):
            yield AirbyteMessage(
                type=MessageType.LOG,
                log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice, default=str)}"),
            )
        record_data_or_messages = stream_instance.read_records(
            stream_slice=_slice,
            sync_mode=SyncMode.full_refresh,
            cursor_field=configured_stream.cursor_field,
        )
        for record_data_or_message in record_data_or_messages:
            message = self._get_message(record_data_or_message, stream_instance)
            yield message
            if message.type == MessageType.RECORD:
                total_records_counter += 1
                if self._limit_reached(internal_config, total_records_counter):
                    return
The first step is to call stream_slices. This concept is somewhat complex and can confuse users, so let's use an example to make it easier.
In our example the service has two endpoints, Orders and OrderDetails. The Orders endpoint returns only the list of orders, without any details:
{
    "data": [
        {"order_id": 1, "path": "/order_details/1"},
        {"order_id": 2, "path": "/order_details/2"},
        {"order_id": 3, "path": "/order_details/3"}
    ]
}
And when we access OrderDetails with the path order_details/2, it returns:
{
    "data": {
        "order_id": 2,
        "created_at": "2023-01-01",
        "total_value": 1000
    }
}
This can be translated as
class Orders(Stream):
    use_cache = True

    def path(self, stream_slice: Mapping[str, Any], **kwargs) -> str:
        return "orders/"


class OrderDetails(Stream):
    def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
        all_orders = Orders(config).read_records(SyncMode.full_refresh)
        for record in all_orders:
            yield {"order_id": record["order_id"]}

    def path(self, stream_slice: Mapping[str, Any], **kwargs) -> str:
        return f"order_details/{stream_slice['order_id']}"
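A note on the design above: Orders sets use_cache = True. Because OrderDetails re-reads every order inside stream_slices, caching the Orders responses lets the CDK reuse them locally instead of hitting the orders/ endpoint a second time.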
Other situations where you can use stream_slices are:
- You can create a spec parameter to iterate over or filter the records you want to retrieve
  - Let's say the API has a separate path for each status (canceled, completed, pending); you can create a parameter that lets the user choose which of them to retrieve records from.
- Sometimes the API allows you to filter requests by date parameters, and with stream slices you can break the request into smaller chunks (see the sketch after this list).
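Here is a minimal sketch of the date-chunking idea. The helper name daily_slices, the field names, and the default start date are made up for illustration.

from datetime import date, timedelta
from typing import Iterable, Mapping, Optional


def daily_slices(start_date: str) -> Iterable[Optional[Mapping[str, str]]]:
    """Yield one {"start", "end"} slice per day between start_date and today."""
    start = date.fromisoformat(start_date)
    while start <= date.today():
        end = start + timedelta(days=1)
        yield {"start": start.isoformat(), "end": end.isoformat()}
        start = end


# Inside a stream, stream_slices would delegate to this helper and
# request_params would read the slice:
#
#     def stream_slices(self, stream_state=None, **kwargs):
#         return daily_slices((stream_state or {}).get("date", "2023-01-01"))
#
#     def request_params(self, stream_slice, **kwargs):
#         return {"start_date": stream_slice["start"], "end_date": stream_slice["end"]}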
Well, returning to the full refresh reading function… It iterates over all stream slices and calls the read_records function for each one of them. In most cases this function is implemented by the HttpStream class:
def read_records(
    self,
    sync_mode: SyncMode,
    cursor_field: List[str] = None,
    stream_slice: Mapping[str, Any] = None,
    stream_state: Mapping[str, Any] = None,
) -> Iterable[StreamData]:
    yield from self._read_pages(
        lambda req, res, state, _slice: self.parse_response(res, stream_slice=_slice, stream_state=state), stream_slice, stream_state
    )
Stay with me! I know this one is not the most basic function. Translated: it builds an anonymous function (a lambda) that calls parse_response, and hands it to _read_pages. parse_response is one of the functions you must implement when creating a connector:
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
    res = response.json()
    yield from res.get("data", [])
It also calls the _read_pages function, which iterates over all the pages of our API:
def _read_pages(
    self,
    records_generator_fn: Callable[
        [requests.PreparedRequest, requests.Response, Mapping[str, Any], Mapping[str, Any]], Iterable[StreamData]
    ],
    stream_slice: Mapping[str, Any] = None,
    stream_state: Mapping[str, Any] = None,
) -> Iterable[StreamData]:
    stream_state = stream_state or {}
    pagination_complete = False
    next_page_token = None
    while not pagination_complete:
        request, response = self._fetch_next_page(stream_slice, stream_state, next_page_token)
        yield from records_generator_fn(request, response, stream_state, stream_slice)
        next_page_token = self.next_page_token(response)
        if not next_page_token:
            pagination_complete = True
    # Always return an empty generator just in case no records were ever yielded
    yield from []
- Every new read starts the pagination loop without a next page token
- The _fetch_next_page method creates and sends the actual request to the API
  - There are a few parameters we can override when needed: request_body_json, request_body_data, request_headers, and the request_params themselves.
def _fetch_next_page(
    self, stream_slice: Mapping[str, Any] = None, stream_state: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> Tuple[requests.PreparedRequest, requests.Response]:
    request_headers = self.request_headers(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
    request = self._create_prepared_request(
        path=self.path(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
        headers=dict(request_headers, **self.authenticator.get_auth_header()),
        params=self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
        json=self.request_body_json(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
        data=self.request_body_data(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
    )
    request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
    response = self._send_request(request, request_kwargs)
    return request, response
- _read_pages receives the response and passes it to the parse_response function to yield the records.
- Afterwards it tries to get the next page token. This is also a function you must implement.
  - If next_page_token returns a dict, its data is sent with the next request and pagination_complete stays False.
  - If it returns None, the iteration stops and the reading process completes.
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
    body = json.loads(response.request.body)
    result = response.json()
    if body and result:
        page_number = body.get("page_number")
        return {"page_number": page_number + 1, "page_size": 200}
    else:
        return None
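The example above assumes page-number pagination. For a cursor-based API the same contract applies; the sketch below assumes a hypothetical next_cursor field in the response body that is absent on the last page.

from typing import Any, Mapping, Optional

import requests


def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
    next_cursor = response.json().get("next_cursor")
    if next_cursor:
        # Echoed back on the next request, e.g. via request_params.
        return {"cursor": next_cursor}
    return None  # stops the pagination loop in _read_pages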
Incremental Reading
Incremental reading is quite similar to full refresh. The difference is that when it starts, it retrieves the previous stream_state and builds the first HTTP request using that data. Then, for each record read, it tries to update the state, which follows its own logic to get updated.
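For example, a stream's request_params can turn the saved state into a filter on the very first request of the new sync. This is a sketch assuming a hypothetical updated_at cursor and an updated_since query parameter.

from typing import Any, Mapping, MutableMapping


def request_params(
    self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
    params = {"page_size": 200}
    if stream_state and stream_state.get("updated_at"):
        # Resume from where the previous sync stopped.
        params["updated_since"] = stream_state["updated_at"]
    return params

Back in the CDK, _read_incremental looks like this: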
def _read_incremental(
    self,
    logger: logging.Logger,
    stream_instance: Stream,
    configured_stream: ConfiguredAirbyteStream,
    state_manager: ConnectorStateManager,
    internal_config: InternalConfig,
) -> Iterator[AirbyteMessage]:
    """Read stream using incremental algorithm
    :param logger:
    :param stream_instance:
    :param configured_stream:
    :param state_manager:
    :param internal_config:
    :return:
    """
    stream_name = configured_stream.stream.name
    stream_state = state_manager.get_stream_state(stream_name, stream_instance.namespace)
    if stream_state and "state" in dir(stream_instance):
        stream_instance.state = stream_state
        logger.info(f"Setting state of {stream_name} stream to {stream_state}")
    slices = stream_instance.stream_slices(
        cursor_field=configured_stream.cursor_field,
        sync_mode=SyncMode.incremental,
        stream_state=stream_state,
    )
    logger.debug(f"Processing stream slices for {stream_name} (sync_mode: incremental)", extra={"stream_slices": slices})
    total_records_counter = 0
    has_slices = False
    for _slice in slices:
        has_slices = True
        if self.should_log_slice_message(logger):
            yield AirbyteMessage(
                type=MessageType.LOG,
                log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice, default=str)}"),
            )
        records = stream_instance.read_records(
            sync_mode=SyncMode.incremental,
            stream_slice=_slice,
            stream_state=stream_state,
            cursor_field=configured_stream.cursor_field or None,
        )
        record_counter = 0
        for message_counter, record_data_or_message in enumerate(records, start=1):
            message = self._get_message(record_data_or_message, stream_instance)
            yield message
            if message.type == MessageType.RECORD:
                record = message.record
                stream_state = stream_instance.get_updated_state(stream_state, record.data)
                checkpoint_interval = stream_instance.state_checkpoint_interval
                record_counter += 1
                if checkpoint_interval and record_counter % checkpoint_interval == 0:
                    yield self._checkpoint_state(stream_instance, stream_state, state_manager)
                total_records_counter += 1
                # This functionality should ideally live outside of this method
                # but since state is managed inside this method, we keep track
                # of it here.
                if self._limit_reached(internal_config, total_records_counter):
                    # Break from slice loop to save state and exit from _read_incremental function.
                    break
        yield self._checkpoint_state(stream_instance, stream_state, state_manager)
        if self._limit_reached(internal_config, total_records_counter):
            return
    if not has_slices:
        # Safety net to ensure we always emit at least one state message even if there are no slices
        checkpoint = self._checkpoint_state(stream_instance, stream_state, state_manager)
        yield checkpoint
Here we can see where the record counter is updated and the state is checkpointed:
stream_state = stream_instance.get_updated_state(stream_state, record.data)
checkpoint_interval = stream_instance.state_checkpoint_interval
record_counter += 1
if checkpoint_interval and record_counter % checkpoint_interval == 0:
    yield self._checkpoint_state(stream_instance, stream_state, state_manager)
The checkpoint only gets committed when the Source and the Destination emit the same STATE. For more information, check https://airbyte.com/blog/checkpointing
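The "specific logic" for updating the state lives in the stream's get_updated_state method. Below is a sketch that keeps the maximum cursor value seen so far, assuming a hypothetical updated_at cursor field.

from typing import Any, Mapping, MutableMapping


def get_updated_state(
    self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]
) -> Mapping[str, Any]:
    latest_cursor = latest_record.get("updated_at", "")
    current_cursor = (current_stream_state or {}).get("updated_at", "")
    return {"updated_at": max(latest_cursor, current_cursor)}


# In the stream class, setting state_checkpoint_interval = 1000 would make
# _read_incremental yield a STATE message every 1000 records of a slice,
# plus one at the end of each slice, as shown in the code above.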
What customizations can you add to your connector?
- Request Headers
- Request Parameters
- Backoff Policy
- Retry Strategy
def request_headers(
    self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> Mapping[str, Any]:
    return {"Accept": "application/json"}


def request_params(self, stream_slice: Mapping[str, Any], **kwargs) -> MutableMapping[str, Any]:
    params = self.base_params
    params["start"] = pendulum.parse(stream_slice["start"]).strftime(self.date_template)
    params["end"] = pendulum.parse(stream_slice["end"]).strftime(self.date_template)
    return params


@property
def retry_factor(self) -> int:
    return 20


def should_retry(self, response: requests.Response) -> bool:
    # Gitlab API returns a 403 response in case a feature is disabled in a project (pipelines/jobs for instance).
    if response.status_code == 403:
        setattr(self, "raise_on_http_errors", False)
        self.logger.warning(
            f"Got 403 error when accessing URL {response.request.url}."
            f" Very likely the feature is disabled for this project and/or group. Please double check it, or report a bug otherwise."
        )
        return False
    return super().should_retry(response)


def backoff_time(self, response: requests.Response) -> Optional[float]:
    if self.is_raw_data_reports_reached_limit(response):
        now = pendulum.now("UTC")
        midnight = pendulum.tomorrow("UTC")
        wait_time = (midnight - now).seconds
    elif self.is_aggregate_reports_reached_limit(response):
        wait_time = 60
    else:
        return super().backoff_time(response)
    AirbyteLogger().log("INFO", f"Rate limit exceeded. Retry in {wait_time} seconds.")
    return wait_time