Using an array instead of a string in the config

Hi everyone!

I’m currently working on a HTTP-connector for the weatherstack-API.
Currently, I can define a configuration with one city and a date. My connector then successfully delivers the weather data for that city & day.
What I want is to be able to use an array of cities in my configuration instead of one single city. My idea was to change the request_params such that instead of returning one dictionary, I return a list of dictionaries where each entry corresponds to the same request but for a different city.
However, returning a list of dictionaries instead of one dictionary in the request_params results in a Too many values to unpack-error. Does anyone have an idea on how I could solve this problem?

Hi @mercurymediamon,
For this use case, I would recommend using stream_slices by creating a slice per city and day you want to consume. We have documentation about stream slices here.
You will also need to change your spec.json to accept an array of cities instead of a string.
Could you please share a sample of your current code on which I could build an example?

Hi @alafanechere ,
Thanks for your quick reply. As for the spec.json, you’re totally right, I’ve done that already. As for the sample of my current code: Which class/methods would be of interest? I will now post the request_params and the parse_response method from my source.py and my config.json

def parse_response(
            self,
            response: requests.Response,
            stream_state: Mapping[str, Any],
            stream_slice: Mapping[str, Any] = None,
            next_page_token: Mapping[str, Any] = None,
    ) -> Iterable[Mapping]:
        #we only want the response text as a json, nothing else
        #return [response.json()] # NEEDS to be an array
        response = response.json()
        restructured_response = Transform.restructure_response(response, "historical")
        return [restructured_response]

