Issue with Incremental Sync and Connection State

Summary

A user's incremental sync fetches all rows on every sync instead of only new rows. The connection state remains empty, suggesting the cursor is never generated even though incremental sync is enabled.


Question

Hello! :slightly_smiling_face:

We are having trouble getting incremental sync to work, and I suspect the Connection State is not being populated properly.

On the initial sync, all rows are fetched and written correctly. On the second sync, only new rows should be fetched and appended based on the cursor. Instead, all rows are fetched again on every subsequent sync.

Looking at the Connection State, it is empty after the initial sync and after every subsequent sync (see image). Obviously incremental sync cannot work when the cursor does not exist in the Connection State… so where is the state? Why is it not generated when incremental sync is enabled?
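For reference, when a `DatetimeBasedCursor` is working, the per-stream state that shows up in the Connection State should contain the cursor field and its latest value. The snippet below is only an illustration of the expected shape (the field name is taken from the manifest; the exact envelope varies by Airbyte version):

```json
{
  "export": {
    "cursor_path": "2024-06-01"
  }
}
```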

Here is the exported YAML configuration from Builder:


type: DeclarativeSource

check:
  type: CheckStream
  stream_names:
    - export

definitions:
  streams:
    export:
      type: DeclarativeStream
      name: export
      retriever:
        type: SimpleRetriever
        paginator:
          type: DefaultPaginator
          pagination_strategy:
            type: OffsetIncrement
        requester:
          $ref: "#/definitions/base_requester"
          path: /export/v4.json
          http_method: POST
          request_body_json:
            config:
              limit: "{{ config['limit'] }}"
              offset: "{{ next_page_token['next_page_token'] | default(0) }}"
            search:
              to_date: >-
                {{
                format_datetime(now_utc().fromtimestamp(timestamp(stream_interval['start_time'])
                + 86400), '%Y-%m-%d') }}
              from_date: "{{ stream_interval['start_time'] }}"
        record_selector:
          type: RecordSelector
          extractor:
            type: DpathExtractor
            field_path: []
      primary_key:
        - id
      schema_loader:
        type: InlineSchemaLoader
        schema:
          $ref: "#/schemas/export"
      transformations:
        - type: AddFields
          fields:
            - path:
                - cursor_path
              value: "{{ record.sessions[0].created }}"
      incremental_sync:
        type: DatetimeBasedCursor
        step: P1D
        cursor_field: cursor_path
        start_datetime:
          type: MinMaxDatetime
          datetime: "{{ config[\"start_date\"] }}"
          datetime_format: "%Y-%m-%d"
        datetime_format: "%Y-%m-%d"
        cursor_granularity: P1D
        cursor_datetime_formats:
          - "%Y-%m-%dT%H:%M:%S.%f%z"
          - "%Y-%m-%d"
  base_requester:
    type: HttpRequester
    url_base: API URL
    authenticator:
      type: OAuthAuthenticator
      scopes: []
      client_id: "{{ config[\"client_id\"] }}"
      grant_type: client_credentials
      client_secret: "{{ config[\"client_secret\"] }}"
      expires_in_name: expires_in
      access_token_name: access_token
      refresh_request_body:
        scope: export:v4
      token_refresh_endpoint: API TOKEN URL

streams:
  - $ref: "#/definitions/streams/export"

spec:
  type: Spec
  connection_specification:
    type: object
    $schema: http://json-schema.org/draft-07/schema#
    required:
      - client_id
      - client_secret
      - limit
      - start_date
    properties:
      limit:
        type: string
        order: 2
        title: "Limit "
      client_id:
        type: string
        order: 0
        title: Client ID
        airbyte_secret: true
      start_date:
        type: string
        order: 3
        title: Start date
        format: date
        pattern: ^[0-9]{4}-[0-9]{2}-[0-9]{2}$
      client_secret:
        type: string
        order: 1
        title: Client secret
        airbyte_secret: true
    additionalProperties: true

metadata:
  assist: {}
  testedStreams:
    export:
      hasRecords: true
      streamHash: 912f493bc5d977b1a08eb02f46b585f45617303c
      hasResponse: true
      primaryKeysAreUnique: true
      primaryKeysArePresent: true
      responsesAreSuccessful: true
  yamlComponents:
    streams:
      export:
        - paginator
        - incrementalSync
  autoImportSchema:
    export: true

schemas:
 ...SCHEMA
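As an aside, the `to_date` Jinja expression in the manifest effectively computes the interval's `start_time` plus one day, formatted as a date. A small Python sketch of the equivalent logic (a mirror for illustration, not Airbyte's actual template engine):

```python
from datetime import datetime, timezone

def to_date(start_time: str) -> str:
    """Mirror of the manifest's Jinja expression:
    format_datetime(now_utc().fromtimestamp(timestamp(start_time) + 86400), '%Y-%m-%d')
    i.e. the slice's start_time plus one day (86400 seconds), formatted as a date."""
    start = datetime.strptime(start_time, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    shifted = datetime.fromtimestamp(start.timestamp() + 86400, tz=timezone.utc)
    return shifted.strftime("%Y-%m-%d")

print(to_date("2024-06-01"))  # 2024-06-02
```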



---

This topic has been created from a Slack thread to give it more visibility.
It will be on Read-Only mode here. [Click here](https://airbytehq.slack.com/archives/C027KKE4BCZ/p1732884388222879) if you want
to access the original thread.

[Join the conversation on Slack](https://slack.airbyte.com)

<sub>
['incremental-sync', 'connection-state', 'cursor', 'yaml-configuration', 'data-fetching']
</sub>

I am working with <@U082A7P32H5>, and I have a weak suspicion that the problem is that the cursor points to a field produced by an AddFields transformation… but the value shows up fine in testing and in the destination, and based on a sample it should parse OK with cursor_datetime_formats.

My other suspicion is that DatetimeBasedCursor and OffsetIncrement are not playing nicely together.

Anyone from Airbyte have input on this? We are very stuck on this final puzzlepart for our connector!


I’m having a similar issue with DatetimeBasedCursor and OffsetIncrement.

Adding the pagination with offset and limit query params causes an “unknown error”, but it works fine if I uncheck pagination and include the offset and limit params manually.

The API we are looking at takes a date for its input parameters, but the records have a datetime field that we use for the cursor. This caused us some confusion, but the solution turned out to be cursor_granularity: P0D.
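Applied to the incremental_sync block from the manifest at the top of the thread, that fix would look as follows (my reconstruction; only the cursor_granularity line changes):

```yaml
incremental_sync:
  type: DatetimeBasedCursor
  step: P1D
  cursor_field: cursor_path
  start_datetime:
    type: MinMaxDatetime
    datetime: "{{ config['start_date'] }}"
    datetime_format: "%Y-%m-%d"
  datetime_format: "%Y-%m-%d"
  cursor_granularity: P0D  # was P1D; P0D lets the cursor compare full datetime values
  cursor_datetime_formats:
    - "%Y-%m-%dT%H:%M:%S.%f%z"
    - "%Y-%m-%d"
```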

<@U082H7A8K1P> we basically ignored the paginator’s own offset and limit injection and inject the parameters manually, as shown in the manifest at the start of the thread. It seems to work just fine.
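For anyone hitting the same pagination issue, the relevant excerpt from the manifest above shows the approach: keep the OffsetIncrement strategy (so next_page_token keeps being produced) but inject offset and limit yourself in the request body:

```yaml
paginator:
  type: DefaultPaginator
  pagination_strategy:
    type: OffsetIncrement   # strategy kept only so next_page_token is populated
requester:
  request_body_json:
    config:
      limit: "{{ config['limit'] }}"
      offset: "{{ next_page_token['next_page_token'] | default(0) }}"
```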

<@U082A7P32H5> are you using Airbyte cloud or open-source? Can you share the complete yaml manifest so I can recreate the connector in my workspace?

<@U07TM968CRE> are you unblocked now or still need some help?

Open source, and yes, we are unblocked! We have learned a lot about stream_interval.

<@U07TM968CRE> feel free to open an issue to improve docs or send a pull request