I am trying to build a connector, using LinkedIn API, which is an oauth2.0 authentication, connector build was successful, and added it to airbyte GUI, but while source setup test cases are failed and showed a 404 error, the given URL and credentials are correct
source.py file
from abc import ABC, abstractproperty
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Tuple
from urllib.parse import urlencode
import requests
from airbyte_cdk import AirbyteLogger
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.auth import Oauth2Authenticator, TokenAuthenticator
Basic full refresh stream
class LinkedinStream(HttpStream, ABC):
# TODO: Fill in the url base. Required.
url_base = "https://api.linkedin.com/v2/"
primary_key = None
records_limit = 500
def __init__(self, config: Dict):
super().__init__(authenticator=config.get("authenticator"))
self.config = config
def request_params(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
# The api requires that we include the data as a query param so we do that in this method.
return None
def parse_response(
self,
response: requests.Response,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> Iterable[Mapping]:
# The response is a simple JSON whose schema matches our stream's schema exactly,
# so we just return a list containing the response.
return [response.json()]
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
# While the API does offer pagination, we will only ever retrieve one parameter with this implementation,
# so we just return None to indicate that there will never be any more pages in the response.
return None
class details(LinkedinStream):
def path(self, **kwargs) → str:
# This defines the path to the endpoint that we want to hit.
return f"/me"
Source
class SourceLinkedin(AbstractSource):
“”"
Abstract Source inheritance, provides:
- implementation for check
connector’s connectivity
- implementation to call each stream with it’s input parameters.
“”"
def get_authenticator(cls, config: Mapping[str, Any]) -> TokenAuthenticator:
"""
Validate input parameters and generate a necessary Authentication object
"""
auth_method = config.get("credentials", {}).get("auth_method")
if auth_method == "oAuth2.0":
return Oauth2Authenticator(
token_refresh_endpoint="https://www.linkedin.com/oauth/v2/accessToken",
client_id=config["credentials"]["client_id"],
client_secret=config["credentials"]["client_secret"],
refresh_token=config["credentials"]["refresh_token"],
)
raise Exception("incorrect input parameters")
def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, any]:
config["authenticator"] = self.get_authenticator(config)
stream = details(config)
# need to load the first item only
stream.records_limit = 1
try:
next(stream.read_records(sync_mode=SyncMode.full_refresh), None)
return True, None
except Exception as e:
return False, e
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
"""
Mapping an input config of the user input configuration as defined in the connector spec.
Passing config to the streams.
"""
config["authenticator"] = self.get_authenticator(config)
return [details(config),]