def request_params(
            self,
            stream_state: Mapping[str, Any],
            stream_slice: Mapping[str, any] = None,
            next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        list_of_dicts = []
        for city in self.config.get("query"):
            x = {
                "access_key": self.config.get("access_key"),
                "query": city,
                "historical_date": self.config.get("historical", {}).get("date"),
                "hourly": 0
            }
            list_of_dicts.append(x)
        return list_of_dicts

config.json

{
    "access_key": "XXX",
    "query": [
        "Stuttgart",
        "Hamburg",
        "Duisburg"
    ],
    "historical": {
        "date": "2018-01-01",
        "hourly_interval": 24
    },
    "forecast": {
        "forecast_days": 1,
        "hourly_interval": 24
    }
}

Here’s my suggestion. The stream slices can be a bit different if you implement an incremental stream.

class YourStream(HttpStream):
    
    def __init__(self, authenticator, config):
        super().__init__(authenticator)
        self.cities = config.get("cities", [])

    def stream_slices(self):
        for city in self.cities:
            yield {"city": city}

    def request_params(
                self,
                stream_state: Mapping[str, Any],
                stream_slice: Mapping[str, any] = None,
                next_page_token: Mapping[str, Any] = None
        ) -> MutableMapping[str, Any]:
            list_of_dicts = []
            return {
                "access_key": self.config.get("access_key"),
                "query": stream_slice["city"],
                "historical_date": self.config.get("historical", {}).get("date"),
                "hourly": 0
            }

Thank you for your reply @alafanechere . Sorry for responding so late after you helped me, I was on vacation. I tried your suggestion and got this error:

{"type": "LOG", "log": {"level": "INFO", "message": "Starting syncing SourceWeatherstack"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Syncing stream: forecast_daily "}}
{"type": "LOG", "log": {"level": "ERROR", "message": "Encountered an exception while reading stream SourceWeatherstack\nTraceback (most recent call last):\n  File \"/Users/user/gitlab_projects/airbyte/airbyte-integrations/connectors/source-weatherstack/.venv/lib/python3.8/site-packages/airbyte_cdk/sources/abstract_source.py\", line 103, in read\n    yield from self._read_stream(\n  File \"/Users/user/gitlab_projects/airbyte/airbyte-integrations/connectors/source-weatherstack/.venv/lib/python3.8/site-packages/airbyte_cdk/sources/abstract_source.py\", line 142, in _read_stream\n    for record in record_iterator:\n  File \"/Users/user/gitlab_projects/airbyte/airbyte-integrations/connectors/source-weatherstack/.venv/lib/python3.8/site-packages/airbyte_cdk/sources/abstract_source.py\", line 208, in _read_full_refresh\n    slices = stream_instance.stream_slices(sync_mode=SyncMode.full_refresh, cursor_field=configured_stream.cursor_field)\nTypeError: stream_slices() got an unexpected keyword argument 'sync_mode'"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing SourceWeatherstack"}}
{"type": "LOG", "log": {"level": "INFO", "message": "SourceWeatherstack runtimes:\n"}}
{"type": "LOG", "log": {"level": "FATAL", "message": "stream_slices() got an unexpected keyword argument 'sync_mode'\nTraceback (most recent call last):\n  File \"main.py\", line 13, in <module>\n    launch(source, sys.argv[1:])\n  File \"/Users/user/gitlab_projects/airbyte/airbyte-integrations/connectors/source-weatherstack/.venv/lib/python3.8/site-packages/airbyte_cdk/entrypoint.py\", line 116, in launch\n    for message in source_entrypoint.run(parsed_args):\n  File \"/Users/user/gitlab_projects/airbyte/airbyte-integrations/connectors/source-weatherstack/.venv/lib/python3.8/site-packages/airbyte_cdk/entrypoint.py\", line 107, in run\n    for message in generator:\n  File \"/Users/user/gitlab_projects/airbyte/airbyte-integrations/connectors/source-weatherstack/.venv/lib/python3.8/site-packages/airbyte_cdk/sources/abstract_source.py\", line 112, in read\n    raise e\n  File \"/Users/user/gitlab_projects/airbyte/airbyte-integrations/connectors/source-weatherstack/.venv/lib/python3.8/site-packages/airbyte_cdk/sources/abstract_source.py\", line 103, in read\n    yield from self._read_stream(\n  File \"/Users/user/gitlab_projects/airbyte/airbyte-integrations/connectors/source-weatherstack/.venv/lib/python3.8/site-packages/airbyte_cdk/sources/abstract_source.py\", line 142, in _read_stream\n    for record in record_iterator:\n  File \"/Users/user/gitlab_projects/airbyte/airbyte-integrations/connectors/source-weatherstack/.venv/lib/python3.8/site-packages/airbyte_cdk/sources/abstract_source.py\", line 208, in _read_full_refresh\n    slices = stream_instance.stream_slices(sync_mode=SyncMode.full_refresh, cursor_field=configured_stream.cursor_field)\nTypeError: stream_slices() got an unexpected keyword argument 'sync_mode'"}}

Just FYI I will post now the affected classes before & after my changes so you know what I did.
In the “afterwards”-code, I’ve added two stars (**) to each line of code that I touched after your suggestion.
Before:

class WeatherstackStream(HttpStream, ABC):
    url_base = URL_BASE

    #Weatherstack has no paging, so we don't need to return anything
    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        return None

    #Parameters we need for GET-methods
    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        return {}

    #Define what happens when data is returned
    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        yield {response.json()}

class Forecast(WeatherstackStream, ABC):

    primary_key = None

    def __init__(self, config: Mapping[str, Any], **kwargs):
        super().__init__(**kwargs)
        self.config = config

    #Path just tells the name of an endpoint
    def path(self, **kwargs) -> str:
        path = f"forecast"
        return path

    @abstractmethod
    def request_params(
            self,
            stream_state: Mapping[str, Any],
            stream_slice: Mapping[str, any] = None,
            next_page_token: Mapping[str, Any] = None):
        return None

    def parse_response(
            self,
            response: requests.Response,
            stream_state: Mapping[str, Any],
            stream_slice: Mapping[str, Any] = None,
            next_page_token: Mapping[str, Any] = None,
    ) -> Iterable[Mapping]:
        #we only want the response text as a json, nothing else
        #return [response.json()] # NEEDS to be an array
        response = response.json()
        restructured_response = Transform.restructure_response(response, "forecast")
        return [restructured_response]

class ForecastDaily(Forecast):

    def request_params(
            self,
            stream_state: Mapping[str, Any],
            stream_slice: Mapping[str, any] = None,
            next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        return {
            "access_key": self.config.get("access_key"),
            "query": self.config.get("query")[0],
            "forecast_days": self.config.get("forecast", {}).get("forecast_days"),
            "hourly": 0
        }

class ForecastHourly(Forecast):

    def request_params(
            self,
            stream_state: Mapping[str, Any],
            stream_slice: Mapping[str, any] = None,
            next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        return {
            "access_key": self.config.get("access_key"),
            "query": self.config.get("query")[0],
            "forecast_days": self.config.get("forecast", {}).get("forecast_days"),
            "hourly": 1,
            "interval": self.config.get("forecast", {}).get("hourly_interval")
        }

Afterwards:

# Basic full refresh stream
class WeatherstackStream(HttpStream, ABC):
    url_base = URL_BASE

    #Weatherstack has no paging, so we don't need to return anything
    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        return None

    #Parameters we need for GET-methods
    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        return {}

    #Define what happens when data is returned
    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        yield {response.json()}

class Forecast(WeatherstackStream, ABC):

    primary_key = None

    def __init__(self, config: Mapping[str, Any], **kwargs):
        super().__init__(**kwargs)
        self.config = config

    **def stream_slices(self):**
**        for city in self.config.get("cities", []):**
**            yield {"city": city}**

    #Path just tells the name of an endpoint
    def path(self, **kwargs) -> str:
        path = f"forecast"
        return path

    @abstractmethod
    def request_params(
            self,
            stream_state: Mapping[str, Any],
            stream_slice: Mapping[str, any] = None,
            next_page_token: Mapping[str, Any] = None):
        return None

    def parse_response(
            self,
            response: requests.Response,
            stream_state: Mapping[str, Any],
            stream_slice: Mapping[str, Any] = None,
            next_page_token: Mapping[str, Any] = None,
    ) -> Iterable[Mapping]:
        #we only want the response text as a json, nothing else
        #return [response.json()] # NEEDS to be an array
        response = response.json()
        restructured_response = Transform.restructure_response(response, "forecast")
        return [restructured_response]

class ForecastDaily(Forecast):

    def request_params(
            self,
            stream_state: Mapping[str, Any],
            stream_slice: Mapping[str, any] = None,
            next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        return {
            "access_key": self.config.get("access_key"),
            **"query": stream_slice["city"],**
            "forecast_days": self.config.get("forecast", {}).get("forecast_days"),
            "hourly": 0
        }

class ForecastHourly(Forecast):

    def request_params(
            self,
            stream_state: Mapping[str, Any],
            stream_slice: Mapping[str, any] = None,
            next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        return {
            "access_key": self.config.get("access_key"),
            **"query": stream_slice["city"],**
            "forecast_days": self.config.get("forecast", {}).get("forecast_days"),
            "hourly": 1,
            "interval": self.config.get("forecast", {}).get("hourly_interval")
        }

Hey @mercurymediamon**,**
I think the error you have is because you copied my example which had a little typo. The stream_slices method should take multiple args and kwargs.
Use the following signature please:

    def stream_slices( self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None ) -> Iterable[Optional[Mapping[str, Any]]]:

Hi @alafanechere
I copied your new code and replaced the stream_slices-signature with it.

This is what I am getting now:

line 60, in Forecast\n    def stream_slices( self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None ) -> Iterable[Optional[Mapping[str, Any]]]:\nNameError: name 'SyncMode' is not defined"}}

Do I have to import some library or where would I define the SyncMode?
I’m sorry if my issue sounds too trivial, this is my first connector and I’m not confident at implementing with Airbyte yet.

Yes you need to import these, but they are type hints only.

from typing import Any, Iterable, List, Mapping, Optionalfrom airbyte_cdk.models import SyncMode

Hi there from the Community Assistance team.
We’re letting you know about an issue we discovered with the back-end process we use to handle topics and responses on the forum. If you experienced a situation where you posted the last message in a topic that did not receive any further replies, please open a new topic to continue the discussion. In addition, if you’re having a problem and find a closed topic on the subject, go ahead and open a new topic on it and we’ll follow up with you. We apologize for the inconvenience, and appreciate your willingness to work with us to provide a supportive community.