I am trying to build a source-connector using the default HTTP API Python Client generator.
The API that I am integrating with it is not standard and I need to prefetch some data and using it after that for iterating all the ids one by one and making reuest for each of them (there is no API for getting more than one record).
The pre-fetched data is filtered by some condition and it might return empty list.
Sometimes this data can be simply empty list in this case I want to avoid making the first request to the given API. How can I achieve that I am giving some example to illustrate what I am doing? Any ideas?
class SourceFoo(AbstractSource):
def check_connection(self, logger, config) -> Tuple[bool, any]:
resp = requests.get("FOO_URL", headers=headers)
if resp.status_code == 200:
return True, None
else:
return False, resp.text
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
auth = TokenAuthenticator(token=config["token"], auth_method="Basic")
ok, ids = self.get_ids(config["token"])
return [
Items(ids=ids, authenticator=auth),
]
def get_ids(self, token) -> Tuple[bool, any]:
resp = requests.get("FOO_URL", ...)
if resp.status_code == 200:
ids = resp.json()["data"]
return True, [id for id in ids if condition]
else:
return False, resp.text
class Items(HttpStream):
url_base = "FOO_URL"
primary_key = "id"
def __init__(self, ids: list, **kwargs):
super().__init__(**kwargs)
if not(not ids):
self.id = ids.pop(0)
else:
self.id = None
self.ids = ids
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
if not self.ids or not self.id:
return None
else:
self.id = self.ids.pop(0)
# Can I get rid off this entity
return {"next": self.id}
def path(self, **kwargs) -> str:
return f"items/{self.id}"
def parse_response(self,
response: requests.Response,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,) -> Iterable[Mapping]:
if response.status_code == 200:
return [response.json()]
elif response.status_code == 404:
return []
I could not figure out how to skip any call if my ids=[]
or my id=None
.