The command and the implementation
Check
The check command validates that the provided configuration is valid and has sufficient permissions to perform all the operations the connector needs. When implementing a connector, you must implement the check_connection function for the command to work.
It is declared as an abstract method in the AbstractSource class:
@abstractmethod
def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
    """
    :param logger: source logger
    :param config: The user-provided configuration as specified by the source's spec.
      This usually contains information required to check connection e.g. tokens, secrets and keys etc.
    :return: A tuple of (boolean, error). If boolean is true, then the connection check is successful
      and we can connect to the underlying data source using the provided configuration.
      Otherwise, the input config cannot be used to connect to the underlying data source,
      and the "error" object should describe what went wrong.
      The error object will be cast to string to display the problem to the user.
    """
By convention, check_connection in Python CDK connectors tries to read one stream in full-refresh mode, but only its first page. Sometimes the API has a dedicated endpoint for this purpose, and calling it works fine too. Take the Klaviyo connector as an example: it reads data from the Metrics endpoint.
class SourceKlaviyo(AbstractSource):
    def check_connection(self, logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
        """Connection check to validate that the user-provided config can be used to connect to the underlying API
        :param config: the user-input config object conforming to the connector's spec.json
        :param logger: logger object
        :return Tuple[bool, Any]: (True, None) if the input config can be used to connect to the API successfully, (False, error) otherwise.
        """
        try:
            # we use metrics endpoint because it never returns an error
            _ = list(Metrics(api_key=config["api_key"]).read_records(sync_mode=SyncMode.full_refresh))
        except Exception as e:
            return False, repr(e)
        return True, None
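The "first page only" convention can be sketched without the CDK. The snippet below is a minimal, self-contained illustration and not the Klaviyo implementation: FakeStream, its read_records generator, and check_connection_first_record are hypothetical names invented for this sketch. It pulls a single record lazily with next() instead of materializing the whole stream with list().

```python
from typing import Any, Iterator, Mapping, Optional, Tuple


class FakeStream:
    """Hypothetical stand-in for a CDK stream; not part of airbyte_cdk."""

    def __init__(self, api_key: str) -> None:
        self.api_key = api_key
        self.records_read = 0

    def read_records(self) -> Iterator[Mapping[str, Any]]:
        # Simulate an API that rejects a bad key on the first request.
        if self.api_key != "valid-key":
            raise PermissionError("401 Unauthorized")
        for i in range(1000):
            self.records_read += 1
            yield {"id": i}


def check_connection_first_record(stream: FakeStream) -> Tuple[bool, Optional[Any]]:
    """Return (True, None) if one record can be read, (False, error) otherwise."""
    try:
        # next() stops after the first record, so the rest is never produced.
        next(iter(stream.read_records()), None)
    except Exception as e:
        return False, repr(e)
    return True, None


good = FakeStream(api_key="valid-key")
bad = FakeStream(api_key="wrong-key")
ok_result = check_connection_first_record(good)
bad_result = check_connection_first_record(bad)
```

Here ok_result is (True, None) after producing only one record, while bad_result carries the repr of the exception as its error, following the same (bool, error) contract as check_connection.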
Sometimes the configuration contains parameters that your connector must validate during the check. This is the case for the GitHub connector:
def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
    try:
        authenticator = self._get_authenticator(config)
        _, repositories = self._get_org_repositories(config=config, authenticator=authenticator)
        if not repositories:
            return False, "Invalid repositories. Valid examples: airbytehq/airbyte airbytehq/another-repo airbytehq/* airbytehq/airbyte"
        return True, None
    except Exception as e:
        message = repr(e)
        user_message = self.user_friendly_error_message(message)
        return False, user_message or message
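Validating a configuration parameter during the check might look roughly like the sketch below. It is not the GitHub connector's logic: validate_repositories and REPO_PATTERN are hypothetical, and the pattern is an assumption that only accepts "<org>/<repo>" or "<org>/*" entries separated by spaces.

```python
import re
from typing import List, Optional, Tuple

# Assumed pattern: "<org>/<repo>" or "<org>/*"; real GitHub naming rules may differ.
REPO_PATTERN = re.compile(r"^[\w.-]+/(\*|[\w.-]+)$")


def validate_repositories(raw: str) -> Tuple[bool, Optional[str]]:
    """Hypothetical helper: validate a space-separated repository list."""
    repositories: List[str] = raw.split()
    if not repositories:
        return False, "Invalid repositories. Valid examples: airbytehq/airbyte airbytehq/*"
    # Collect every entry that does not match the assumed pattern.
    invalid = [repo for repo in repositories if not REPO_PATTERN.match(repo)]
    if invalid:
        return False, f"Invalid repositories: {', '.join(invalid)}"
    return True, None


valid_result = validate_repositories("airbytehq/airbyte airbytehq/*")
empty_result = validate_repositories("")
mixed_result = validate_repositories("airbytehq/airbyte not-a-repo")
```

Returning a readable message instead of a raw exception keeps the error useful to the user, which is the same goal as the user_friendly_error_message call in the connector code.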
In most situations, your implementation will look like the first example.
How it gets invoked
The call starts in the AirbyteEntrypoint class, located at airbyte/airbyte-cdk/python/airbyte_cdk/entrypoint.py:
def check(self, source_spec: ConnectorSpecification, config: TConfig) -> Iterable[AirbyteMessage]:
    self.set_up_secret_filter(config, source_spec.connectionSpecification)
    try:
        self.validate_connection(source_spec, config)
    except AirbyteTracedException as traced_exc:
        connection_status = traced_exc.as_connection_status_message()
        if connection_status:
            yield connection_status
            return
    check_result = self.source.check(self.logger, config)
    if check_result.status == Status.SUCCEEDED:
        self.logger.info("Check succeeded")
    else:
        self.logger.error("Check failed")
    yield AirbyteMessage(type=Type.CONNECTION_STATUS, connectionStatus=check_result)
- set_up_secret_filter masks any secrets that could accidentally be exposed in the logs.
- validate_connection checks that the config.json file is compatible with the current version of the spec.json.
- Call the check function from the AbstractSource class.
  - This function is a safe caller of the check_connection function that you implemented.
def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
    """Implements the Check Connection operation from the Airbyte Specification.
    See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#check.
    """
    check_succeeded, error = self.check_connection(logger, config)
    if not check_succeeded:
        return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error))
    return AirbyteConnectionStatus(status=Status.SUCCEEDED)
- Output the AirbyteConnectionStatus message.
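The split between check and check_connection can be illustrated with a small, CDK-free sketch. Everything here is a simplified stand-in: ConnectionStatus only imitates AirbyteConnectionStatus, and SketchSource with its "valid-key" rule is hypothetical. The point is how the (bool, error) tuple is translated into a status object.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Mapping, Optional, Tuple


class Status(Enum):
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"


@dataclass
class ConnectionStatus:
    """Simplified stand-in for AirbyteConnectionStatus."""

    status: Status
    message: Optional[str] = None


class SketchSource:
    """Hypothetical source; it only mirrors the check/check_connection split."""

    def check_connection(self, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
        # Assumed rule for this sketch: only one key value is accepted.
        if config.get("api_key") == "valid-key":
            return True, None
        return False, "invalid api_key"

    def check(self, config: Mapping[str, Any]) -> ConnectionStatus:
        # Translate the (bool, error) tuple into a status object.
        succeeded, error = self.check_connection(config)
        if not succeeded:
            return ConnectionStatus(status=Status.FAILED, message=repr(error))
        return ConnectionStatus(status=Status.SUCCEEDED)


source = SketchSource()
success = source.check({"api_key": "valid-key"})
failure = source.check({"api_key": "nope"})
```

Because the error goes through repr(), a plain string error ends up wrapped in quotes: failure.message is "'invalid api_key'" in this sketch.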
Low-code way
In a low-code connector, the check is declared in the manifest.yaml. This excerpt comes from the Source Alpha Vantage connector:
check:
  stream_names:
    - "time_series_weekly"
    - "time_series_weekly_adjusted"
The listed streams are then checked automatically by the following function:
def check_connection(self, source: Source, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, any]:
    streams = source.streams(config)
    stream_name_to_stream = {s.name: s for s in streams}
    if len(streams) == 0:
        return False, f"No streams to connect to from source {source}"
    for stream_name in self.stream_names:
        if stream_name not in stream_name_to_stream.keys():
            raise ValueError(f"{stream_name} is not part of the catalog. Expected one of {stream_name_to_stream.keys()}.")
        stream = stream_name_to_stream[stream_name]
        availability_strategy = stream.availability_strategy or HttpAvailabilityStrategy()
        try:
            stream_is_available, reason = availability_strategy.check_availability(stream, logger, source)
            if stream_is_available:
                return True, None
            else:
                return False, reason
        except Exception as error:
            logger.error(f"Encountered an error trying to connect to stream {stream_name}. Error: \n {traceback.format_exc()}")
            return False, f"Unable to connect to stream {stream_name} - {error}"
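One detail of the loop above is easy to miss: as written, every branch inside the loop returns, so the result is decided by the first name in stream_names. The sketch below mirrors that control flow with plain callables instead of CDK streams. check_streams, Probe, and make_probe are hypothetical names, and the final return for an empty stream_names list is an assumption of this sketch.

```python
from typing import Callable, Dict, List, Optional, Tuple

# Hypothetical availability probe: returns (is_available, reason).
Probe = Callable[[], Tuple[bool, Optional[str]]]


def check_streams(stream_names: List[str], probes: Dict[str, Probe]) -> Tuple[bool, Optional[str]]:
    """Simplified mirror of the loop above, with callables standing in for streams."""
    if not probes:
        return False, "No streams to connect to"
    for name in stream_names:
        if name not in probes:
            raise ValueError(f"{name} is not part of the catalog. Expected one of {list(probes)}.")
        try:
            available, reason = probes[name]()
            # Both outcomes return, so the loop ends at the first listed stream.
            return (True, None) if available else (False, reason)
        except Exception as error:
            return False, f"Unable to connect to stream {name} - {error}"
    return True, None  # Assumption: only reached when stream_names is empty.


calls: List[str] = []


def make_probe(name: str, result: Tuple[bool, Optional[str]]) -> Probe:
    def probe() -> Tuple[bool, Optional[str]]:
        calls.append(name)  # Record which streams were actually probed.
        return result
    return probe


probes = {
    "time_series_weekly": make_probe("time_series_weekly", (True, None)),
    "time_series_weekly_adjusted": make_probe("time_series_weekly_adjusted", (False, "403 Forbidden")),
}
result = check_streams(["time_series_weekly", "time_series_weekly_adjusted"], probes)
probed_after_first_call = list(calls)
reversed_result = check_streams(["time_series_weekly_adjusted", "time_series_weekly"], probes)
```

In this sketch only "time_series_weekly" is probed on the first call, and swapping the order of the names flips the outcome, so the order of stream_names matters.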