Stream.slices strange error

Hi, I have a source connector that pulls JSON data from an API and stores each JSON record in a Postgres destination. Previously I pulled data for a single form using the spec below. As you can see there, the forms field is now an array, so that I can pass several form_id values, pull the data for each form_id, and create a new table for each form_id.

I used stream slices to iterate over the form_id values, but it gives me this error:

ERROR i.a.w.g.DefaultReplicationWorker(run):188 - Sync worker failed.

java.util.concurrent.ExecutionException: io.airbyte.workers.general.DefaultReplicationWorker$SourceException: Source cannot be stopped!

at java.util.concurrent.CompletableFuture.reportGet( ~[?:?]

at java.util.concurrent.CompletableFuture.get( ~[?:?]

at ~[io.airbyte-airbyte-workers-0.40.10.jar:?]

at ~[io.airbyte-airbyte-workers-0.40.10.jar:?]

at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2( ~[io.airbyte-airbyte-workers-0.40.10.jar:?]

at ~[?:?]

Suppressed: io.airbyte.workers.exception.WorkerException: Source process exit with code 1. This warning is normal if the job was cancelled.


  type: object
    - server_name
    - username
    - password
    - forms
      type: array
      title: Form's ID
      order: 3
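
For illustration, a config matching this spec might look roughly like the dictionary below. The values are made up; only the key names come from the spec. The list comprehension mirrors what stream_slices does with the forms array.

```python
# Hypothetical config values; only the key names come from the spec.
config = {
    "server_name": "example.org",
    "username": "user",
    "password": "secret",
    "forms": ["form_a", "form_b"],
}

# One slice per entry in the "forms" array.
slices = [{"form": form} for form in config.get("forms", [])]
print(slices)
```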

Here’s what my code looks like

class Stream(HttpStream, ABC):
    primary_key = None

    def __init__(self, config: Mapping[str, Any], **kwargs):
        # Let HttpStream set up its own state before adding ours
        super().__init__(**kwargs)
        self.server_name = config['server_name']
        self.forms = config.get("forms", [])
        # Base64-encode "username:password" for the Basic auth header
        user_name_password = f"{config['username']}:{config['password']}"
        self.auth_token = self._base64_encode(user_name_password)

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        return None

    def _base64_encode(self, string: str) -> str:
        return base64.b64encode(string.encode("ascii")).decode("ascii")

    @property
    def url_base(self) -> str:
        return f"https://{self.server_name}"

    def stream_slices(
        self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        for form in self.forms:
            yield {"form": form}

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        # Send the form from the current slice as the query parameter
        return {
            "query": stream_slice["form"]
        }

    def request_headers(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> Mapping[str, Any]:
        return {'Authorization': 'Basic ' + self.auth_token }

    def parse_response(
        self,
        response: requests.Response,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> Iterable[Mapping]:
        response_json = response.json()

        for data in response_json:
            try:
                yield data
            except Exception as e:
                msg = f"""Encountered an exception parsing schema"""
                raise e
    def path(
        self,
        stream_state: Mapping[str, Any] = None,
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> str:
        return self.forms

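Your path method returns the whole forms list, but it has to return a string. Build the path from the current slice instead, something like: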

    def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
        return f"forms/{stream_slice['form_id']}"

    def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs):
        yield from [{"form_id": form_id} for form_id in self.forms]
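
To see why path has to return one string per slice, here is a minimal sketch in plain Python that does not use the Airbyte CDK. FormStreamSketch and request_urls are hypothetical names, the host and form IDs are made up, and the loop only approximates how the CDK calls path for each slice.

```python
from typing import Any, Iterable, List, Mapping


class FormStreamSketch:
    """Plain-Python stand-in for the HttpStream subclass; not the Airbyte CDK class."""

    def __init__(self, config: Mapping[str, Any]):
        self.server_name = config["server_name"]
        self.forms = config.get("forms", [])

    def stream_slices(self, **kwargs) -> Iterable[Mapping[str, Any]]:
        # One slice per configured form ID.
        for form_id in self.forms:
            yield {"form_id": form_id}

    def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
        # Built from the current slice, so the result is always a string.
        return f"forms/{stream_slice['form_id']}"

    def request_urls(self) -> List[str]:
        # Hypothetical helper: approximates the read loop, calling path() per slice.
        base = f"https://{self.server_name}/"
        return [base + self.path(stream_slice=s) for s in self.stream_slices()]


# Hypothetical config values for illustration only.
stream = FormStreamSketch({"server_name": "example.org", "forms": ["101", "102"]})
urls = stream.request_urls()
print(urls)
```

Note that, as far as I know, all slices of a single stream end up in the same destination table. If you really need a separate table per form, you would likely need one stream instance per form ID rather than one stream with several slices.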