Passing stream results into a secondary stream's API path

I have three streams.

accounts
|=statement list
|==statements

A field from accounts is used to populate the path for statement list. Two fields from statement list are needed to populate the path for statements.

I have the first two streams working the way I want by defining a class-specific stream slice.

class AccountRelatedStream(ParentClassStream, ABC):    
    
    def stream_slices(
        self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        accounts_stream = Accounts(authenticator=self.authenticator)
        for record in accounts_stream.read_records(sync_mode=SyncMode.full_refresh):
            yield {"account_id": record["account"]["account_id"]}

I thought I could do something similar for the last stream, but this isn’t working.

class StatementRelatedStream(ParentClassStream, ABC):    
    
    def stream_slices(
        self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        statements_stream = StatementsList(authenticator=self.authenticator)
        for record in statements_stream.read_records(sync_mode=SyncMode.full_refresh):
            yield {"account_id": record["account_id"],"statement_id":record["statement_id"]}

I get a TypeError: 'NoneType' object is not subscriptable error. Any ideas?

Hey, is this a custom connector? If so, can you share the CDK version for it?

Hey @harshith – it is a custom connector. I’m on CDK version 0.1.56.

Got it. It would be helpful if you could share the repo URL so that we can take a look at the code. Is that possible?

Sure thing, here it is. source-canopy is the custom connector.

Hey, could you also share a screenshot of the error, or copy the complete error log here?

Sure, here it is. The relevant code is in the repo in the tandym branch.

{"type": "LOG", "log": {"level": "ERROR", "message": "Encountered an exception while reading stream statements_detail\nTraceback (most recent call last):\n File \"/Users/dra/Code/Tandym Repos/airbyte-tandym/airbyte-integrations/connectors/source-canopy/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 113, in read\n yield from self._read_stream(\n File \"/Users/dra/Code/Tandym Repos/airbyte-tandym/airbyte-integrations/connectors/source-canopy/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 182, in _read_stream\n for record in record_iterator:\n File \"/Users/dra/Code/Tandym Repos/airbyte-tandym/airbyte-integrations/connectors/source-canopy/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 274, in _read_full_refresh\n for _slice in slices:\n File \"/Users/dra/Code/Tandym Repos/airbyte-tandym/airbyte-integrations/connectors/source-canopy/source_canopy/source.py\", line 98, in stream_slices\n for record in statements_stream.read_records(sync_mode=SyncMode.full_refresh):\n File \"/Users/dra/Code/Tandym Repos/airbyte-tandym/airbyte-integrations/connectors/source-canopy/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/streams/http/http.py\", line 410, in read_records\n path=self.path(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),\n File \"/Users/dra/Code/Tandym Repos/airbyte-tandym/airbyte-integrations/connectors/source-canopy/source_canopy/source.py\", line 83, in path\n account_id = stream_slice[\"account_id\"]\nTypeError: 'NoneType' object is not subscriptable"}}

Got it. Would you mind printing stream_slice and checking what data that variable holds?

Got this one figured out … the trick was to use a read_slices_from_records function to pass both variables I needed from stream-to-stream. This worked whereas the standard slice function did not … I’m guessing because the “source” stream was nested (for lack of a better word).

    def read_slices_from_records(self, stream_class: Type[ParentClassStream], slice_field: str, slice_field_two: str) -> Iterable[Optional[Mapping[str, Any]]]:
        stream = stream_class(authenticator=self.authenticator)
        stream_slices = stream.stream_slices(sync_mode=SyncMode.full_refresh)
        for stream_slice in stream_slices:
            for record in stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice):
                yield {slice_field: record["account_id"], slice_field_two: record["statement_id"]}
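
For anyone hitting the same error, here is a minimal sketch of why iterating the parent's slices helps. It does not use the Airbyte CDK at all: FakeStatementsList, the canned data, and the path string are hypothetical stand-ins. The only thing taken from the thread is that path() reads stream_slice["account_id"], as the traceback shows. Calling read_records without a stream_slice leaves it as None, which is what raises the TypeError.

```python
from typing import Any, Iterable, Iterator, Mapping, Optional

# Canned data standing in for API responses (hypothetical values).
ACCOUNTS = [{"account": {"account_id": "a1"}}, {"account": {"account_id": "a2"}}]
STATEMENTS = {
    "a1": [{"account_id": "a1", "statement_id": "s1"}],
    "a2": [
        {"account_id": "a2", "statement_id": "s2"},
        {"account_id": "a2", "statement_id": "s3"},
    ],
}


class FakeStatementsList:
    """Stand-in for a stream whose path depends on a slice from its own parent."""

    def stream_slices(self) -> Iterable[Mapping[str, Any]]:
        # One slice per account, mirroring AccountRelatedStream.stream_slices.
        for record in ACCOUNTS:
            yield {"account_id": record["account"]["account_id"]}

    def path(self, stream_slice: Optional[Mapping[str, Any]] = None) -> str:
        # With stream_slice=None this subscript raises the TypeError from the log.
        account_id = stream_slice["account_id"]
        return f"accounts/{account_id}/statements"  # hypothetical path

    def read_records(
        self, stream_slice: Optional[Mapping[str, Any]] = None
    ) -> Iterator[Mapping[str, Any]]:
        self.path(stream_slice=stream_slice)  # the path is built before any data is read
        yield from STATEMENTS[stream_slice["account_id"]]


def statement_slices(parent: FakeStatementsList) -> Iterator[Mapping[str, Any]]:
    # Walk the parent's slices first, then read the parent's records per slice.
    for parent_slice in parent.stream_slices():
        for record in parent.read_records(stream_slice=parent_slice):
            yield {
                "account_id": record["account_id"],
                "statement_id": record["statement_id"],
            }
```

In this sketch, list(FakeStatementsList().read_records()) fails with the same TypeError, while statement_slices(FakeStatementsList()) yields one slice per statement with both ids filled in.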

You’ll need to read records from both parent streams (accounts and statement list). I strongly recommend enabling the cache so that the parent streams are not read twice.
More info here: HTTP-API-based Connectors | Airbyte Documentation
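
A minimal sketch of what enabling the cache could look like, assuming your CDK version's HttpStream exposes a use_cache property that can be overridden (please check this against the docs for the version you are on). CachedParentMixin is a hypothetical name, and the block deliberately avoids importing the CDK so it stays self-contained.

```python
class CachedParentMixin:
    """Hypothetical mixin: opts a parent stream into the CDK's record cache.

    Assumption: the installed CDK's HttpStream reads a `use_cache` property.
    """

    @property
    def use_cache(self) -> bool:
        # Ask the CDK to cache this stream's responses instead of re-requesting them.
        return True
```

You would list the mixin before the stream base class on the parent streams, for example class Accounts(CachedParentMixin, ParentClassStream), so that its property takes precedence in the method resolution order.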

class StatementRelatedStream(ParentClassStream, ABC):    
    
    def stream_slices(
        self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        statements_stream = StatementsList(authenticator=self.authenticator)
        accounts = Accounts(authenticator=self.authenticator)
        for record in statements_stream.read_records(sync_mode=SyncMode.full_refresh):
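            # search_account_id is not defined here; it stands for a helper you would
            # write to find the account that owns this statement.
            account_id = search_account_id(record["statement_id"])
            yield {"account_id": account_id, "statement_id": record["statement_id"]}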

Hope this helps you.