Recent Changes in State Messages Emission

Summary

Airbyte is emitting empty state messages during incremental syncs, which causes destination-aws-datalake to drop and recreate tables unexpectedly.


Question

:wave: hi all - did something change recently in how Airbyte emits state messages? We use the destination-aws-datalake destination, which treats an empty state message on an incremental stream as the signal to recreate that stream's table (when a stream is reset, its state is reset and an empty state message is emitted, and that is/was the only way to tell when a full drop and replace is needed). In the past few days we've increasingly seen incremental syncs emit empty state messages for what appears to be no reason, which has caused some of our integrations to stop working because the destination tries to recreate the table and removes all Glue partitions. If something has changed here, can someone point me to the changelog or more information on what has changed? Any help here is greatly appreciated! Thanks :pray:
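For readers unfamiliar with the reset heuristic described in the question, here is a minimal sketch of the idea, assuming state payloads arrive as plain dicts. `should_recreate_table` is a hypothetical helper written for illustration, not the connector's actual code.

```python
from typing import Any, Mapping, Optional


def should_recreate_table(state_data: Optional[Mapping[str, Any]]) -> bool:
    """Hypothetical heuristic: an empty or missing state payload is read as a reset signal."""
    return not state_data


# A populated state is a normal incremental checkpoint: keep the table.
print(should_recreate_table({"cursor": "2024-03-01"}))  # False
# An empty state is interpreted as "stream was reset": drop and recreate.
print(should_recreate_table({}))  # True
```

The weakness of this approach is visible in the sketch: any empty state message, whatever its cause, is indistinguishable from a genuine reset.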



This topic has been created from a Slack thread to give it more visibility.

["airbyte", "state-messages", "destination-aws-datalake", "incremental-syncs", "changelog"]

<@U02TQLBLDU4> I just created a PR (https://github.com/airbytehq/airbyte/pull/36386) that should fix destination-aws-datalake and support the new state message protocol. Would you be able to take a look?

<@U02TQLBLDU4> yeah I noticed 1 test is failing locally:

```
E       assert [AirbyteMessage(type=<Type.STATE: 'STATE'>, log=None, spec=None, connectionStatus=None, catalog=None, record=None, state=AirbyteStateMessage(type=None, stream=None, global_=None, data={'state': '1'}), trace=None, control=None),\n AirbyteMessage(type=<Type.STATE: 'STATE'>, log=None, spec=None, connectionStatus=None, catalog=None, record=None, state=AirbyteStateMessage(type=None, stream=None, global_=None, data={'state': '2'}), trace=None, control=None)] == []
E         Left contains 2 more items, first extra item: AirbyteMessage(type=<Type.STATE: 'STATE'>, log=None, spec=None, connectionStatus=None, catalog=None, record=None, state=AirbyteStateMessage(type=None, stream=None, global_=None, data={'state': '1'}), trace=None, control=None)
```
However, I don't know if the test is configured correctly. The `AirbyteStateMessage` should have a `type`, in this case `type=STREAM`. I'm checking for a type in my updated code, and that's most likely why the destination is not checkpointing the state messages...
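A rough sketch of how a type check could account for the mismatch, assuming the destination only checkpoints messages it recognizes. `StateType`, `StateMessage`, and `is_checkpointable` are hypothetical stand-ins that mirror only the fields visible in the test output above; they are not the actual Airbyte classes or the code in the PR.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, Optional


class StateType(Enum):
    # Stand-in for the protocol's state type values (assumed names).
    LEGACY = "LEGACY"
    STREAM = "STREAM"
    GLOBAL = "GLOBAL"


@dataclass
class StateMessage:
    # Stand-in exposing only fields seen in the test output: type, stream, data.
    type: Optional[StateType] = None
    stream: Optional[Dict[str, Any]] = None
    data: Optional[Dict[str, Any]] = None


def is_checkpointable(state: StateMessage, accept_legacy: bool = True) -> bool:
    """Decide whether this sketch would echo the state message back."""
    if state.type is StateType.STREAM:
        # Per-stream state must carry a stream payload.
        return state.stream is not None
    if state.type in (None, StateType.LEGACY):
        # Messages without a type carry their payload in `data`.
        return accept_legacy and state.data is not None
    # GLOBAL state is not handled in this sketch.
    return False


# The fixture in the failing test has type=None and data={'state': '1'}.
fixture = StateMessage(data={"state": "1"})
print(is_checkpointable(fixture, accept_legacy=False))  # False
print(is_checkpointable(fixture))  # True
```

With `accept_legacy=False` the fixture is skipped, which would match the empty list in the assertion. Either the test fixture needs `type=STREAM` with a stream payload, or the destination needs a fallback for untyped messages; which one is right depends on what the protocol guarantees.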

I’ll take a look later today