Hi, I have a source connector that is pulling data from the API in JSON and storing every JSON in a Postgres destination. Earlier I was pulling single data with the help of this spec. Here if you check there is a forms field that is changed to array now. So that I can pass different form_id and then it can pull different data for each form_id and create a new table for particular form_id
I have used stream slices to pull different form_id. But its giving me this error
ERROR i.a.w.g.DefaultReplicationWorker(run):188 - Sync worker failed.
java.util.concurrent.ExecutionException: io.airbyte.workers.general.DefaultReplicationWorker$SourceException: Source cannot be stopped!
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) ~[?:?]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) ~[?:?]
at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:181) ~[io.airbyte-airbyte-workers-0.40.10.jar:?]
at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:68) ~[io.airbyte-airbyte-workers-0.40.10.jar:?]
at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:161) ~[io.airbyte-airbyte-workers-0.40.10.jar:?]
at java.lang.Thread.run(Thread.java:833) ~[?:?]
Suppressed: io.airbyte.workers.exception.WorkerException: Source process exit with code 1. This warning is normal if the job was cancelled.
Code
type: object
required:
- server_name
- username
- password
- forms
forms:
type: array
title: Form's ID
order: 3
Here’s what my code looks like
class Stream(HttpStream, ABC):
primary_key = None
def __init__(self, config: Mapping[str, Any], **kwargs):
super().__init__()
self.server_name = config['server_name']
self.forms = config.get("forms", [])
#base64 encode username and password as auth token
user_name_password = f"{config['username']}:{config['password']}"
self.auth_token = self._base64_encode(user_name_password)
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
return None
def _base64_encode(self,string:str) -> str:
return base64.b64encode(string.encode("ascii")).decode("ascii")
@property
def url_base(self) -> str:
return f"https://{self.server_name}.surveycto.com/api/v2/forms/data/wide/json/"
def stream_slices(
self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
for form in self.forms:
yield {"form": form}
def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
list_of_dicts = []
return {
"query": stream_slice["form"]
}
def request_headers(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> Mapping[str, Any]:
return {'Authorization': 'Basic ' + self.auth_token }
def parse_response(
self,
response: requests.Response,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> Iterable[Mapping]:
response_json = response.json()
for data in response_json:
try:
yield data
except Exception as e:
msg = f"""Encountered an exception parsing schema"""
self.logger.exception(msg)
raise e
def path(
self,
stream_state: Mapping[str, Any] = None,
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None
) -> str:
return self.forms