Troubleshooting state_checkpoint_interval

Hi everyone,

I’m currently developing an Airbyte connector and I’m running into an issue with the state_checkpoint_interval property. I have it set to save the state every 10 records as a test, but even after fetching many more than 10 records, I’m not seeing the state get updated in the UI. It only updates once it finishes pulling all of the records.

I’ve tried a few different things to troubleshoot the issue, such as changing the value of state_checkpoint_interval and checking the logs to see if there are any error messages related to the state saving process. However, I haven’t had any luck in resolving the issue so far.

Any advice or guidance is much appreciated.

Thanks,
David

Hello there! You are receiving this message because none of your fellow community members has stepped in to respond to your topic post. (If you are a community member and you are reading this response, feel free to jump in if you have the answer!) As a result, the Community Assistance Team has been made aware of this topic and will be investigating and responding as quickly as possible.
Some important considerations that will help you get your issue solved faster:

  • It is best to use our topic creation template; if you haven’t yet, we recommend posting a followup with the requested information. With that information the team will be able to search more quickly for similar issues with connectors and the platform, and troubleshoot your specific question or problem faster.
  • Make sure to upload the complete log file; a common investigation roadblock is that sometimes the error for the issue happens well before the problem is surfaced to the user, and so having the tail of the log is less useful than having the whole log to scan through.
  • Be as descriptive and specific as possible; when investigating it is extremely valuable to know what steps were taken to encounter the issue, what version of connector / platform / Java / Python / docker / k8s was used, etc. The more context supplied, the quicker the investigation can start on your topic and the faster we can drive towards an answer.
  • We in the Community Assistance Team are glad you’ve made yourself part of our community, and we’ll do our best to answer your questions and resolve the problems as quickly as possible. Expect to hear from a specific team member as soon as possible.

Thank you for your time and attention.
Best,
The Community Assistance Team

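from typing import Any, Iterable, Mapping

import requests
from airbyte_cdk.sources.streams import IncrementalMixin
from airbyte_cdk.sources.streams.http import HttpStream

# CURSOR_FIELD, REST_PATH and prep_req are not shown in this excerpt.

# Basic incremental stream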
class IncrementalStream(HttpStream, IncrementalMixin):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._state = {}

    state_checkpoint_interval = 5


    @property
    def cursor_field(self) -> str:
        return CURSOR_FIELD

    @property
    def state(self) -> Mapping[str, Any]:
        return self._state

    @state.setter
    def state(self, value: Mapping[str, Any]):
        """
        Define state as a max between given value and current state
        """
        if not self._state:
            self._state = {self.cursor_field: value[self.cursor_field]}
        else:
            self._state = {self.cursor_field: max(value[self.cursor_field], self.state[self.cursor_field])}

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        records = response.json().get("transactions")

        if records:
            for record in records:
                yield from self.fetch_record(record)

    def fetch_record(self, record: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]:
        response = self._send_request(prep_req, request_kwargs={})
        yield response.json()



class Transactions(IncrementalStream):
    primary_key = "id"

    def path(self, **kwargs) -> str:
        return REST_PATH + "transactions"


Am I missing anything here to commit the state every 5 records?

Hey @dturton, thanks for your patience! Unfortunately, our team does not currently have the capacity to support custom connector development. That being said, I did find a GA connector that checkpoints every 10 records:
https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-amplitude/source_amplitude/api.py

I hope that example helps! I also encourage you to post in the Slack channel and reach out to other community members who have developed custom connectors. There are lots of people online over there!
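
A note for anyone who finds this thread later: in the snippet posted above, the state setter is defined but nothing in the read path calls it, so a checkpoint emitted mid-sync would carry the same cursor value the sync started with. As far as I understand the CDK, a stream using IncrementalMixin is expected to advance its own state while records are being read, and the reader loop then emits a STATE message every state_checkpoint_interval records. The sketch below is a toy model of that interaction in plain Python, not the real CDK classes: ToyIncrementalStream, read_with_checkpoints and the "updated_at" cursor field are all made up for illustration.

```python
from typing import Any, Dict, Iterable, Iterator, List, Mapping, Tuple

CURSOR_FIELD = "updated_at"  # hypothetical cursor field, for illustration only


class ToyIncrementalStream:
    """Stand-in for an incremental stream; NOT the real Airbyte CDK classes."""

    state_checkpoint_interval = 5

    def __init__(self, records: Iterable[Mapping[str, Any]]) -> None:
        self._records = list(records)
        self._state: Dict[str, Any] = {}

    @property
    def state(self) -> Mapping[str, Any]:
        return self._state

    @state.setter
    def state(self, value: Mapping[str, Any]) -> None:
        # Keep the larger of the current and the incoming cursor value.
        current = self._state.get(CURSOR_FIELD)
        incoming = value[CURSOR_FIELD]
        self._state = {CURSOR_FIELD: incoming if current is None else max(current, incoming)}

    def read_records(self) -> Iterator[Mapping[str, Any]]:
        for record in self._records:
            # Advance the cursor before yielding each record, so a checkpoint
            # taken mid-sync reflects the records read so far.
            self.state = {CURSOR_FIELD: record[CURSOR_FIELD]}
            yield record


def read_with_checkpoints(stream: ToyIncrementalStream) -> List[Tuple[str, Any]]:
    """Toy reader loop: a STATE snapshot every N records, plus one at the end."""
    messages: List[Tuple[str, Any]] = []
    for count, record in enumerate(stream.read_records(), start=1):
        messages.append(("RECORD", record))
        if count % stream.state_checkpoint_interval == 0:
            messages.append(("STATE", dict(stream.state)))
    messages.append(("STATE", dict(stream.state)))
    return messages


if __name__ == "__main__":
    sample = [{"id": i, CURSOR_FIELD: i} for i in range(1, 13)]
    for kind, payload in read_with_checkpoints(ToyIncrementalStream(sample)):
        if kind == "STATE":
            print(kind, payload)
```

With 12 records and an interval of 5, the toy loop produces state snapshots after records 5 and 10, plus a final one at 12. In a real connector the equivalent change would probably be to override read_records and set self.state for each record before yielding it. Note also that the UI may only show state the platform has committed, which can lag behind the STATE messages the source emits.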