Implementing nested stream using full refresh parent

Hi, I’m working on developing a connector for the Aircall API (https://developer.aircall.io/api-references/#rest-api) using the CDK.

For the Users endpoint, there is additional information stored behind each user id. I have a Users stream which works, so I tried to add a UserNumbers sub stream that takes the id from Users and fetches the nested information, but I’ve not been able to get it to work.

I’ve not used Python much, especially not things like classes and methods, so I’m having a hard time making sense of it. I know I need to implement a stream_slices method, but beyond that I’m pretty lost. Any ideas what I’m missing or how it should look? Here is what I have at the moment for the different classes:


from abc import ABC
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
from datetime import datetime

import requests
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
from airbyte_cdk.sources.streams import IncrementalMixin



# Basic full refresh stream
class AircallStream(HttpStream, ABC):

    url_base = "https://api.aircall.io/v1/"

    def __init__(self, config: Mapping[str, Any], **kwargs):
        super().__init__()
        self.access_key = config['access_key']

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        
        meta = response.json()['meta']
        if meta['next_page_link']:
            return {"page": meta['current_page'] + 1}
        else:
            return None
    
    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        
        params = {'page': '1'}
        if next_page_token:
            params.update(next_page_token)
        
        return params

    def request_headers(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:

        return {"Authorization": self.access_key}
    

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
       
        section = list(response.json().keys())[0]
        jsout = response.json()[section]

        yield from jsout


class Users(AircallStream):
    
    @property
    def use_cache(self) -> bool:
        return True

    primary_key = "id"

    def path(
        self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> str:

        return "users"

class UserNumbers(HttpSubStream, AircallStream):
    
    
    parent: object = Users
    primary_key = ""

    def path(
        self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> str:
        
        user_id = stream_slice["id"]

        return f"users/{user_id}"
    

    def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
        
        users_stream = Users(**kwargs)
        for user in users_stream.read_records(sync_mode=SyncMode.full_refresh):
            return {"user_id": user["id"]}
    
    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        
        return None    

I get an error like this when I run it:

 line 99, in stream_slices\n    users_stream = Users(**kwargs)\nTypeError: AircallStream.__init__() missing 1 required positional argument: 'config'\n", "failure_type": "system_error"}}}

I think for this error I need to add an argument to my line users_stream = Users(**kwargs) for the config but I don’t know the way to reference the config from within this section of the code.

I changed the stream_slices method to this:

    def stream_slices(self, config, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:

        users_stream = Users(config, **kwargs)
        for user in users_stream.read_records(sync_mode=SyncMode.full_refresh):
            return {"user_id": user["id"]}

but then it says:
TypeError: UserNumbers.stream_slices() missing 1 required positional argument: 'config'
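
A note on this second error: the message suggests that whatever calls stream_slices passes keyword arguments only (things like sync_mode, cursor_field and stream_state), so an extra required `config` parameter never gets filled in. One possible way around it is to keep the config on the instance in `__init__` and read it from `self` later. The sketch below only illustrates that idea: `FakeUsers`, `FakeUserNumbers` and `framework_read` are hypothetical stand-ins, not CDK classes, and the user ids are made up so the example runs without airbyte_cdk installed.

```python
from typing import Any, Iterable, List, Mapping


class FakeUsers:
    """Hypothetical stand-in for the Users stream; returns canned records."""

    def __init__(self, config: Mapping[str, Any]):
        self.access_key = config["access_key"]

    def read_records(self, sync_mode: str) -> Iterable[Mapping[str, Any]]:
        # Made-up records; the real stream would call the API here.
        yield {"id": 101}
        yield {"id": 102}


class FakeUserNumbers:
    """Hypothetical stand-in for the UserNumbers sub stream."""

    def __init__(self, config: Mapping[str, Any]):
        # Keep the config on the instance so later methods can reach it.
        self.config = config

    def stream_slices(self, **kwargs: Any) -> Iterable[Mapping[str, Any]]:
        # No extra positional parameter: the config comes from self.
        users_stream = FakeUsers(self.config)
        for user in users_stream.read_records(sync_mode="full_refresh"):
            yield {"user_id": user["id"]}


def framework_read(stream: FakeUserNumbers) -> List[Mapping[str, Any]]:
    # Assumption: mimics a caller that passes keyword arguments only.
    return list(
        stream.stream_slices(sync_mode="full_refresh", cursor_field=None, stream_state=None)
    )


slices = framework_read(FakeUserNumbers({"access_key": "dummy-key"}))
print(slices)
```

The same idea should carry over to the real classes by storing `config` on `self` in `AircallStream.__init__`, though that has not been tested against the CDK here.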

Hi @shorgan, great to hear that you’re developing a new connector! I’ve found another post where the user was having difficulties with stream slices:
https://discuss.airbyte.io/t/stream-slices-strange-error/2824

The code suggested there is:

    def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
        return f"forms/{stream_slice['form_id']}"

    def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs):
        yield from [{"form_id": id} for id in self.forms_id]

I know this isn’t a solution to your error; let me look into this more and debug it, though.
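
For what it’s worth, two details in the suggested pattern seem to matter: stream_slices yields one mapping per parent record instead of returning, and the key it uses ("form_id") is the same key that path reads. A minimal sketch in plain Python, with hypothetical helper names (`slices_with_return`, `slices_with_yield`, `path_for`) and made-up ids, shows what goes wrong when either detail is missed:

```python
from typing import Any, Iterator, List, Mapping, Optional


def slices_with_return(user_ids: List[int]) -> Optional[Mapping[str, Any]]:
    # `return` exits on the first id and hands back a single dict.
    for uid in user_ids:
        return {"user_id": uid}
    return None


def slices_with_yield(user_ids: List[int]) -> Iterator[Mapping[str, Any]]:
    # `yield` produces one slice per id.
    for uid in user_ids:
        yield {"user_id": uid}


def path_for(stream_slice: Mapping[str, Any]) -> str:
    # Reads the same key that the slice generator writes.
    return f"users/{stream_slice['user_id']}"


# Iterating over a returned dict walks its keys, not slices.
broken = list(slices_with_return([101, 102]))
print(broken)  # ['user_id']

# Iterating over the generator gives one mapping per user.
paths = [path_for(s) for s in slices_with_yield([101, 102])]
print(paths)  # ['users/101', 'users/102']

# A key mismatch (reading "id" when the slice holds "user_id") raises KeyError.
try:
    next(slices_with_yield([101]))["id"]
    mismatch = None
except KeyError as exc:
    mismatch = exc.args[0]
print(mismatch)  # id
```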

Hey @natalyjazzviolin, thank you for looking into this! Thanks for sharing that post. I applied the suggested format for the stream slices output and tried some other things as well to solve the error. With this code for the UserNumbers class I no longer get the error about positional arguments; unfortunately, I now get an error that the object is not subscriptable:

class UserNumbers(HttpSubStream, AircallStream):
    
    
    parent: object = Users
    primary_key = "id"

    def path(
        self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> str:
        
        user_id = stream_slice["id"]

        return f"users/{user_id}"
    

    def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
        
        users_stream = Users(self, **kwargs)
        yield from [{"user_id": user["id"]} for user in users_stream.read_records(sync_mode=SyncMode.full_refresh)]
    
    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        
        return None   

{"type": "LOG", "log": {"level": "INFO", "message": "Syncing stream: user_numbers "}}
{"type": "LOG", "log": {"level": "ERROR", "message": "Encountered an exception while reading stream user_numbers\nTraceback (most recent call last):\n  File \"/Users/stuarthorgan/code/airbyte/airbyte-integrations/connectors/source-aircall/.venv/lib/python3.10/site-packages/airbyte_cdk/sources/abstract_source.py\", line 113, in read\n    yield from self._read_stream(\n  File \"/Users/stuarthorgan/code/airbyte/airbyte-integrations/connectors/source-aircall/.venv/lib/python3.10/site-packages/airbyte_cdk/sources/abstract_source.py\", line 182, in _read_stream\n    for record in record_iterator:\n  File \"/Users/stuarthorgan/code/airbyte/airbyte-integrations/connectors/source-aircall/.venv/lib/python3.10/site-packages/airbyte_cdk/sources/abstract_source.py\", line 274, in _read_full_refresh\n    for _slice in slices:\n  File \"/Users/stuarthorgan/code/airbyte/airbyte-integrations/connectors/source-aircall/source_aircall/source.py\", line 99, in stream_slices\n    users_stream = Users(self, **kwargs)\n  File \"/Users/stuarthorgan/code/airbyte/airbyte-integrations/connectors/source-aircall/source_aircall/source.py\", line 27, in __init__\n    self.access_key = config['access_key']\nTypeError: 'UserNumbers' object is not subscriptable"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing user_numbers"}}
{"type": "LOG", "log": {"level": "INFO", "message": "SourceAircall runtimes:\nSyncing stream user_numbers 0:00:00.001375\nSyncing stream users 0:00:02.143316"}}
{"type": "LOG", "log": {"level": "FATAL", "message": "'UserNumbers' object is not subscriptable\nTraceback (most recent call last):\n  File \"/Users/stuarthorgan/code/airbyte/airbyte-integrations/connectors/source-aircall/main.py\", line 13, in <module>\n    launch(source, sys.argv[1:])\n  File \"/Users/stuarthorgan/code/airbyte/airbyte-integrations/connectors/source-aircall/.venv/lib/python3.10/site-packages/airbyte_cdk/entrypoint.py\", line 123, in launch\n    for message in source_entrypoint.run(parsed_args):\n  File \"/Users/stuarthorgan/code/airbyte/airbyte-integrations/connectors/source-aircall/.venv/lib/python3.10/site-packages/airbyte_cdk/entrypoint.py\", line 114, in run\n    for message in generator:\n  File \"/Users/stuarthorgan/code/airbyte/airbyte-integrations/connectors/source-aircall/.venv/lib/python3.10/site-packages/airbyte_cdk/sources/abstract_source.py\", line 127, in read\n    raise e\n  File \"/Users/stuarthorgan/code/airbyte/airbyte-integrations/connectors/source-aircall/.venv/lib/python3.10/site-packages/airbyte_cdk/sources/abstract_source.py\", line 113, in read\n    yield from self._read_stream(\n  File \"/Users/stuarthorgan/code/airbyte/airbyte-integrations/connectors/source-aircall/.venv/lib/python3.10/site-packages/airbyte_cdk/sources/abstract_source.py\", line 182, in _read_stream\n    for record in record_iterator:\n  File \"/Users/stuarthorgan/code/airbyte/airbyte-integrations/connectors/source-aircall/.venv/lib/python3.10/site-packages/airbyte_cdk/sources/abstract_source.py\", line 274, in _read_full_refresh\n    for _slice in slices:\n  File \"/Users/stuarthorgan/code/airbyte/airbyte-integrations/connectors/source-aircall/source_aircall/source.py\", line 99, in stream_slices\n    users_stream = Users(self, **kwargs)\n  File \"/Users/stuarthorgan/code/airbyte/airbyte-integrations/connectors/source-aircall/source_aircall/source.py\", line 27, in __init__\n    self.access_key = config['access_key']\nTypeError: 'UserNumbers' object is not subscriptable"}}
{"type": "TRACE", "trace": {"type": "ERROR", "emitted_at": 1667472551003.884, "error": {"message": "Something went wrong in the connector. See the logs for more details.", "internal_message": "'UserNumbers' object is not subscriptable", "stack_trace": "Traceback (most recent call last):\n  File \"/Users/stuarthorgan/code/airbyte/airbyte-integrations/connectors/source-aircall/main.py\", line 13, in <module>\n    launch(source, sys.argv[1:])\n  File \"/Users/stuarthorgan/code/airbyte/airbyte-integrations/connectors/source-aircall/.venv/lib/python3.10/site-packages/airbyte_cdk/entrypoint.py\", line 123, in launch\n    for message in source_entrypoint.run(parsed_args):\n  File \"/Users/stuarthorgan/code/airbyte/airbyte-integrations/connectors/source-aircall/.venv/lib/python3.10/site-packages/airbyte_cdk/entrypoint.py\", line 114, in run\n    for message in generator:\n  File \"/Users/stuarthorgan/code/airbyte/airbyte-integrations/connectors/source-aircall/.venv/lib/python3.10/site-packages/airbyte_cdk/sources/abstract_source.py\", line 127, in read\n    raise e\n  File \"/Users/stuarthorgan/code/airbyte/airbyte-integrations/connectors/source-aircall/.venv/lib/python3.10/site-packages/airbyte_cdk/sources/abstract_source.py\", line 113, in read\n    yield from self._read_stream(\n  File \"/Users/stuarthorgan/code/airbyte/airbyte-integrations/connectors/source-aircall/.venv/lib/python3.10/site-packages/airbyte_cdk/sources/abstract_source.py\", line 182, in _read_stream\n    for record in record_iterator:\n  File \"/Users/stuarthorgan/code/airbyte/airbyte-integrations/connectors/source-aircall/.venv/lib/python3.10/site-packages/airbyte_cdk/sources/abstract_source.py\", line 274, in _read_full_refresh\n    for _slice in slices:\n  File \"/Users/stuarthorgan/code/airbyte/airbyte-integrations/connectors/source-aircall/source_aircall/source.py\", line 99, in stream_slices\n    users_stream = Users(self, **kwargs)\n  File \"/Users/stuarthorgan/code/airbyte/airbyte-integrations/connectors/source-aircall/source_aircall/source.py\", line 27, in __init__\n    self.access_key = config['access_key']\nTypeError: 'UserNumbers' object is not subscriptable\n", "failure_type": "system_error"}}}
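
Reading the traceback, `Users(self, **kwargs)` appears to pass the UserNumbers instance itself as `config`, so `config['access_key']` tries to index a stream object and fails. As far as I know, the CDK's HttpSubStream is meant to receive an already-built parent stream instance through a `parent` constructor argument, and its default stream_slices yields `{"parent": record}` for each parent record; that is worth checking against the installed CDK version. The sketch below imitates that shape with hypothetical stand-in classes (`ParentStream`, `ChildStream`) and made-up ids, so it runs without airbyte_cdk.

```python
from typing import Any, Iterator, Mapping, Optional


class ParentStream:
    """Hypothetical stand-in for Users."""

    def __init__(self, config: Mapping[str, Any]):
        self.access_key = config["access_key"]

    def read_records(self, **kwargs: Any) -> Iterator[Mapping[str, Any]]:
        # Made-up records in place of real API responses.
        yield {"id": 101}
        yield {"id": 102}


class ChildStream:
    """Hypothetical stand-in for UserNumbers."""

    def __init__(self, parent: ParentStream, config: Mapping[str, Any]):
        # The parent is built once by the caller and handed in.
        self.parent = parent
        self.access_key = config["access_key"]

    def stream_slices(self, **kwargs: Any) -> Iterator[Mapping[str, Any]]:
        # One slice per parent record, keyed by "parent".
        for record in self.parent.read_records():
            yield {"parent": record}

    def path(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs: Any) -> str:
        return f"users/{stream_slice['parent']['id']}"


config = {"access_key": "dummy-key"}
child = ChildStream(parent=ParentStream(config), config=config)
paths = [child.path(stream_slice=s) for s in child.stream_slices()]
print(paths)  # ['users/101', 'users/102']

# Reproduce the error from the log: a stream object passed where a config is expected.
try:
    ParentStream(child)
    error_message = ""
except TypeError as exc:
    error_message = str(exc)
print(error_message)  # 'ChildStream' object is not subscriptable
```

In the connector itself this would presumably mean building the sub stream as `UserNumbers(parent=Users(config=config), config=config)` inside the source's streams method and reading `stream_slice["parent"]["id"]` in path, but that is an untested assumption.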