I’ve got a connector whose state appears to be set incorrectly between runs through Airbyte:
I’ve got the connector written such that the first run pulls down all the data, and subsequent runs only return records that are newer than the saved state.
It works fine on its first run, and a subsequent run after, but the third run fails because the state object has been “corrupted” (see image above) and I can’t figure out why. I suspect that it’s probably something to do with the get_updated_state() function but I’m not sure.
Code below:
class SourceIncrementalStream(SignrequestStream, IncrementalMixin):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._state = {}

    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, value):
        self._state[self.cursor_field] = value

    @property
    @abstractmethod
    def cursor_field(self) -> str:
        pass

    def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
        if current_stream_state == {}:
            self.state = latest_record[self.cursor_field]
            return {self.cursor_field: latest_record[self.cursor_field]}
        else:
            records = {}
            records[current_stream_state[self.cursor_field]] = pendulum.parse(current_stream_state[self.cursor_field])
            records[latest_record[self.cursor_field]] = pendulum.parse(latest_record[self.cursor_field])
            latest_record = max(records.items(), key=lambda x: x[1])
            self.state = latest_record[0]
            return {self.cursor_field: latest_record[0]}

    def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
        """
        Override this method to define how a response is parsed.
        :return an iterable containing each record in the response
        """
        def __newer_than_latest(recorded_state: dict, latest_record: dict) -> bool:
            latest_record_date = pendulum.parse(latest_record[self.cursor_field])
            recorded_state = pendulum.parse(recorded_state[self.cursor_field])
            if recorded_state >= latest_record_date:
                return False
            else:
                return True

        raw = response.json()
        results = raw.get("results")
        if stream_state:
            if not __newer_than_latest(stream_state, results[0]):
                yield from []
            else:
                only_the_newest = [x for x in results if __newer_than_latest(stream_state, x)]
                yield from only_the_newest
        else:
            yield from results
Am I even using state correctly? Or is there a different problem?
Hello @cornjuliox, could you please tell me what version of Airbyte and connectors you are using, and what your system setup is?
Sure - Airbyte version 0.39.32-alpha, the connectors that are affected are all custom. I don’t know much about the setup right now because we’ve got another guy handling it; all I know is that it’s running on a Docker cluster somewhere in AWS.
We have another incremental sync failing today whose state looks just like the above, and it’s for a custom connector that I haven’t touched in a while, which suggests that the issue might be with Airbyte itself. Just thought I’d put that there in case it helps with the investigation.
logs-7254.txt (273.0 KB)
Could you please give me the logs for both syncs? That’ll help me debug this quicker!
Sure. Here are the logs for the first one, and the logs for the 2nd one are already attached to the post above yours
logs-64.txt (78.9 KB)
For the first connector:
I see that you’re getting a TypeError: expected string or bytes-like object. I think the problem is indeed in parse_response; could you check if the returned object looks different depending on the stream_state? Also, could you check whether recorded_state is being set correctly on consecutive syncs at recorded_state = pendulum.parse(recorded_state[self.cursor_field])? It would be helpful to see the connector schema if possible.
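One way to check what recorded_state actually holds between syncs is to validate the cursor value before handing it to pendulum. This is only a minimal sketch: read_cursor is a hypothetical helper name (it is not part of the Airbyte CDK), and it assumes the cursor value is meant to be a timestamp string.

```python
from typing import Any, Mapping


def read_cursor(stream_state: Mapping[str, Any], cursor_field: str) -> str:
    """Return the cursor value, failing loudly if it is not a string.

    Hypothetical debugging helper; not part of the Airbyte CDK.
    """
    value = stream_state.get(cursor_field)
    if not isinstance(value, str):
        # Report the actual type and value instead of a bare TypeError
        # raised deep inside the date parser.
        raise ValueError(
            f"Expected a timestamp string under {cursor_field!r}, "
            f"got {type(value).__name__}: {value!r}"
        )
    return value
```

Calling pendulum.parse(read_cursor(recorded_state, self.cursor_field)) instead of indexing the state directly should make the sync log show exactly what was stored.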
For the second connector:
I’ve found several other errors in this connector; issues for two of them are currently open:
Class path contains multiple SLF4J bindings.
https://github.com/airbytehq/airbyte/issues/9093
null found, object expected
https://github.com/airbytehq/airbyte/issues/15161
Could you please tell me what environment you’re running this in, and also update Airbyte to the latest version.
Both connectors actually run fine locally - it’s when I run them through Airbyte that the problem occurs, so it’s difficult for me to replicate accurately on my machine.
Basically what I see happening is that recorded_state[self.cursor_field] is supposed to return a string representation of a datetime, but ends up returning another dictionary containing {updated_at: <timestamp>}, causing pendulum to fail.
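Here is a stripped-down sketch of how that doubling could come about, with all Airbyte base classes removed. It assumes the platform assigns the whole saved state mapping back through the state setter before a sync, which I have not confirmed; FakeStream is a hypothetical stand-in for the class in my first post.

```python
class FakeStream:
    """Stand-in for the stream class above, without any Airbyte imports."""

    cursor_field = "updated_at"

    def __init__(self):
        self._state = {}

    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, value):
        # Same setter as in the connector: stores `value` under the cursor field.
        self._state[self.cursor_field] = value


stream = FakeStream()

# Inside get_updated_state the setter receives a bare timestamp string.
stream.state = "2022-08-01T00:00:00+00:00"
saved = dict(stream.state)

# ASSUMPTION: on the next run the saved mapping is assigned back as a whole.
restored = FakeStream()
restored.state = saved
print(restored.state)
```

The printed state is {'updated_at': {'updated_at': '2022-08-01T00:00:00+00:00'}}, which has the same shape as the broken state I am seeing.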
As for our environment I’m waiting for the guy handling devops to respond (we are in different timezones) so I can’t provide much new information; was there anything in particular you wanted to know about our environment?
Yes, I’d love to know the OS Version/Instance and the Memory/Disk used for this.
Just so I understand correctly, when you’re talking about running them through airbyte - do you mean using airbyte cloud?
Sorry for the delayed reply, I’m still not getting notifications from Discourse.
Not exactly - when I develop the connectors I test them by doing python main.py read ....., using the --state command line option to simulate a stored state, and once I’ve verified that it outputs without error I build the image and run it through the web UI of an instance of Airbyte hosted locally on my machine. That’s what I mean when I say I’m running them through Airbyte.
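Something like the following could generate a state file that mimics the doubled state for a local run. It is a sketch I have not run against the connectors: the stream name documents and the file names are hypothetical, and it assumes the file passed to --state is a JSON object mapping each stream name to its state.

```python
import json
from pathlib import Path
from typing import Any, Dict


def write_state_file(path: str, stream_name: str, stream_state: Dict[str, Any]) -> Dict[str, Any]:
    """Write a state file for one stream and return the payload.

    ASSUMPTION: the state file maps stream names to their state objects.
    """
    payload = {stream_name: stream_state}
    Path(path).write_text(json.dumps(payload, indent=2))
    return payload


if __name__ == "__main__":
    timestamp = "2022-08-01T00:00:00+00:00"
    # State in the shape the connector expects.
    write_state_file("state_flat.json", "documents", {"updated_at": timestamp})
    # State in the doubled shape seen in the failing syncs.
    write_state_file("state_nested.json", "documents", {"updated_at": {"updated_at": timestamp}})
```

Pointing --state at the nested file should then show whether the local run fails the same way the scheduled syncs do.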
Our second connector, on the other hand, has been in “production” (on an instance of Airbyte running in EC2) for a while without issue, but is now experiencing the same issue with the stored state.
OS details from said instance:
user@host:~$ docker --version
Docker version 18.09.1, build 4c52b90
user@host:~$ docker-compose --version
docker-compose version 1.29.2, build 5becea4c
uname -a
Linux ip-172-31-41-192 4.19.0-18-cloud-amd64 #1 SMP Debian 4.19.208-1 (2021-09-29) x86_64 GNU/Linux
I’ll have to get back to you on the EC2 instance specs though.
I see, thanks for all the info! Did this issue start happening recently? Were you able to upgrade Airbyte to the latest version?
Could you share a repo of the connectors with me so I can take a look at the code?
Could you please upgrade Airbyte to the latest version? We are now up to 0.40.0; I would like to see if this has an impact on the issues you are encountering. Here’s the upgrade guide:
https://docs.airbyte.com/operator-guides/upgrading-airbyte/
Sure I will ask (our instance is managed by someone else so I have to put in a request for it).
Is there any way to “clear” the state from the UI? I figure I will need to do that before it will start working again, because if the affected connectors read the “doubled” state as-is they will fail again.
Hi! The team was doing off-site meetings last week, so I apologize for the delay. I’m looking into how you might clear the state without deleting the connection, so I hope to have some ideas on that for you soon.
Hi, is there any update on this? We’ve got another connector that’s no longer working as of today because the “state” is all messed up. Is there a way to reset it without clearing all the data?
OK thanks! I’ll give this a try. Is there any word as to what’s causing it to be like this? Is it an error in the way I’m using state in the connector, or something else?
I think you were onto the problem here - “what I see happening is that recorded_state[self.cursor_field] is supposed to return a string representation of a datetime, but ends up returning another dictionary containing {updated_at: <timestamp>} causing pendulum to fail.”
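If the state setter turns out to be what nests the value, one possible fix is to have it accept the whole state mapping and replace the stored state, rather than storing its argument under the cursor field. The following is a sketch under that assumption and is untested against your connectors; FixedStateMixin and unwrap_cursor are hypothetical names, and updated_at is taken from your description.

```python
from typing import Any, Mapping, MutableMapping


def unwrap_cursor(stream_state: Mapping[str, Any], cursor_field: str) -> Any:
    """Peel off accidental nesting such as {"updated_at": {"updated_at": "..."}}."""
    value = stream_state.get(cursor_field)
    while isinstance(value, Mapping):
        value = value.get(cursor_field)
    return value


class FixedStateMixin:
    """Hypothetical replacement for the state handling in the stream class."""

    cursor_field = "updated_at"

    def __init__(self) -> None:
        self._state: MutableMapping[str, Any] = {}

    @property
    def state(self) -> MutableMapping[str, Any]:
        return self._state

    @state.setter
    def state(self, value: Mapping[str, Any]) -> None:
        # Expect the full state mapping; repair any nesting left by earlier runs.
        cursor = unwrap_cursor(value, self.cursor_field)
        self._state = {self.cursor_field: cursor} if cursor is not None else {}
```

With a setter like this, get_updated_state would need to assign a mapping, for example self.state = {self.cursor_field: latest_record[self.cursor_field]}, instead of a bare string. Because the setter unwraps nested values, an already doubled state would be repaired the next time it is loaded.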
As these are custom connectors and I don’t have access to the full code, I am not able to debug them as well as I could if these were public. If you are able to publish them, you could raise an issue on GitHub to get help with debugging this!
Our fork is public and I’ve linked the code for the respective connectors a few posts up -
We actually have plans to submit the “snipeit” connector for review at some point this year