Multiple API-Calls ending up in one table

Hi everybody,

I’m trying to build a source connector for the EOD Historical Data API. Specifically, I want to fetch EOD stock prices daily. The endpoint requires a stock symbol and an exchange and returns the data in JSON format (see the API documentation).

Like this:

Now, I want to request several tickers and write all results into one Postgres table. I created an EODContentStream (name: eod_content_stream) class for fetching the data for one combination of symbol and exchange. However, if I create a list of streams with various symbol/exchange combinations, the read method only takes one stream, because all streams have the same name. If I set a unique name per stream, I end up with n tables in Postgres.

So, I’m searching for a best-practice design pattern for making x API calls (with different symbols) against the same endpoint and writing all results into one table. Have I overlooked something?

Happy to discuss!

Thank you in advance!


My current workaround is to override the read_records method of EODContentStream and loop over pairs of symbols and exchanges. However, I do not think that this is the right approach.

See my workaround code below:

from airbyte_cdk.models import SyncMode

from abc import ABC
from typing import Mapping, Any, Iterable, Optional, Tuple, List

from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream

import requests

class EODContentStream(HttpStream, ABC):
    """Fetch end-of-day price data for a set of symbol/exchange pairs.

    All pairs are read by this single stream instance (one fixed stream
    name), so every record lands in one destination table instead of one
    table per symbol.
    """

    # The API rows carry no natural primary key we rely on here.
    primary_key = None

    # NOTE(review): left empty in the original paste — must be set to the
    # EOD Historical Data API host (e.g. "https://eodhistoricaldata.com/api/")
    # before the connector can run.
    url_base = ""

    def __init__(self, symbol_exchange_dict: dict, api_token: str = None, fmt: str = "json",
                 period: str = "d", order: str = "a", **kwargs):
        """
        :param symbol_exchange_dict: mapping of ticker symbol -> exchange code,
            e.g. {"AAPL": "US"}; one request is made per entry.
        :param api_token: EOD Historical Data API token.
        :param fmt: response format requested from the API.
        :param period: bar period ("d" = daily).
        :param order: sort order ("a" = ascending).
        """
        # Bug fix: the original never called HttpStream.__init__, so the
        # authenticator/session set up by the CDK base class was missing.
        super().__init__(**kwargs)
        self.symbol_exchange_dict = symbol_exchange_dict
        self.api_token = api_token
        self.fmt = fmt
        self.period = period
        self.order = order

    @property
    def name(self) -> str:
        # Bug fix: Stream.name is a property in the CDK; overriding it with a
        # plain method made `stream.name` a bound method instead of a string.
        return "eod_content_stream"

    def path(self, **kwargs) -> str:
        """Build the request path for one symbol/exchange pair.

        `symbol` and `exchange` are passed in via kwargs by read_records below.
        """
        symbol = kwargs.get("symbol")
        exchange = kwargs.get("exchange")
        return (
            f"eod/{symbol}.{exchange}"
            f"?api_token={self.api_token}&fmt={self.fmt}"
            f"&period={self.period}&order={self.order}"
        )

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yield one record per row, tagging each with its symbol/exchange.

        Bug fix: the original signature was missing both `self` and the
        `**kwargs` it read `symbol`/`exchange` from.
        """
        records = response.json()
        for record in records:
            # Tag each row so all symbols can share a single table.
            record.update({"symbol": kwargs.get('symbol'), "exchange": kwargs.get('exchange')})
        yield from records

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """This API does not return any information to support pagination."""
        # Bug fix: return None (not {}) — the HttpStream contract expects
        # None to signal "no more pages".
        return None

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: List[str] = None,
        stream_slice: Mapping[str, Any] = None,
        stream_state: Mapping[str, Any] = None,
    ) -> Iterable[Mapping[str, Any]]:
        """Issue one request per symbol/exchange pair and yield every record.

        Workaround: overrides HttpStream.read_records so several API calls
        against the same endpoint feed a single stream (and therefore a
        single destination table).

        Bug fixes vs. the original paste:
        - added the missing `self` parameter;
        - closed the `_create_prepared_request(...)` call (paren was missing);
        - added the missing `else:` so a second, duplicate request is no
          longer sent after the cached one;
        - pagination state is now tracked per symbol instead of leaking the
          last symbol's token across the whole loop.
        """
        stream_state = stream_state or {}

        for symbol, exchange in self.symbol_exchange_dict.items():
            next_page_token = None
            pagination_complete = False
            while not pagination_complete:
                request_headers = self.request_headers(
                    stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
                )
                request = self._create_prepared_request(
                    path=self.path(symbol=symbol, exchange=exchange, stream_state=stream_state,
                                   stream_slice=stream_slice, next_page_token=next_page_token),
                    headers=dict(request_headers, **self.authenticator.get_auth_header()),
                    params=self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
                    json=self.request_body_json(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
                    data=self.request_body_data(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
                )
                request_kwargs = self.request_kwargs(
                    stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
                )

                if self.use_cache:
                    # Use context manager to handle and store cassette metadata.
                    # vcr looks up records by request: if cached, return from
                    # the cache file, else make the request and record it.
                    with self.cache_file as cass:
                        self.cassete = cass  # NOTE(review): attribute name kept as in the CDK ("cassete")
                        response = self._send_request(request, request_kwargs)
                else:
                    response = self._send_request(request, request_kwargs)

                yield from self.parse_response(
                    response, stream_state=stream_state, stream_slice=stream_slice,
                    symbol=symbol, exchange=exchange,
                )

                next_page_token = self.next_page_token(response)
                if not next_page_token:
                    pagination_complete = True

        # Always return an empty generator just in case no records were ever yielded
        yield from []

# Source
class SourceEodHistoricalData(AbstractSource):
    """Airbyte source for the EOD Historical Data API.

    Exposes a single EODContentStream that fans out over all configured
    symbol/exchange pairs so the results land in one table.
    """

    def _get_symbol_exchange_dict(self) -> Mapping[str, str]:
        # TODO(review): read the symbol/exchange pairs from the connector
        # `config` instead of hard-coding them here.
        return {"AAPL": "US", "NFLX": "US"}

    def check_connection(self, logger, config) -> Tuple[bool, any]:
        """Verify connectivity by pulling a single record for AAPL.US.

        :returns: (True, None) on success, (False, exception) on failure.
        """
        # Bug fix: the `try` matching the `except` below was missing in the
        # original paste, which made the method a syntax error.
        try:
            symbol = "AAPL"
            exchange = "US"
            api_token = config.get("api_token")

            content_stream = EODContentStream(symbol_exchange_dict={symbol: exchange}, api_token=api_token)

            content_records = content_stream.read_records(sync_mode="full_refresh")

            record = next(content_records)

            # Bug fix: this line was a bare string in the original; wrap it in
            # the logger call it was clearly meant to be.
            logger.info(f"Successfully connected to EODContentStream stream. Pulled one record: {record}")

            return True, None
        except Exception as e:
            return False, e

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """Return the single stream that covers every symbol/exchange pair."""
        return [EODContentStream(symbol_exchange_dict=self._get_symbol_exchange_dict(), api_token=config.get("api_token"))]

Hi @QuantyPython, this is a really interesting question! I’m looking into this, but if you have time, stop by the community office hours to get some more insight on this.