Duplicate data written to Elasticsearch despite incremental append sync mode in Airbyte Connector Builder

Summary

A connector built with Airbyte Connector Builder fetches data from an API and writes it to Elasticsearch. Although the sync mode is incremental append, every run of the connection writes the same records again. The YAML manifest and a sample record are included below.


Question

Hi, I’ve just started using Airbyte, this great product, to fetch data from an API with Connector Builder and write it to Elasticsearch. My YAML is below. My problem is that whenever the connection runs, it writes duplicate data again and again, even though my sync mode is incremental append. What’s the problem, and how can I fix it?

```
spec:
  type: Spec
  connection_specification:
    type: object
    $schema: http://json-schema.org/draft-07/schema#
    required:
      - start_date
      - api_key
    properties:
      api_key:
        type: string
        order: 1
        title: API Key
        airbyte_secret: true
      start_date:
        type: string
        order: 0
        title: Start date
        format: date-time
        pattern: ^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$
    additionalProperties: true
type: DeclarativeSource
check:
  type: CheckStream
  stream_names:
    - responses
streams:
  - name: responses
    type: DeclarativeStream
    retriever:
      type: SimpleRetriever
      paginator:
        type: DefaultPaginator
        page_token_option:
          type: RequestOption
          field_name: skip
          inject_into: request_parameter
        pagination_strategy:
          type: OffsetIncrement
          page_size: 200
      requester:
        path: responses
        type: HttpRequester
        url_base: https://api.feedbackly.com/v5.0.3/
        http_method: GET
        authenticator:
          type: ApiKeyAuthenticator
          api_token: '{{ config[''api_key''] }}'
          inject_into:
            type: RequestOption
            field_name: Authorization
            inject_into: header
        error_handler:
          type: CompositeErrorHandler
          error_handlers:
            - type: DefaultErrorHandler
              backoff_strategies:
                - type: ConstantBackoffStrategy
                  backoff_time_in_seconds: 10
        request_headers: {}
        request_body_json: {}
        request_parameters:
          preferredLanguage: en
      record_selector:
        type: RecordSelector
        extractor:
          type: DpathExtractor
          field_path:
            - data
    primary_key:
      - response_id
    schema_loader:
      type: InlineSchemaLoader
      schema:
        type: object
        $schema: http://json-schema.org/schema#
        properties:
          _id:
            type: string
          tags:
            type: array
            items:
              type: string
          teamId:
            type: string
          answers:
            type: string
          browser:
            type: string
          fieldId:
            type: string
          language:
            type: string
          metadata:
            type: object
            properties:
              __url:
                type: string
          question:
            type: string
          surveyId:
            type: string
          teamName:
            type: string
          valueNum:
            type: number
          createdAt:
            type: number
          fieldName:
            type: string
          questionId:
            type: string
          surveyName:
            type: string
          valueArray:
            type: array
            items:
              type: string
          response_id:
            type: string
          valueString:
            type: string
          questionType:
            type: string
          touchpointId:
            type: string
          createdAtDate:
            type: string
          load_date_time:
            type: string
          touchpointName:
            type: string
          responseChainId:
            type: string
    transformations:
      - type: AddFields
        fields:
          - path:
              - answers
            value: '"{{ record["answer"]}}"'
      - type: RemoveFields
        field_pointers:
          - - answer
      - type: AddFields
        fields:
          - path:
              - response_id
            value: '{{ record["_id"]}}'
      - type: AddFields
        fields:
          - path:
              - load_date_time
            value: '{{ now_utc() }}'
    incremental_sync:
      type: DatetimeBasedCursor
      cursor_field: createdAt
      start_datetime:
        type: MinMaxDatetime
        datetime: '{{ config[''start_date''] }}'
        datetime_format: '%Y-%m-%dT%H:%M:%SZ'
      datetime_format: '%s'
      cursor_datetime_formats:
        - '%s'
version: 0.51.41
metadata:
  autoImportSchema:
    responses: true
```

Data Sample:

```
{
  "valueNum": 0.75,
  "language": "fi",
  "questionType": "Button",
  "questionId": "5fd879a1243f7eb0",
  "surveyId": "5fd879a1265043f7eb6",
  "createdAt": 1608024549,
  "createdAtDate": "2020-12-15T09:29:09.474Z",
  "_id": "5fd881e510aaf51a6425a45a",
  "question": "Millainen tunne sinulle jäi asioidessasi verkkokaupassamme?  ",
  "responseChainId": "5fd881efa90593",
  "teamId": "5fd358ef5f85",
  "teamName": "Sto",
  "touchpointId": "5fd87b2238112a6a2977c409",
  "browser": "Windows Chrome 87.0.4280.88",
  "answers": "0.75",
  "response_id": "5fd881e5425a45a",
  "load_date_time": "2024-01-26 13:03:50.984099+00:00"
}
```
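
A possible cause worth checking, offered as a hedged sketch and not a confirmed diagnosis: the `incremental_sync` block in the manifest above defines a cursor on `createdAt` but has no `start_time_option`, so the saved cursor value may never be sent to the API and each run may fetch every record again. Incremental append only appends whatever the source emits; it does not deduplicate at the destination. The fragment below shows how the cursor could be injected as a request parameter. The parameter name `createdAfter` is hypothetical and is not taken from the Feedbackly API documentation; it would need to be replaced with whatever filter parameter the API actually supports.

```yaml
# Belongs under the `responses` stream, replacing the existing incremental_sync block.
incremental_sync:
  type: DatetimeBasedCursor
  cursor_field: createdAt
  start_datetime:
    type: MinMaxDatetime
    datetime: '{{ config[''start_date''] }}'
    datetime_format: '%Y-%m-%dT%H:%M:%SZ'
  datetime_format: '%s'
  cursor_datetime_formats:
    - '%s'
  # Assumption: the API accepts a lower-bound filter on createdAt.
  # "createdAfter" is a placeholder name, not a documented parameter.
  start_time_option:
    type: RequestOption
    field_name: createdAfter
    inject_into: request_parameter
```

With this in place, the value sent on each run would be the stored cursor in epoch seconds (the `%s` format). If the API offers no such filter, the requests cannot be narrowed this way, and the duplicates would have to be handled elsewhere, for example with a deduplicating sync mode if the destination supports one.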

---

This topic has been created from a Slack thread to give it more visibility.
It will be on Read-Only mode here. [Click here](https://airbytehq.slack.com/archives/C021JANJ6TY/p1706274262031039) if you want to access the original thread.


<sub>
["airbyte", "connector-builder", "elasticsearch", "incremental-append", "duplicate-data", "yaml", "api"]
</sub>