How the Airbyte CDK check command works

The command and the implementation

Check validate that the provided configuration is valid with sufficient permissions for one to perform all. When implementing a connector you must implement the check_connection function to make it works.

The abstract function in the AbstractSource class.

@abstractmethod
    def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
        """
        :param logger: source logger
        :param config: The user-provided configuration as specified by the source's spec.
          This usually contains information required to check connection e.g. tokens, secrets and keys etc.
        :return: A tuple of (boolean, error). If boolean is true, then the connection check is successful
          and we can connect to the underlying data source using the provided configuration.
          Otherwise, the input config cannot be used to connect to the underlying data source,
          and the "error" object should describe what went wrong.
          The error object will be cast to string to display the problem to the user.
        """

Conventionally the Python CDK connectors the check_connection try to read a full refresh but only the first page from one stream. Sometimes the API has a specific endpoint to trigger and it can works fine too. Let’s take for example the Klaviyo connector, its reading data from the Metrics endpoint.

class SourceKlaviyo(AbstractSource):
    def check_connection(self, logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
        """Connection check to validate that the user-provided config can be used to connect to the underlying API
        :param config:  the user-input config object conforming to the connector's spec.json
        :param logger:  logger object
        :return Tuple[bool, Any]: (True, None) if the input config can be used to connect to the API successfully, (False, error) otherwise.
        """
        try:
            # we use metrics endpoint because it never returns an error
            _ = list(Metrics(api_key=config["api_key"]).read_records(sync_mode=SyncMode.full_refresh))
        except Exception as e:
            return False, repr(e)
        return True, None

Sometimes your connector has parameters in the configuration you must validate during the check. It’s the case for the Github connector:

def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
        try:
            authenticator = self._get_authenticator(config)
            _, repositories = self._get_org_repositories(config=config, authenticator=authenticator)
            if not repositories:
                return False, "Invalid repositories. Valid examples: airbytehq/airbyte airbytehq/another-repo airbytehq/* airbytehq/airbyte"
            return True, None

        except Exception as e:
            message = repr(e)
            user_message = self.user_friendly_error_message(message)
            return False, user_message or message

In most situations your implementation falls in the first example.

How it got invocaked

Starting in the AirbyteEntrypoint

located at airbyte/airbyte-cdk/python/airbyte_cdk/entrypoint.py

def check(self, source_spec: ConnectorSpecification, config: TConfig) -> Iterable[AirbyteMessage]:
    self.set_up_secret_filter(config, source_spec.connectionSpecification)
    try:
        self.validate_connection(source_spec, config)
    except AirbyteTracedException as traced_exc:
        connection_status = traced_exc.as_connection_status_message()
        if connection_status:
            yield connection_status
            return

    check_result = self.source.check(self.logger, config)
    if check_result.status == Status.SUCCEEDED:
        self.logger.info("Check succeeded")
    else:
        self.logger.error("Check failed")
		yield AirbyteMessage(type=Type.CONNECTION_STATUS, connectionStatus=check_result)
  1. This functions helps to mask any secret which accidentally can be exposed in logs
  2. Compare if the config.json file is compatible with the current version of the spec.json
  3. Call the check function from AbstractSource class
    1. The function is a safe caller of the check_connection which you had implemented.
def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        """Implements the Check Connection operation from the Airbyte Specification.
        See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#check.
        """
        check_succeeded, error = self.check_connection(logger, config)
        if not check_succeeded:
            return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error))
        return AirbyteConnectionStatus(status=Status.SUCCEEDED)
  1. Output the AirbyteConnectionStatus message

Low-code way

From the Source Alpha Vantage connector in the manifest.yaml

check:
  stream_names:
    - "time_series_weekly"
    - "time_series_weekly_adjusted"

Which there is a automatic way to check the streams:

def check_connection(self, source: Source, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, any]:
    streams = source.streams(config)
    stream_name_to_stream = {s.name: s for s in streams}
    if len(streams) == 0:
        return False, f"No streams to connect to from source {source}"
    for stream_name in self.stream_names:
        if stream_name not in stream_name_to_stream.keys():
            raise ValueError(f"{stream_name} is not part of the catalog. Expected one of {stream_name_to_stream.keys()}.")

        stream = stream_name_to_stream[stream_name]
        availability_strategy = stream.availability_strategy or HttpAvailabilityStrategy()
        try:
            stream_is_available, reason = availability_strategy.check_availability(stream, logger, source)
            if stream_is_available:
                return True, None
            else:
                return False, reason
        except Exception as error:
            logger.error(f"Encountered an error trying to connect to stream {stream_name}. Error: \n {traceback.format_exc()}")
            return False, f"Unable to connect to stream {stream_name} - {error}"