I have a custom connector as a source and with that source, we are trying to pull the data from the API and push that to Postgres which is our destination. So because we are just pushing the JSON raw into DB we want to do basic normalization. But we need to make it dynamic so that we are doing as soon as we get the data we create a schema out of it. and then we try to update the get_json_schema function. But the problem I’m facing is if at the start we make a connection as empty properties later we cannot change that. So how do I update the get_json_schema to take my schema instead of the empty one after making the connection or during the sync it can update it.
Now if you look at the get_json_schema function we are waiting for the data to come in response.json then we pull the schema out of it and now what we want when the normalization runs it needs to update the dump_schema empty onto the schema it just got.
Can we control this?
class SurveyStream(HttpStream, ABC):
    def __init__(self, config: Mapping[str, Any], form_id, **kwargs):
        super().__init__()
        self.server_name = config['server_name']
        self.form_id = form_id
        self.start_date = config['start_date']
        #base64 encode username and password as auth token
        user_name_password = f"{config['username']}:{config['password']}"
        self.auth_token = self._base64_encode(user_name_password)
    @property
    def url_base(self) -> str:
         return f"https://{self.server_name}.surveycto.com/api/v2/forms/data/wide/json/"
    def _base64_encode(self,string:str) -> str:
        return base64.b64encode(string.encode("ascii")).decode("ascii")
    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        
        return {}
class SurveyctoStream(SurveyStream, IncrementalMixin):
    primary_key = 'KEY'
    date_format = '%b %d, %Y %H:%M:%S %p'
    dateformat =  '%Y-%m-%dT%H:%M:%S'
    cursor_field = 'CompletionDate'
    _cursor_value = None
    @property
    def name(self) -> str:
        return self.form_id
    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        return None
    def get_json_schema(self):
        if hasattr(self, 'response_json'):
            generator = SchemaGenerator(input_format='dict', infer_mode='NULLABLE',preserve_input_sort_order='true')
            data = self.response_json
            schema_map, error_logs = generator.deduce_schema(input_data=data)
            schema = generator.flatten_schema(schema_map)     
            schema_json = converter(schema)
            dump_schema=schema_json['definitions']['element']['properties']
        else:
            dump_schema = {} 
       
        return {
            "$schema": "http://json-schema.org/draft-07/schema#",
            "additionalProperties": True,
            "type": "object",
            "properties": dump_schema,
        }
       
    def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
         return self.form_id
    @property
    def state(self) -> Mapping[str, Any]:
        initial_date = datetime.strptime(self.start_date, self.date_format)
        if self._cursor_value:
            return {self.cursor_field: self._cursor_value}
        else:
            return {self.cursor_field: initial_date}
    @state.setter
    def state(self, value: Mapping[str, Any]):
        self._cursor_value = datetime.strptime(value[self.cursor_field], self.dateformat)
    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
         ix = self.state[self.cursor_field] 
         return {'date': ix.strftime(self.date_format)}
    def request_headers(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> Mapping[str, Any]:
        return {'Authorization': 'Basic ' + self.auth_token }
    def parse_response(
        self,
        response: requests.Response,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> Iterable[Mapping]:
        self.response_json = response.json()
    
        for data in self.response_json:
            try:
                yield data
            except Exception as e:
                msg = f"""Encountered an exception parsing schema"""
                self.logger.exception(msg)
                raise e
    def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]:
        for record in super().read_records(*args, **kwargs):
            self._cursor_value = datetime.strptime(record[self.cursor_field], self.date_format)
            yield record
# Source
class SourceSurveycto(AbstractSource):
    def check_connection(self, logger, config) -> Tuple[bool, any]:
        return True, None
    def no_auth(self):
        return NoAuth()     
    def generate_streams(self, config: str) -> List[Stream]:
        forms = config.get("form_id", [])
        for form_id in forms:
            yield SurveyctoStream(
                config=config,
                form_id=form_id
            )
    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        # auth = TokenAuthenticator(token="api_key")  # Oauth2Authenticator is also available if you need oauth support
        # return [Customers(authenticator=auth), Employees(authenticator=auth)]
        # auth = NoAuth()        
        streams = self.generate_streams(config=config)
        return streams