Hello everyone, I have implemented a connector using the Xola API. I made one DateTime column the cursor_field and implemented the get_updated_state
function, but it does not seem to be working. Can anyone please tell me what more I need to add?
class Orders(XolaStream):
# Stream for Xola orders. Records are keyed by "order_id", which parse_response fills
# from each item's "id".
primary_key = "order_id"
# Class-level defaults. __init__ overwrites cursor_field only when it is given a truthy
# value, and always overwrites cursor_field_value and seller_id.
cursor_field = None
cursor_field_value = None
seller_id = None
def __init__(self, seller_id: str, x_api_key: str, cursor_field: str, cursor_field_value: str, **kwargs):
    """Store the per-connection settings and initialise the base stream.

    :param seller_id: seller identifier; sent as the ``seller`` query parameter.
    :param x_api_key: API key, forwarded to the base class constructor.
    :param cursor_field: name of the record field used as the incremental cursor.
        A falsy value leaves the class-level default (``None``) in place.
    :param cursor_field_value: configured starting value for the cursor.
    :param kwargs: forwarded unchanged to the base class constructor.
    """
    super().__init__(x_api_key, **kwargs)
    self.seller_id = seller_id
    # Only shadow the class-level default when a cursor field was actually configured.
    if cursor_field:
        self.cursor_field = cursor_field
    self.cursor_field_value = cursor_field_value
def path(
    self,
    stream_state: Mapping[str, Any] = None,
    stream_slice: Mapping[str, Any] = None,
    next_page_token: Mapping[str, Any] = None,
) -> str:
    """Return the request path for this stream.

    The arguments are accepted for interface compatibility and are not used:
    the path is always the constant ``"orders"``.
    """
    return "orders"
def request_params(
    self,
    stream_state: Mapping[str, Any],
    stream_slice: Mapping[str, Any] = None,
    next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
    """Build the query parameters for one request.

    Every key of ``next_page_token`` (when given) is copied into the result, then
    ``seller`` is set to the configured seller id.

    NOTE(review): ``stream_state`` is not read here, so the request is not
    filtered by the saved cursor value; every sync asks for the same data.
    If the API offers a date filter, this is the place to add it -- confirm
    the parameter name against the Xola API documentation.

    :param stream_state: saved incremental state (currently unused).
    :param stream_slice: current stream slice (unused).
    :param next_page_token: pagination parameters for the next page, if any.
    :return: a new dict of query parameters.
    """
    # Copy so the caller's token mapping is never mutated.
    params = dict(next_page_token) if next_page_token else {}
    # Assigned last so the configured seller wins over any "seller" key in the token.
    params['seller'] = self.seller_id
    return params
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
"""
Convert one API response into a list of flattened order records.

Reads the "data" list from the JSON body and builds one dict per item: the item's "id" is
stored under "order_id", and the optional fields are copied only when the item has them.
:return a list with one record for each item that was processed without raising
"""
raw_response = response.json()["data"]
modified_response = []
for data in raw_response:
# NOTE(review): the bare except below swallows every exception raised while a record is
# being built, and the append sits inside the try, so a failing item is dropped silently.
# Consider catching specific exceptions and logging the skipped item.
try:
# Tags._id
resp = {"tags": []}
# The "......." lines are presumably code the poster left out of this paste.
.......
# "id" is read unconditionally; a missing "id" raises KeyError, which the except swallows.
resp["order_id"] = data["id"]
if "createdAt" in data.keys(): resp["createdAt"] = data["createdAt"]
if "customerName" in data.keys(): resp["customerName"] = data["customerName"]
.......
if "createdBy" in data.keys(): resp["createdBy"] = data["createdBy"]
.......
# "updatedAt" is only set when present, so a record may lack the configured cursor field.
if "updatedAt" in data.keys(): resp["updatedAt"] = data["updatedAt"]
modified_response.append(resp)
except:
pass
# Presumably runs after the loop; indentation was lost in the paste -- confirm.
return modified_response
def get_updated_state(
    self,
    current_stream_state: MutableMapping[str, Any],
    latest_record: Mapping[str, Any],
) -> Mapping[str, Any]:
    """Return the stream state after reading ``latest_record``.

    The new cursor value is the larger of the baseline and the record's cursor
    value. The baseline is the value saved in ``current_stream_state`` or, when
    nothing usable is saved yet, the configured ``cursor_field_value``. A record
    without the cursor field leaves the baseline unchanged.

    This method only tracks the cursor; it does not filter which records are
    requested or emitted.

    Values are compared with ``max()`` as-is -- presumably ISO-8601 strings in
    one consistent format and timezone, so that string order matches time order;
    confirm against the API.

    :param current_stream_state: state saved so far; may be empty or ``None``.
    :param latest_record: the record that was just read.
    :return: a single-key mapping ``{cursor_field: newest_value}``.
    """
    saved = (current_stream_state or {}).get(self.cursor_field)
    # Before any state exists, start from the configured value rather than from nothing.
    baseline = saved if saved is not None else self.cursor_field_value
    # The cursor field is optional on records, so never index it directly.
    latest = latest_record.get(self.cursor_field)

    if latest is None:
        newest = baseline
    elif baseline is None:
        newest = latest
    else:
        newest = max(baseline, latest)
    return {self.cursor_field: newest}
Here, in the config file, I pass `updatedAt` as `cursor_field` and a DateTime string as `cursor_field_value`. I am expecting only the records whose DateTime is greater than `cursor_field_value` to be returned.