Hi Team! I have implemented a custom connector using the Python CDK and am wondering how state is persisted when a sync fails. Namely, if a sync fails, is the state still persisted and used in the next sync?
I’ve been trying to find an answer myself and came across "Periodically checkpoint state during syncs" (Issue #2627, airbytehq/airbyte on GitHub), where it seems mid-stream persistence has been implemented (?), but I can’t find details about what changed. In my case, it’s okay if the state is not persisted when a sync fails, because I can handle duplicate records in the destination after re-triggering the sync. However, I’m not sure what the default behavior is right now. I also read the "Checkpointing state" section in the docs (see Incremental Streams - Airbyte Documentation), which seems to be about persisting state mid-stream, but I have not implemented either of the two methods. Am I correct that state is only persisted at the end of a successful sync if I haven’t used one of the two checkpointing state methods?
Hey @Timo
Let me summarize and clarify the default behaviors around state checkpointing:
- State checkpointing is available on incremental sync only.
- It happens after a stream slice has been read entirely, or when the record count matches the state_checkpoint_interval value.
- If you use both stream slices and state_checkpoint_interval, checkpointing happens when the record count matches the state_checkpoint_interval within a stream slice.
Am I correct that state is only persistent at the end of a successful sync, if I haven’t used one of the two checkpointing state methods?
No, because the state is checkpointed after the successful read of a stream slice or after reading a batch of records corresponding to state_checkpoint_interval. This avoids the need to re-read data that was already synced successfully.
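To make the rules above concrete, here is a small simulation in plain Python. It is only a sketch of the behavior as described in this thread, not CDK code: the function simulate_checkpoints and its parameters (slices, interval, fail_at) are hypothetical names invented for illustration, and records are reduced to bare cursor values.

```python
from typing import List, Optional


def simulate_checkpoints(
    slices: List[List[int]],
    interval: Optional[int] = None,
    fail_at: Optional[int] = None,
) -> List[int]:
    """Return the cursor values that were checkpointed before the sync ended.

    slices   -- one list of cursor values per stream slice (hypothetical input)
    interval -- stand-in for state_checkpoint_interval (None disables it)
    fail_at  -- 1-based index of the record whose read fails (None = no failure)
    """
    checkpoints: List[int] = []
    records_read = 0
    for slice_records in slices:
        cursor = None
        for count_in_slice, cursor_value in enumerate(slice_records, start=1):
            records_read += 1
            if fail_at is not None and records_read == fail_at:
                # The sync fails here; only earlier checkpoints survive.
                return checkpoints
            cursor = cursor_value
            # Rule 1: checkpoint when the record count within the slice
            # reaches a multiple of the interval.
            if interval and count_in_slice % interval == 0:
                checkpoints.append(cursor)
        # Rule 2: checkpoint once the slice has been read entirely.
        if cursor is not None:
            checkpoints.append(cursor)
    return checkpoints


two_slices = [[1, 2, 3], [4, 5, 6]]
one_slice = [[1, 2, 3, 4, 5, 6]]

with_interval = simulate_checkpoints(two_slices, interval=2, fail_at=5)
slices_only = simulate_checkpoints(two_slices, fail_at=5)
single_slice = simulate_checkpoints(one_slice, fail_at=5)
```

In this model, a failure on the fifth record leaves checkpoints behind whenever a slice or an interval boundary was passed earlier. With a single slice and no interval, nothing is checkpointed before the failure.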
Hi augustin,
thanks for the quick reply! Just a follow-up. In my custom connector, I’m currently using state_checkpoint_interval = math.inf so that state is not saved after a set number of read records, and furthermore I have not overridden the default stream_slices method in the Python CDK (see airbyte/core.py at 8500fef4133d3d06e16e8b600d65ebf2c58afefd in airbytehq/airbyte on GitHub). So I have not defined what a stream slice is in my case. Can it still happen that the state is saved before a sync has successfully completed (e.g. after some data has been flushed and inserted into the destination)?
PS: I understand the benefits of saving state mid-sync and will think about changing the custom connector to make use of it, but the current implementation did not foresee state being saved when the sync is not entirely successful.
Hey @Timo,
What’s the parent class of your incremental stream?
If you don’t want to benefit from state checkpointing, I’d rather set the state class attribute to None and not set state_checkpoint_interval, as it already defaults to None.
It inherits from HttpStream (airbyte_cdk.sources.streams.http). I see that there is a (new?) IncrementalMixin now, but the current implementation does not make use of that mixin.
IncrementalMixin was recently added to the CDK to simplify incremental implementation and state management. The get_updated_state method is now deprecated, so feel free to use the new IncrementalMixin.
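For readers who want to see the shape of that pattern, below is a minimal stand-in sketch that runs without the CDK installed. It mirrors what IncrementalMixin asks for: a state property with a getter and a setter, plus a read_records method that advances the cursor as records are read. The class OrdersStream, the cursor field updated_at, and the in-memory record list are assumptions made for illustration. In a real connector the class would inherit from HttpStream and IncrementalMixin and fetch records over HTTP.

```python
from typing import Any, Dict, Iterator, List


class OrdersStream:
    """Hypothetical stand-in; a real stream would subclass HttpStream and IncrementalMixin."""

    cursor_field = "updated_at"  # assumed cursor field, ISO dates as strings

    def __init__(self, records: List[Dict[str, Any]]) -> None:
        self._records = records  # stands in for the API responses
        self._cursor_value = ""  # empty string means "no state yet"

    @property
    def state(self) -> Dict[str, Any]:
        # Getter: what would be emitted whenever state is checkpointed.
        if not self._cursor_value:
            return {}
        return {self.cursor_field: self._cursor_value}

    @state.setter
    def state(self, value: Dict[str, Any]) -> None:
        # Setter: receives the state saved by the previous sync.
        self._cursor_value = value.get(self.cursor_field, "")

    def read_records(self) -> Iterator[Dict[str, Any]]:
        for record in self._records:
            # Skip records at or before the saved cursor.
            if record[self.cursor_field] <= self._cursor_value:
                continue
            # Advance the cursor before handing the record downstream.
            self._cursor_value = record[self.cursor_field]
            yield record


stream = OrdersStream(
    [
        {"id": 1, "updated_at": "2022-01-01"},
        {"id": 2, "updated_at": "2022-01-03"},
    ]
)
stream.state = {"updated_at": "2022-01-01"}  # state from a previous sync
synced = list(stream.read_records())
```

The sketch assumes records arrive sorted by the cursor field. If they do not, the cursor update would need to keep the maximum value seen so far.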