Hello,
I am building a connector that is pulling data from an API. I am filtering data for a given date range. I read and understand that cursor with incremental sync can help you to pull data from the last time you run the synchronization between source and destination.
I am pulling data for today on an hourly basis.
Let’s say now the connection was down for some days, and it didn’t work. Then I put it up, and I want it to sync data from the last time a record was put in the destination table.
How can I achieve this with the cursor?
- Is it possible pull the cursor value from the destination table?
- Is there a way, after each sync, I can save the cursor value in a permanent storage, let’s say, the Postgres database? And pull it from there?
Some code for the context
Getter and setter for my cursor:
@property
def state(self) -> Mapping[str, Any]:
if getattr(self, '_cursor_value', None):
return {self.cursor_field: self._cursor_value}
else:
return {self.cursor_field: datetime.today().strftime('%Y-%m-%d')} # THIS IS THE DATE I WOULD LIKE TO SAVE ON A PERSISTENT STORAGE
@property
def cursor_field(self) -> str:
"""
Name of the field in the API response body used as cursor.
"""
return "date"
@state.setter
def state(self, value: Mapping[str, Any]):
self._cursor_value = value[self.cursor_field]
This is how I am updating the cursor.
for row in reader:
item = self.generate_item(row)
# this section deals with the cursor, to be revisited
current_cursor_value = pendulum_parse(self.state.get(self.cursor_field))
upcoming_cursor_value = pendulum_parse(getattr(item, self.cursor_field).strftime('%Y-%m-%d'))
cursor_value = (max(upcoming_cursor_value, current_cursor_value)).to_date_string()
max_cursor_value = {self.cursor_field: cursor_value}
self.state = max_cursor_value
yield item.dict()
Any pointer on how I can achieve this will be welcome.
Hello there! You are receiving this message because none of your fellow community members has stepped in to respond to your topic post. (If you are a community member and you are reading this response, feel free to jump in if you have the answer!) As a result, the Community Assistance Team has been made aware of this topic and will be investigating and responding as quickly as possible.
Some important considerations that will help your to get your issue solved faster:
- It is best to use our topic creation template; if you haven’t yet, we recommend posting a followup with the requested information. With that information the team will be able to more quickly search for similar issues with connectors and the platform and troubleshoot more quickly your specific question or problem.
- Make sure to upload the complete log file; a common investigation roadblock is that sometimes the error for the issue happens well before the problem is surfaced to the user, and so having the tail of the log is less useful than having the whole log to scan through.
- Be as descriptive and specific as possible; when investigating it is extremely valuable to know what steps were taken to encounter the issue, what version of connector / platform / Java / Python / docker / k8s was used, etc. The more context supplied, the quicker the investigation can start on your topic and the faster we can drive towards an answer.
- We in the Community Assistance Team are glad you’ve made yourself part of our community, and we’ll do our best to answer your questions and resolve the problems as quickly as possible. Expect to hear from a specific team member as soon as possible.
Thank you for your time and attention.
Best,
The Community Assistance Team
Hey thanks for reaching out to us. Currently, the cursor value i.e state is handled through persistent storage. We save the state in the database and when we next run the sync we get the state from there and run the sync
Thank you for your answer, @harshith.
So basically, if that is the case, it means my connector is working well.
Hey,
I am coming back to this question.
Would you advise the best way to pull the state from the persistent storage?
I am using this code here, but it seems to not work.
@property
def state(self) -> Mapping[str, Any]:
if getattr(self, '_cursor_value', None):
return {self.cursor_field: self._cursor_value}
else:
return {self.cursor_field: self.start_date.strftime('%Y-%m-%d')}
Hello Espoir Murhabazi, it’s been a while without an update from us. Are you still having problems or did you find a solution?