I have a custom source connector that pulls data from an API and pushes it to Postgres, our destination. Because we write the raw JSON into the database as-is, we want to enable basic normalization. The schema has to be dynamic: as soon as we receive the data, we build a schema from it and return that schema from the `get_json_schema` function. The problem is that if the connection is created while the schema still has empty properties, we cannot change it later. How can I make `get_json_schema` return my inferred schema instead of the empty one, either after the connection has been created or by updating it during the sync?
In the `get_json_schema` function below, we wait for the data to arrive via `response.json()` and then derive the schema from it. What we want is that, when normalization runs, the empty `dump_schema` is replaced with the schema that was just inferred.
Can we control this?
class SurveyStream(HttpStream, ABC):
    """Base HTTP stream for the SurveyCTO wide-JSON form-data endpoint.

    Stores the connection settings read from ``config`` and pre-computes the
    HTTP Basic-auth token that subclasses send in their request headers.
    """

    def __init__(self, config: Mapping[str, Any], form_id, **kwargs):
        """Read connection settings from ``config`` and build the auth token.

        :param config: connector config; ``server_name``, ``start_date``,
            ``username`` and ``password`` are read from it.
        :param form_id: identifier of the SurveyCTO form this stream reads.
        :param kwargs: forwarded to ``HttpStream.__init__`` (e.g. an
            ``authenticator``). They used to be accepted and silently dropped.
        """
        super().__init__(**kwargs)
        self.server_name = config['server_name']
        self.form_id = form_id
        self.start_date = config['start_date']
        # Basic auth token: base64("username:password").
        user_name_password = f"{config['username']}:{config['password']}"
        self.auth_token = self._base64_encode(user_name_password)

    @property
    def url_base(self) -> str:
        """Return the wide-JSON form-data base URL on the configured server."""
        return f"https://{self.server_name}.surveycto.com/api/v2/forms/data/wide/json/"

    def _base64_encode(self, string: str) -> str:
        """Return ``string`` base64-encoded, as an ASCII ``str``.

        The input is encoded as UTF-8 rather than ASCII, so non-ASCII
        credentials no longer raise ``UnicodeEncodeError``. ASCII input
        produces exactly the same token as before. Base64 output is always
        ASCII, hence the ASCII decode.
        """
        return base64.b64encode(string.encode("utf-8")).decode("ascii")

    def request_params(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> MutableMapping[str, Any]:
        """Send no query parameters; subclasses override this."""
        return {}
class SurveyctoStream(SurveyStream, IncrementalMixin):
    """Incremental stream over one SurveyCTO form.

    The primary key is ``KEY`` and the cursor is ``CompletionDate``. The JSON
    schema is inferred from real form data instead of being declared
    statically.

    Why the schema used to stay empty: a connection's catalog, which
    normalization relies on, is built from the output of *discover*, not from
    anything computed during a sync. ``get_json_schema`` previously had data
    only after ``parse_response`` had run (i.e. inside a sync), so discovery
    always saw empty ``properties``. It now requests sample data itself when
    no response is available yet, so discovery returns the real columns.
    When a form's fields change, refresh the source schema of the connection
    so the new catalog is picked up.
    """

    primary_key = 'KEY'
    # Timestamp format of the API ``date`` parameter and of record
    # ``CompletionDate`` values, e.g. "Jan 31, 2022 01:05:09 PM". Uses the
    # 12-hour %I: with %H, strptime ignores %p (PM times parsed 12 hours
    # early) and strftime produced strings such as "13:00:00 PM".
    date_format = '%b %d, %Y %I:%M:%S %p'
    # Previous 24-hour pattern. It is still accepted on input so existing
    # ``start_date`` values such as "Jan 01, 2022 00:00:00 AM" keep parsing.
    _legacy_date_format = '%b %d, %Y %H:%M:%S %p'
    # Format of the cursor value stored in the stream state.
    dateformat = '%Y-%m-%dT%H:%M:%S'
    cursor_field = 'CompletionDate'
    _cursor_value = None
    # Inferred JSON schema; reset whenever a new API response is parsed.
    _json_schema_cache = None
    # Timeout (seconds) of the request that fetches sample data for discovery.
    schema_request_timeout = 60

    @property
    def name(self) -> str:
        """Use the form id as the stream name."""
        return self.form_id

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """Return ``None``: this stream makes a single request and does not paginate."""
        return None

    def _parse_api_date(self, value: str) -> datetime:
        """Parse an API timestamp, accepting 12-hour and legacy 24-hour hours."""
        try:
            return datetime.strptime(value, self.date_format)
        except ValueError:
            # Hours such as 00 or 13-23 are only valid under the old %H pattern.
            return datetime.strptime(value, self._legacy_date_format)

    def _fetch_sample_records(self) -> Optional[List[Mapping[str, Any]]]:
        """Request the form data directly so a schema can be inferred at discover time.

        Uses the same URL, headers and query parameters as a sync. Returns
        ``None`` after logging a warning if the request fails or the body is
        not a JSON array, so discovery degrades to an empty schema instead of
        failing.
        """
        url = f"{self.url_base}{self.path()}"
        # Built outside the try block so config errors (e.g. a malformed
        # start_date) surface instead of being logged away.
        headers = self.request_headers(stream_state={})
        params = self.request_params(stream_state={})
        try:
            response = requests.get(url, headers=headers, params=params, timeout=self.schema_request_timeout)
            response.raise_for_status()
            payload = response.json()
        except (requests.RequestException, ValueError) as err:
            self.logger.warning("Could not fetch data for form %s to infer its schema: %s", self.form_id, err)
            return None
        if not isinstance(payload, list):
            self.logger.warning("Unexpected response for form %s; expected a JSON array", self.form_id)
            return None
        return payload

    def _infer_properties(self, records: List[Mapping[str, Any]]) -> Mapping[str, Any]:
        """Infer JSON-schema ``properties`` for ``records``; empty input gives ``{}``."""
        if not records:
            # Nothing to infer from; an empty list may not yield the
            # definitions/element structure looked up below.
            return {}
        # NOTE(review): if SchemaGenerator is bigquery_schema_generator's,
        # ``infer_mode`` is a boolean flag and the truthy string 'NULLABLE'
        # simply enables it -- confirm that is intended.
        generator = SchemaGenerator(input_format='dict', infer_mode='NULLABLE', preserve_input_sort_order=True)
        schema_map, _error_logs = generator.deduce_schema(input_data=records)
        flat_schema = generator.flatten_schema(schema_map)
        schema_json = converter(flat_schema)
        # Tolerate a converter result without the expected nesting.
        return schema_json.get('definitions', {}).get('element', {}).get('properties', {})

    def get_json_schema(self):
        """Return the form's JSON schema, inferred from actual form data.

        Data sources, in order of preference:
        1. the schema already inferred for the current data (cached);
        2. the records of the last response parsed during a sync;
        3. a direct API request via ``_fetch_sample_records``. This is what
           lets discovery return real columns instead of empty properties.

        Falls back to empty ``properties`` if no data can be obtained. The
        result is cached because the schema may be requested repeatedly
        (potentially once per record), and inference is expensive.
        """
        if self._json_schema_cache is not None:
            return self._json_schema_cache
        records = getattr(self, 'response_json', None)
        if records is None:
            records = self._fetch_sample_records()
        self._json_schema_cache = {
            "$schema": "http://json-schema.org/draft-07/schema#",
            "additionalProperties": True,
            "type": "object",
            "properties": self._infer_properties(records) if records else {},
        }
        return self._json_schema_cache

    def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
        """Return the form id, which is appended to ``url_base``."""
        return self.form_id

    def _current_cursor(self) -> datetime:
        """Return the cursor datetime, defaulting to the configured ``start_date``."""
        if self._cursor_value:
            return self._cursor_value
        return self._parse_api_date(self.start_date)

    @property
    def state(self) -> Mapping[str, Any]:
        """Return ``{cursor_field: <cursor formatted with dateformat>}``.

        The value is a plain string, so the state is JSON-serializable and
        round-trips through the setter below.
        """
        return {self.cursor_field: self._current_cursor().strftime(self.dateformat)}

    @state.setter
    def state(self, value: Mapping[str, Any]):
        """Restore the cursor from saved state; state without a cursor is ignored."""
        try:
            raw_cursor = value[self.cursor_field]
        except (KeyError, TypeError):
            # No cursor saved yet: keep defaulting to start_date.
            return
        self._cursor_value = datetime.strptime(raw_cursor, self.dateformat)

    def request_params(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> MutableMapping[str, Any]:
        """Send the current cursor (or ``start_date``) as the ``date`` query parameter."""
        return {'date': self._current_cursor().strftime(self.date_format)}

    def request_headers(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> Mapping[str, Any]:
        """Authenticate with the pre-computed HTTP Basic token."""
        return {'Authorization': 'Basic ' + self.auth_token}

    def parse_response(
        self,
        response: requests.Response,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> Iterable[Mapping]:
        """Yield each element of the JSON response and keep it for schema inference."""
        self.response_json = response.json()
        # New data may carry different fields: re-infer on the next schema request.
        self._json_schema_cache = None
        yield from self.response_json

    def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]:
        """Yield records while advancing the cursor to the latest ``CompletionDate`` seen.

        The cursor only moves forward (max, not last-seen), so an unsorted
        response cannot move it backwards. Records without the cursor field
        are passed through unchanged.
        """
        for record in super().read_records(*args, **kwargs):
            raw_cursor = record.get(self.cursor_field)
            if raw_cursor:
                record_cursor = self._parse_api_date(raw_cursor)
                if self._cursor_value is None or record_cursor > self._cursor_value:
                    self._cursor_value = record_cursor
            yield record
# Source
class SourceSurveycto(AbstractSource):
    """Airbyte source with one incremental stream per configured SurveyCTO form."""

    def check_connection(self, logger, config) -> Tuple[bool, Optional[Any]]:
        """Report success without contacting the API.

        NOTE(review): neither the credentials nor the server are verified here.
        """
        return True, None

    def no_auth(self):
        """Return a ``NoAuth`` authenticator (``streams`` does not use it)."""
        return NoAuth()

    def generate_streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """Build one ``SurveyctoStream`` per form id in ``config['form_id']``.

        A single form id given as a plain string is treated as a one-item list.
        Previously, iterating the string created one stream per character.
        """
        forms = config.get("form_id", [])
        if isinstance(forms, str):
            forms = [forms] if forms else []
        return [SurveyctoStream(config=config, form_id=form_id) for form_id in forms]

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """Return the configured streams as a list (not a one-shot generator)."""
        return self.generate_streams(config=config)