Hi everybody,
I’m trying to build a source connector for the EOD Historical Data API. Specifically, I want to fetch EOD stock prices daily. The endpoint requires a stock symbol and an exchange and returns the data in JSON format (docs: https://eodhistoricaldata.com/financial-apis/api-for-historical-data-and-volumes/).
Like this:
api/eod/{symbol}.{exchange}?api_token=demo&period=d&fmt=json
Now, I want to request several tickers and write all results into one Postgres table. I created an EODContentStream class (name: eod_content_stream) for fetching the data for one combination of symbol and exchange. However, if I create a list of streams with various symbol/exchange combinations, the read method only takes one stream (https://github.com/airbytehq/airbyte/blob/master/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py#L102), because all streams have the same name. If I set a unique name per stream, I end up with n tables in Postgres.
So, I’m searching for a best-practice design pattern for making x API calls (with different symbols) against the same endpoint and writing all results into one table. Have I overlooked something?
Happy to discuss!
Thank you in advance!
Sidenote:
My current workaround is to override the read_records method of the EODContentStream and loop over the symbol/exchange pairs. However, I do not think that this is the right approach.
See my workaround code below:
from airbyte_cdk.models import SyncMode
from abc import ABC
from typing import Mapping, Any, Iterable, Optional, Tuple, List
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream
import requests
class EODContentStream(HttpStream, ABC):
    """Stream of end-of-day prices from the EOD Historical Data API.

    One request is issued per ``symbol -> exchange`` entry of
    ``symbol_exchange_dict`` against ``eod/{symbol}.{exchange}``. Each entry is
    exposed as its own stream slice, so every pair is emitted under the single
    stream name ``eod_content_stream`` (and therefore lands in one destination
    table) while request sending and pagination stay in the inherited
    ``HttpStream.read_records``.
    """

    primary_key = None
    url_base = "https://eodhistoricaldata.com/api/"

    def __init__(
        self,
        symbol_exchange_dict: dict,
        api_token: Optional[str] = None,
        fmt: str = "json",
        period: str = "d",
        order: str = "a",
        **kwargs,
    ):
        """Store the request settings and initialise the underlying HTTP stream.

        Args:
            symbol_exchange_dict: Maps a ticker symbol to its exchange code,
                e.g. ``{"AAPL": "US"}``. Because symbols are the keys, a symbol
                can be paired with only one exchange.
            api_token: Value sent as the ``api_token`` query parameter.
            fmt: Value sent as the ``fmt`` query parameter.
            period: Value sent as the ``period`` query parameter.
            order: Value sent as the ``order`` query parameter.
            **kwargs: Forwarded unchanged to ``HttpStream.__init__``.
        """
        self.symbol_exchange_dict = symbol_exchange_dict
        self.api_token = api_token
        self.fmt = fmt
        self.period = period
        self.order = order
        super().__init__(**kwargs)

    @property
    def name(self) -> str:
        """Return the fixed stream name shared by every symbol/exchange pair."""
        return "eod_content_stream"

    def stream_slices(
        self,
        sync_mode: Optional[SyncMode] = None,
        cursor_field: Optional[List[str]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        """Return one slice per configured symbol/exchange pair.

        The arguments are accepted for compatibility with the base-class
        signature; they are not used to build the slices.
        """
        return [
            {"symbol": symbol, "exchange": exchange}
            for symbol, exchange in self.symbol_exchange_dict.items()
        ]

    @staticmethod
    def _symbol_and_exchange(call_kwargs: Mapping[str, Any]) -> Tuple[Optional[str], Optional[str]]:
        """Extract the symbol and exchange a hook call refers to.

        Explicit ``symbol`` / ``exchange`` keyword arguments win (this keeps
        direct calls such as ``path(symbol=..., exchange=...)`` working);
        otherwise the values are taken from ``stream_slice``.
        """
        slice_values = call_kwargs.get("stream_slice") or {}
        symbol = call_kwargs.get("symbol") or slice_values.get("symbol")
        exchange = call_kwargs.get("exchange") or slice_values.get("exchange")
        return symbol, exchange

    def path(self, **kwargs) -> str:
        """Build the request path, including the query string, for one pair.

        Raises:
            ValueError: If neither the keyword arguments nor ``stream_slice``
                provide both a symbol and an exchange. Failing here avoids
                silently requesting ``eod/None.None``.
        """
        symbol, exchange = self._symbol_and_exchange(kwargs)
        if not symbol or not exchange:
            raise ValueError("A symbol and an exchange are required to build the request path")
        return f"eod/{symbol}.{exchange}?api_token={self.api_token}&fmt={self.fmt}&period={self.period}&order={self.order}"

    def parse_response(
        self,
        response: requests.Response,
        **kwargs
    ) -> Iterable[Mapping]:
        """Yield each item of the JSON payload tagged with its symbol and exchange.

        The payload is iterated item by item and each item is treated as a
        mapping. ``symbol`` and ``exchange`` are added to every record so rows
        from different requests can be told apart.
        """
        symbol, exchange = self._symbol_and_exchange(kwargs)
        for record in response.json():
            yield {**record, "symbol": symbol, "exchange": exchange}

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """This API does not return any information to support pagination"""
        return None

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping[str, Any]]:
        """Read the records of one slice, or of every pair when no slice is given.

        With a non-empty ``stream_slice`` only that pair is fetched. Without
        one (for example when the method is called directly), every slice
        from ``stream_slices`` is read in turn. Each slice is handed to the
        inherited ``read_records``, so paging runs separately per pair and no
        private base-class helpers are called from here.
        """
        if stream_slice:
            slices = [stream_slice]
        else:
            slices = self.stream_slices(sync_mode=sync_mode, cursor_field=cursor_field, stream_state=stream_state)
        for current_slice in slices:
            yield from super().read_records(
                sync_mode=sync_mode,
                cursor_field=cursor_field,
                stream_slice=current_slice,
                stream_state=stream_state,
            )
# Source
class SourceEodHistoricalData(AbstractSource):
    """Airbyte source that exposes EOD Historical Data prices as one stream."""

    def _get_symbol_exchange_dict(self) -> Mapping[str, str]:
        """Return the symbol -> exchange pairs to sync (currently hard-coded)."""
        return {"AAPL": "US", "NFLX": "US"}

    def check_connection(self, logger, config) -> Tuple[bool, Any]:
        """Verify the configured API token by pulling one AAPL.US record.

        Args:
            logger: Logger used to report the successful probe.
            config: Connector configuration; ``api_token`` is read from it.

        Returns:
            ``(True, None)`` when a record could be read, otherwise
            ``(False, reason)`` where ``reason`` is the raised exception or a
            message explaining that no records came back.
        """
        try:
            symbol = "AAPL"
            exchange = "US"
            api_token = config.get("api_token")
            content_stream = EODContentStream(symbol_exchange_dict={symbol: exchange}, api_token=api_token)
            content_records = content_stream.read_records(sync_mode=SyncMode.full_refresh)
            # A default avoids surfacing a bare StopIteration when nothing is returned.
            record = next(iter(content_records), None)
        except Exception as e:
            return False, e
        if record is None:
            return False, f"No records returned for {symbol}.{exchange}"
        logger.info(f"Successfully connected to EODContentStream stream. Pulled one record: {record}")
        return True, None

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """Return the single stream covering all configured symbol/exchange pairs."""
        return [EODContentStream(symbol_exchange_dict=self._get_symbol_exchange_dict(), api_token=config.get("api_token"))]