Hi, I’m working on developing a connector for the Aircall API (https://developer.aircall.io/api-references/#rest-apihttps://developer.aircall.io/api-references/#rest-api) using the CDK.
For the Users endpoint, there is additional information stored behind the id. I have a Users stream which works, so I have tried to add a UsersNumbers sub stream to take the ID from Users and get the nested information, but I’ve not been able to get it to work.
I’ve not used python much, especially not things like classes and methods so I’m having a hard time making sense of it. I know I need to implement a stream_slice method but beyond that I’m pretty lost. Any ideas what I’m missing or how it should look? Here is what I have at the moment for the different classes:
from abc import ABC
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
from datetime import datetime
import requests
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
from airbyte_cdk.sources.streams import IncrementalMixin
# Basic full refresh stream
class AircallStream(HttpStream, ABC):
url_base = "https://api.aircall.io/v1/"
def __init__(self, config: Mapping[str, Any], **kwargs):
super().__init__()
self.access_key = config['access_key']
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
meta = response.json()['meta']
if meta['next_page_link']:
return {"page": meta['current_page'] + 1}
else:
return None
def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
params = {'page': '1'}
if next_page_token:
params.update(next_page_token)
return params
def request_headers(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
return {"Authorization": self.access_key}
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
section = list(response.json().keys())[0]
jsout = response.json()[section]
yield from jsout
class Users(AircallStream):
@property
def use_cache(self) -> bool:
return True
primary_key = "id"
def path(
self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> str:
return "users"
class UserNumbers(HttpSubStream, AircallStream):
parent: object = Users
primary_key = ""
def path(
self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> str:
user_id = stream_slice["id"]
return f"users/{user_id}"
def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
users_stream = Users(**kwargs)
for user in users_stream.read_records(sync_mode=SyncMode.full_refresh):
return {"user_id": user["id"]}
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
return None
I get an error like this when I run it:
line 99, in stream_slices\n users_stream = Users(**kwargs)\nTypeError: AircallStream.__init__() missing 1 required positional argument: 'config'\n", "failure_type": "system_error"}}}
I think for this error I need to add an argument to my line users_stream = Users(**kwargs) for the config but I don’t know the way to reference the config from within this section of the code.
I changed the stream_slices method to this:
def stream_slices(self, config, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
users_stream = Users(config, **kwargs)
for user in users_stream.read_records(sync_mode=SyncMode.full_refresh):
return {"user_id": user["id"]}
but then it says:
TypeError: UserNumbers.stream_slices() missing 1 required positional argument: 'config'