How the Airbyte CDK discover command works

Discover: discovers the source’s schema. This lets users choose which subset of the data to sync, which is useful when they need only part of it.

The discover command calls the streams function, which you need to implement in your connector, and returns an AirbyteCatalog. The user then selects the streams they want from that catalog, which produces the ConfiguredAirbyteCatalog.

def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
    """Implements the Discover operation from the Airbyte Specification.
    See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#discover.
    """
    streams = [stream.as_airbyte_stream() for stream in self.streams(config=config)]
    return AirbyteCatalog(streams=streams)
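
To make the flow concrete, here is a minimal, self-contained sketch of what happens around discover. It does not use the real airbyte_cdk classes: SketchStream, SketchCatalog, SketchSource, and configure_catalog are hypothetical stand-ins that only mirror the shape of the method above, and the stream names are made up.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Mapping


@dataclass
class SketchStream:
    """Hypothetical stand-in for a CDK stream: a name and a JSON schema."""
    name: str
    json_schema: Dict[str, Any] = field(default_factory=dict)

    def as_airbyte_stream(self) -> Dict[str, Any]:
        # The real CDK returns an AirbyteStream model; a dict is enough here.
        return {"name": self.name, "json_schema": self.json_schema}


@dataclass
class SketchCatalog:
    """Hypothetical stand-in for AirbyteCatalog."""
    streams: List[Dict[str, Any]]


class SketchSource:
    def streams(self, config: Mapping[str, Any]) -> List[SketchStream]:
        # This is the method the connector author implements.
        return [SketchStream("orders"), SketchStream("customers"), SketchStream("invoices")]

    def discover(self, config: Mapping[str, Any]) -> SketchCatalog:
        # Same shape as the discover method above: one catalog entry per stream.
        return SketchCatalog(streams=[s.as_airbyte_stream() for s in self.streams(config)])


def configure_catalog(catalog: SketchCatalog, selected: List[str]) -> SketchCatalog:
    """Keep only the streams the user selected (a simplified configured catalog)."""
    return SketchCatalog(streams=[s for s in catalog.streams if s["name"] in selected])


catalog = SketchSource().discover(config={})
configured = configure_catalog(catalog, selected=["orders", "invoices"])
discovered_names = [s["name"] for s in catalog.streams]
configured_names = [s["name"] for s in configured.streams]
print(discovered_names)
print(configured_names)
```

In the real protocol, each entry in the configured catalog also records how the stream should be synced (for example its sync mode), which this sketch leaves out.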

In most cases the streams function implementation is simple. The example below, from the Linnworks connector, only instantiates the streams and returns them as a list.

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
    auth = self._auth(config)
    return [
        StockLocations(authenticator=auth),
        StockLocationDetails(authenticator=auth),
        StockItems(authenticator=auth),
        ProcessedOrders(authenticator=auth, start_date=config["start_date"]),
        ProcessedOrderDetails(authenticator=auth, start_date=config["start_date"]),
    ]

More complex cases use the configuration values (as defined by the spec) to add or remove streams. This can happen when:

  • The connector allows custom queries (like Google Ads and Analytics, Facebook Marketing)
  • The connector has dynamic streams (Webflow is an example)
  • The connector has two ways to authenticate and one of them doesn’t have permission to access particular streams, so you must remove those streams here
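
The cases in the list above can be sketched with a small, self-contained streams function. Everything here is hypothetical and chosen only for illustration: the SketchStream class, the config keys custom_queries, table_name, and auth_type, and the assumption that the "billing" stream is readable only with OAuth.

```python
from typing import Any, List, Mapping


class SketchStream:
    """Hypothetical minimal stream: just a name."""

    def __init__(self, name: str):
        self.name = name


def streams(config: Mapping[str, Any]) -> List[SketchStream]:
    # Streams that every configuration gets.
    result = [SketchStream("accounts"), SketchStream("campaigns")]

    # Custom queries: one extra stream per query defined in the config.
    for query in config.get("custom_queries", []):
        result.append(SketchStream(query["table_name"]))

    # Two auth methods: in this sketch only OAuth may read "billing",
    # so the stream is left out for any other auth type.
    if config.get("auth_type") == "oauth":
        result.append(SketchStream("billing"))

    return result


oauth_config = {"auth_type": "oauth", "custom_queries": [{"table_name": "my_report"}]}
oauth_names = [s.name for s in streams(oauth_config)]
api_key_names = [s.name for s in streams({"auth_type": "api_key"})]
print(oauth_names)
print(api_key_names)
```

Because discover builds the catalog from whatever streams returns, the user only ever sees the streams that the current configuration can actually sync.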

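The following example is from the HubSpot connector. When the configuration uses OAuth, the connector keeps only the streams whose scopes were granted; with the API key method there are no scopes to check, so every stream is returned.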

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
    # Note: this excerpt relies on `from itertools import chain` at module level.
    credentials = config.get("credentials", {})
    common_params = self.get_common_params(config=config)
    streams = [
        Campaigns(**common_params),
        Companies(**common_params),
        ContactLists(**common_params),
        Goals(**common_params),
        TicketPipelines(**common_params),
        Workflows(**common_params),
    ]

    api = API(credentials=credentials)
    if api.is_oauth2():
        # With OAuth, keep only the streams whose scopes were granted.
        authenticator = api.get_authenticator()
        granted_scopes = self.get_granted_scopes(authenticator)
        self.logger.info(f"The following scopes were granted: {granted_scopes}")

        available_streams = [stream for stream in streams if stream.scope_is_granted(granted_scopes)]
        unavailable_streams = [stream for stream in streams if not stream.scope_is_granted(granted_scopes)]
        self.logger.info(f"The following streams are unavailable: {[s.name for s in unavailable_streams]}")
        # Streams that could return more data if additional scopes were granted.
        partially_available_streams = [stream for stream in streams if not stream.properties_scope_is_granted()]
        required_scoped = set(chain(*[x.properties_scopes for x in partially_available_streams]))
        self.logger.info(
            f"The following streams are partially available: {[s.name for s in partially_available_streams]}, "
            f"add the following scopes to download all available data: {required_scoped}"
        )
    else:
        # With an API key there are no scopes to check, so every stream is kept.
        self.logger.info("No scopes to grant when authenticating with API key.")
        available_streams = streams

    # Custom object streams are added for both authentication methods.
    available_streams.extend(self.get_custom_object_streams(api=api, common_params=common_params))

    return available_streams
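
The scope filtering in the excerpt above can be tried in isolation with the following sketch. It is not the HubSpot connector's code: ScopedStream, the scope names, and the rule that a stream needs all of its scopes granted are assumptions made for illustration only.

```python
from itertools import chain
from typing import Set


class ScopedStream:
    """Hypothetical stream that declares which OAuth scopes it needs."""

    def __init__(self, name: str, scopes: Set[str]):
        self.name = name
        self.scopes = scopes

    def scope_is_granted(self, granted_scopes: Set[str]) -> bool:
        # Assumption for this sketch: every declared scope must be granted.
        return self.scopes.issubset(granted_scopes)


# Made-up scope names, not real HubSpot scopes.
all_streams = [
    ScopedStream("contacts", {"contacts.read"}),
    ScopedStream("tickets", {"tickets.read"}),
    ScopedStream("workflows", {"automation.read"}),
]
granted = {"contacts.read", "tickets.read"}

# Split the streams the same way the excerpt does.
available = [s for s in all_streams if s.scope_is_granted(granted)]
unavailable = [s for s in all_streams if not s.scope_is_granted(granted)]

# Scopes the user would have to add to unlock the unavailable streams.
missing_scopes = set(chain(*[s.scopes for s in unavailable])) - granted

available_names = [s.name for s in available]
unavailable_names = [s.name for s in unavailable]
print(available_names)
print(unavailable_names)
print(missing_scopes)
```

Logging the unavailable streams and the missing scopes, as the excerpt does, tells the user why a stream is absent from the catalog and how to get it back.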