Thank you for your reply @alafanechere . Sorry for responding so late after you helped me, I was on vacation. I tried your suggestion and got this error:
{"type": "LOG", "log": {"level": "INFO", "message": "Starting syncing SourceWeatherstack"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Syncing stream: forecast_daily "}}
{"type": "LOG", "log": {"level": "ERROR", "message": "Encountered an exception while reading stream SourceWeatherstack\nTraceback (most recent call last):\n File \"/Users/user/gitlab_projects/airbyte/airbyte-integrations/connectors/source-weatherstack/.venv/lib/python3.8/site-packages/airbyte_cdk/sources/abstract_source.py\", line 103, in read\n yield from self._read_stream(\n File \"/Users/user/gitlab_projects/airbyte/airbyte-integrations/connectors/source-weatherstack/.venv/lib/python3.8/site-packages/airbyte_cdk/sources/abstract_source.py\", line 142, in _read_stream\n for record in record_iterator:\n File \"/Users/user/gitlab_projects/airbyte/airbyte-integrations/connectors/source-weatherstack/.venv/lib/python3.8/site-packages/airbyte_cdk/sources/abstract_source.py\", line 208, in _read_full_refresh\n slices = stream_instance.stream_slices(sync_mode=SyncMode.full_refresh, cursor_field=configured_stream.cursor_field)\nTypeError: stream_slices() got an unexpected keyword argument 'sync_mode'"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing SourceWeatherstack"}}
{"type": "LOG", "log": {"level": "INFO", "message": "SourceWeatherstack runtimes:\n"}}
{"type": "LOG", "log": {"level": "FATAL", "message": "stream_slices() got an unexpected keyword argument 'sync_mode'\nTraceback (most recent call last):\n File \"main.py\", line 13, in <module>\n launch(source, sys.argv[1:])\n File \"/Users/user/gitlab_projects/airbyte/airbyte-integrations/connectors/source-weatherstack/.venv/lib/python3.8/site-packages/airbyte_cdk/entrypoint.py\", line 116, in launch\n for message in source_entrypoint.run(parsed_args):\n File \"/Users/user/gitlab_projects/airbyte/airbyte-integrations/connectors/source-weatherstack/.venv/lib/python3.8/site-packages/airbyte_cdk/entrypoint.py\", line 107, in run\n for message in generator:\n File \"/Users/user/gitlab_projects/airbyte/airbyte-integrations/connectors/source-weatherstack/.venv/lib/python3.8/site-packages/airbyte_cdk/sources/abstract_source.py\", line 112, in read\n raise e\n File \"/Users/user/gitlab_projects/airbyte/airbyte-integrations/connectors/source-weatherstack/.venv/lib/python3.8/site-packages/airbyte_cdk/sources/abstract_source.py\", line 103, in read\n yield from self._read_stream(\n File \"/Users/user/gitlab_projects/airbyte/airbyte-integrations/connectors/source-weatherstack/.venv/lib/python3.8/site-packages/airbyte_cdk/sources/abstract_source.py\", line 142, in _read_stream\n for record in record_iterator:\n File \"/Users/user/gitlab_projects/airbyte/airbyte-integrations/connectors/source-weatherstack/.venv/lib/python3.8/site-packages/airbyte_cdk/sources/abstract_source.py\", line 208, in _read_full_refresh\n slices = stream_instance.stream_slices(sync_mode=SyncMode.full_refresh, cursor_field=configured_stream.cursor_field)\nTypeError: stream_slices() got an unexpected keyword argument 'sync_mode'"}}
Just FYI I will post now the affected classes before & after my changes so you know what I did.
In the “afterwards”-code, I’ve added two stars (**) to each line of code that I touched after your suggestion.
Before:
class WeatherstackStream(HttpStream, ABC):
url_base = URL_BASE
#Weatherstack has no paging, so we don't need to return anything
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
return None
#Parameters we need for GET-methods
def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
return {}
#Define what happens when data is returned
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
yield {response.json()}
class Forecast(WeatherstackStream, ABC):
primary_key = None
def __init__(self, config: Mapping[str, Any], **kwargs):
super().__init__(**kwargs)
self.config = config
#Path just tells the name of an endpoint
def path(self, **kwargs) -> str:
path = f"forecast"
return path
@abstractmethod
def request_params(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, any] = None,
next_page_token: Mapping[str, Any] = None):
return None
def parse_response(
self,
response: requests.Response,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> Iterable[Mapping]:
#we only want the response text as a json, nothing else
#return [response.json()] # NEEDS to be an array
response = response.json()
restructured_response = Transform.restructure_response(response, "forecast")
return [restructured_response]
class ForecastDaily(Forecast):
def request_params(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, any] = None,
next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
return {
"access_key": self.config.get("access_key"),
"query": self.config.get("query")[0],
"forecast_days": self.config.get("forecast", {}).get("forecast_days"),
"hourly": 0
}
class ForecastHourly(Forecast):
def request_params(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, any] = None,
next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
return {
"access_key": self.config.get("access_key"),
"query": self.config.get("query")[0],
"forecast_days": self.config.get("forecast", {}).get("forecast_days"),
"hourly": 1,
"interval": self.config.get("forecast", {}).get("hourly_interval")
}
Afterwards:
# Basic full refresh stream
class WeatherstackStream(HttpStream, ABC):
url_base = URL_BASE
#Weatherstack has no paging, so we don't need to return anything
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
return None
#Parameters we need for GET-methods
def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
return {}
#Define what happens when data is returned
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
yield {response.json()}
class Forecast(WeatherstackStream, ABC):
primary_key = None
def __init__(self, config: Mapping[str, Any], **kwargs):
super().__init__(**kwargs)
self.config = config
**def stream_slices(self):**
** for city in self.config.get("cities", []):**
** yield {"city": city}**
#Path just tells the name of an endpoint
def path(self, **kwargs) -> str:
path = f"forecast"
return path
@abstractmethod
def request_params(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, any] = None,
next_page_token: Mapping[str, Any] = None):
return None
def parse_response(
self,
response: requests.Response,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> Iterable[Mapping]:
#we only want the response text as a json, nothing else
#return [response.json()] # NEEDS to be an array
response = response.json()
restructured_response = Transform.restructure_response(response, "forecast")
return [restructured_response]
class ForecastDaily(Forecast):
def request_params(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, any] = None,
next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
return {
"access_key": self.config.get("access_key"),
**"query": stream_slice["city"],**
"forecast_days": self.config.get("forecast", {}).get("forecast_days"),
"hourly": 0
}
class ForecastHourly(Forecast):
def request_params(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, any] = None,
next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
return {
"access_key": self.config.get("access_key"),
**"query": stream_slice["city"],**
"forecast_days": self.config.get("forecast", {}).get("forecast_days"),
"hourly": 1,
"interval": self.config.get("forecast", {}).get("hourly_interval")
}