Using per-stream state in a plain Python source connector

Hello,
I am writing a source connector that was generated with the CDK. It is a plain source connector (i.e. it implements check, discover and read). So far I have kept a single state object covering all of the streams, so every time I emit it, the full state for all streams is printed. I see that there is an option to keep state per stream, which sounds appealing to me. Right now my logs take up too much space because the state is printed in full every time it is emitted.
I tried to implement per-stream state as follows (this is in the read function):

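# Assumed imports (requires a CDK version with per-stream state support):
from airbyte_cdk.models import (AirbyteMessage, AirbyteStateBlob, AirbyteStateMessage,
                                AirbyteStateType, AirbyteStreamState, StreamDescriptor, Type)

# Inside read(): build one STREAM-type state message for a single stream and emit it.
stream_state = AirbyteStateMessage(
    type=AirbyteStateType.STREAM,
    stream=AirbyteStreamState(
        stream_descriptor=StreamDescriptor(name="movies", namespace="public"),
        stream_state=AirbyteStateBlob.parse_obj({"cursor": "12345"}),
    ),
)
yield AirbyteMessage(type=Type.STATE, state=stream_state)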

It seems to work. Separate states are stored in the connection settings, and the state is printed per stream.
The problem now is how to read it back. When I check the state in the read method, it just returns {"type", "stream"}. How do I read it? By the way, I am looping through all streams in the read function.

Another approach would be NOT to print the actual state in the sync log. Is there a way to emit state without printing it in the logs?

Thank you.

Hey @yzislin,

Thanks for your question. I’m guessing you tried running the following command?

python main.py read --config secrets/config.json --catalog sample_files/configured_catalog.json | grep STATE | tail -n 1 | jq .state.data > sample_files/state.json

What are the contents of state.json?
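
A note for readers following along: the `jq .state.data` filter above selects the legacy state field. With STREAM-type messages like the one in the original post, the payload sits under `.state.stream` instead, so that pipeline may write `null`. Below is a minimal sketch, not an official tool, that keeps the last state message per stream from the connector's output. The function name `last_state_per_stream` is hypothetical, and the message shape is assumed to match the JSON form of the message emitted above.

```python
import json


def last_state_per_stream(output_lines):
    """Return the most recent STREAM-type state object for each stream.

    `output_lines` is any iterable of text lines, e.g. the connector's stdout.
    Hypothetical helper; the field names follow the message emitted above.
    """
    latest = {}
    for line in output_lines:
        line = line.strip()
        if not line:
            continue
        try:
            message = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip lines that are not JSON (plain log output)
        if not isinstance(message, dict) or message.get("type") != "STATE":
            continue
        state = message.get("state") or {}
        if state.get("type") != "STREAM":
            continue  # ignore legacy or global state messages
        descriptor = state["stream"]["stream_descriptor"]
        key = (descriptor.get("namespace"), descriptor["name"])
        latest[key] = state  # later messages overwrite earlier ones
    return list(latest.values())
```

Feeding it the lines of the `read` output and dumping the result with `json.dump` should give a list with one state object per stream, which appears to be the form the per-stream reading code later in this thread iterates over.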

Hi @sajarin,

Thanks for your reply. I have just figured out my problem.
It looks like I was emitting state correctly but was not reading it back properly.
First, I had to make sure to use a later version of the CDK in the setup.py file.
Next, I finally managed to find some sample code showing how to deal with the new state object:

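if is_per_stream_state:
    parsed_state_messages = []
    # With per-stream state, `state` is a list with one entry per stream.
    for one_state in state:
        parsed_message = AirbyteStateMessage.parse_obj(one_state)
        parsed_state_messages.append(parsed_message)
        logger.info(parsed_message)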
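
For anyone who wants the cursor for a specific stream while looping over streams in read, here is a minimal sketch that works on the raw state list with plain dictionaries, before any parsing into CDK models. The helper name `stream_states_by_name` is hypothetical, and the input shape is assumed to be the JSON form of the STREAM-type message emitted in the first post.

```python
def stream_states_by_name(state):
    """Map (namespace, name) -> stream_state dict from a per-stream state list.

    Hypothetical helper; `state` is assumed to be a list of dicts, one per stream.
    """
    result = {}
    for one_state in state:
        if one_state.get("type") != "STREAM":
            continue  # skip anything that is not per-stream state
        stream = one_state.get("stream") or {}
        descriptor = stream.get("stream_descriptor") or {}
        key = (descriptor.get("namespace"), descriptor.get("name"))
        result[key] = stream.get("stream_state") or {}
    return result


# Example input mirroring the message emitted in the first post.
incoming_state = [
    {
        "type": "STREAM",
        "stream": {
            "stream_descriptor": {"name": "movies", "namespace": "public"},
            "stream_state": {"cursor": "12345"},
        },
    }
]

states = stream_states_by_name(incoming_state)
# A stream with no saved state falls back to an empty dict, so .get() is safe.
movies_cursor = states.get(("public", "movies"), {}).get("cursor")
```

Keying on both namespace and name avoids collisions when two namespaces contain a stream with the same name.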

I wish there were official documentation explaining these objects. I could only find the object definitions, which were a huge help and got me to emit state correctly, but reading it back was a different story.

Thanks

Hey @yzislin,

Thanks for following up on this and posting your solution. As we continue to roll out per-stream state, we’ll be sure to update our CDK and our docs to make it easier for developers to implement it in their own connectors. In the meantime, I hope your post can help others who are stuck with the same problem. Feel free to post more questions in the forums!