Stream.slices strange error

Hi, I have a source connector that is pulling data from the API in JSON and storing every JSON in a Postgres destination. Earlier I was pulling single data with the help of this spec. Here if you check there is a forms field that is changed to array now. So that I can pass different form_id and then it can pull different data for each form_id and create a new table for particular form_id

I have used stream slices to pull different form_id. But its giving me this error

ERROR i.a.w.g.DefaultReplicationWorker(run):188 - Sync worker failed.

java.util.concurrent.ExecutionException: io.airbyte.workers.general.DefaultReplicationWorker$SourceException: Source cannot be stopped!

at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) ~[?:?]

at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) ~[?:?]

at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:181) ~[io.airbyte-airbyte-workers-0.40.10.jar:?]

at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:68) ~[io.airbyte-airbyte-workers-0.40.10.jar:?]

at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:161) ~[io.airbyte-airbyte-workers-0.40.10.jar:?]

at java.lang.Thread.run(Thread.java:833) ~[?:?]

Suppressed: io.airbyte.workers.exception.WorkerException: Source process exit with code 1. This warning is normal if the job was cancelled.

Code

  type: object
  required:
    - server_name
    - username
    - password
    - forms
    forms:
      type: array
      title: Form's ID
      order: 3

Here’s what my code looks like

class Stream(HttpStream, ABC):
    primary_key = None

    def __init__(self, config: Mapping[str, Any], **kwargs):
        super().__init__()
        self.server_name = config['server_name']
        self.forms = config.get("forms", [])
        #base64 encode username and password as auth token
        user_name_password = f"{config['username']}:{config['password']}"
        self.auth_token = self._base64_encode(user_name_password)
        
    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        return None

    def _base64_encode(self,string:str) -> str:
        return base64.b64encode(string.encode("ascii")).decode("ascii")

    @property
    def url_base(self) -> str:
        return f"https://{self.server_name}.surveycto.com/api/v2/forms/data/wide/json/"

    def stream_slices(
        self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        for form in self.forms:
            yield {"form": form}
        

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
         list_of_dicts = []
         return {
            "query": stream_slice["form"]
         }

    def request_headers(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> Mapping[str, Any]:
        return {'Authorization': 'Basic ' + self.auth_token }

    def parse_response(
        self,
        response: requests.Response,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> Iterable[Mapping]:
        response_json = response.json()

        for data in response_json:
            try:
                yield data
            except Exception as e:
                msg = f"""Encountered an exception parsing schema"""
                self.logger.exception(msg)
                raise e
 
    def path(
        self, 
        stream_state: Mapping[str, Any] = None, 
        stream_slice: Mapping[str, Any] = None, 
        next_page_token: Mapping[str, Any] = None
    ) -> str:
        return self.forms

You must do something like:

    def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
        return f"forms/{stream_slice['form_id']}"

    def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs):
            yield from [{"form_id": id} for id in self.forms_id]