Low code connector: jsonschema.exceptions.ValidationError

I am building a low-code source connector for an API, and I am running into an issue when building a Docker container for it.

Running Airbyte 0.41.0.

First, when I run the connector like this:

python3 main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json

It runs OK. But when I run docker build and update the tag in the Airbyte UI, I get the following error:

airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 - jsonschema.exceptions.ValidationError: '*ref(definitions.consultations_stream)' is not of type 'object'
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 - 
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 - Failed validating 'type' in schema['properties']['streams']['items']:
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 -     {'additionalProperties': True,
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 -      'description': 'A stream whose behavior is described by a set of '
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 -                     'declarative low code components',
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 -      'properties': {'$parameters': {'additional_properties': True,
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 -                                     'type': 'object'},
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 -                     'incremental_sync': {'anyOf': [{'$ref': '#/definitions/CustomIncrementalSync'},
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 -                                                    {'$ref': '#/definitions/DatetimeBasedCursor'}]},
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 -                     'name': {'default': '',
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 -                              'definition': 'The stream name',
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 -                              'type': 'string'},
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 -                     'primary_key': {'$ref': '#/definitions/PrimaryKey',
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 -                                     'default': '',
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 -                                     'definition': 'The primary key of the '
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 -                                                   'stream'},
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 -                     'retriever': {'anyOf': [{'$ref': '#/definitions/CustomRetriever'},
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 -                                             {'$ref': '#/definitions/SimpleRetriever'}],
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 -                                   'definition': 'Component used to '
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 -                                                 'coordinate how records '
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 -                                                 'are extracted across '
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 -                                                 'stream slices and request '
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 -                                                 'pages'},
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 -                     'schema_loader': {'anyOf': [{'$ref': '#/definitions/InlineSchemaLoader'},
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 -                                                 {'$ref': '#/definitions/JsonFileSchemaLoader'}],
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 -                                       'definition': 'The schema loader '
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 -                                                     'used to retrieve the '
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 -                                                     'schema for the '
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 -                                                     'current stream'},
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 -                     'transformations': {'definition': 'A list of '
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 -                                                       'transformations to '
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 -                                                       'be applied to each '
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 -                                                       'output record in '
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 -                                                       'the',
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 -                                         'items': {'anyOf': [{'$ref': '#/definitions/AddFields'},
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 -                                                             {'$ref': '#/definitions/CustomTransformation'},
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 -                                                             {'$ref': '#/definitions/RemoveFields'}]},
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 -                                         'type': 'array'},
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 -                     'type': {'enum': ['DeclarativeStream'],
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 -                              'type': 'string'}},
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 -      'required': ['type', 'retriever'],
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 -      'type': 'object'}
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 - 
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 - On instance['streams'][0]:
airbyte-worker                    | 2023-02-25 23:38:14 ERROR i.a.c.i.LineGobbler(voidCall):114 -     '*ref(definitions.consultations_stream)'
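
Because the same manifest passes locally but fails inside the container, one thing worth ruling out is that the two environments have different versions of the CDK installed. The comment on field_pointer in the manifest below ("To make local validator happy") hints at the same possibility. The snippet is a minimal sketch that prints the installed version; the package name airbyte-cdk is an assumption about how the CDK is installed in both places, and installed_version is a hypothetical helper name.

```python
from importlib import metadata
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of a package, or None if it is not installed."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


# Assumption: the CDK is distributed under the name "airbyte-cdk".
print("airbyte-cdk:", installed_version("airbyte-cdk"))
```

Running it once in the local virtual environment and once inside the built image (for example through the image's Python interpreter) would show whether the manifest is being validated by two different CDK releases.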

I previously had a working manifest.yaml that did not use refs. That manifest was generated by the connector builder UI. Now I need to modularize it so that I can reuse configuration blocks for more streams.

Here is my manifest.yaml:

version: 0.28.0
type: DeclarativeSource
check:
  type: CheckStream
  stream_names:
    - consultations

definitions:
  url_base: "https://provetcloud.com/{{ config['clinic_id'] }}/api/0.1/"

  base_paginator:
    type: DefaultPaginator
    page_token_option:
      type: RequestOption
      inject_into: request_parameter
      field_name: page
    pagination_strategy:
      type: "CursorPagination"
      cursor_value: "{{ last_records['next'] }}"
    url_base: "*ref(definitions.url_base)"

  base_record_selector:
    type: RecordSelector
    extractor:
      type: DpathExtractor
      field_path: [ "results" ]
      field_pointer: [ "results" ] # To make local validator happy

  base_requester:
    type: HttpRequester
    name: Base retriever
    url_base: "*ref(definitions.url_base)"
    path: "{{ options['path'] }}"
    http_method: GET
    authenticator:
      type: ApiKeyAuthenticator
      api_token: 'Token {{ config[''api_key''] }}'
      header: Authorization
  
  base_incremental_sync:
    type: DatetimeBasedCursor
    cursor_field: admitted_time
    datetime_format: '%Y-%m-%d'
    cursor_granularity: P1D
    end_datetime:
      datetime: "{{ now_utc() }}"
      datetime_format: "%Y-%m-%d %H:%M:%S.%f+00:00"
    start_datetime:
      datetime: "{{ config['start_date'] }}"
      datetime_format: "%Y-%m-%d"
    step: P1D
    start_time_option:
      inject_into: request_parameter
      type: RequestOption
      field_name: "{{ options['stream_cursor_field'] }}__gte"

  base_stream:
    type: DeclarativeStream
    primary_key:
      - id
    schema_loader:
      type: JsonFileSchemaLoader
      file_path: "source_provet_cloud/schemas/{{ options['name'] }}.json"
    retriever:
      type: SimpleRetriever
      requester:
        "*ref(definitions.base_requester)"
      record_selector:
        "*ref(definitions.base_record_selector)"
      paginator:
        "*ref(definitions.base_paginator)"
    # incremental_sync:
    #   "*ref(definitions.base_incremental_sync)"
  
  consultations_stream:
    $ref: "*ref(definitions.base_stream)"
    $options:
      name: "consultations"
      path: "/consultation"
      stream_cursor_field: "admitted_time"

streams:
  - $ref: "*ref(definitions.consultations_stream)"

spec:
  connection_specification:
    $schema: http://json-schema.org/draft-07/schema#
    type: object
    required:
      - clinic_id
      - start_date
      - api_key
    properties:
      clinic_id:
        title: Clinic ID
        airbyte_secret: true
        type: string
      start_date:
        title: Start date
        type: string
      api_key:
        type: string
        title: API Key
        airbyte_secret: true
    additionalProperties: true
  documentation_url: https://example.org
  type: Spec

Can anybody help me figure out what is wrong here?

Hello there! You are receiving this message because none of your fellow community members has stepped in to respond to your topic post. (If you are a community member and you are reading this response, feel free to jump in if you have the answer!) As a result, the Community Assistance Team has been made aware of this topic and will be investigating and responding as quickly as possible.
Some important considerations that will help you get your issue solved faster:

  • It is best to use our topic creation template; if you haven’t yet, we recommend posting a follow-up with the requested information. With that information the team can search for similar issues with connectors and the platform and troubleshoot your specific question or problem more quickly.
  • Make sure to upload the complete log file; a common investigation roadblock is that sometimes the error for the issue happens well before the problem is surfaced to the user, and so having the tail of the log is less useful than having the whole log to scan through.
  • Be as descriptive and specific as possible; when investigating it is extremely valuable to know what steps were taken to encounter the issue, what version of connector / platform / Java / Python / docker / k8s was used, etc. The more context supplied, the quicker the investigation can start on your topic and the faster we can drive towards an answer.
  • We in the Community Assistance Team are glad you’ve made yourself part of our community, and we’ll do our best to answer your questions and resolve the problems as quickly as possible. Expect to hear from a specific team member as soon as possible.

Thank you for your time and attention.
Best,
The Community Assistance Team

I managed to fix this by using a different reference notation. Here is the working manifest.yaml:

version: 0.28.0
type: DeclarativeSource

definitions:
  url_base: "https://provetcloud.com/{{ config['clinic_id'] }}/api/0.1/"

  base_schema_loader:
    type: JsonFileSchemaLoader
    file_path: "./source_provet_cloud/schemas/consultations.json"

  base_paginator:
    type: DefaultPaginator
    url_base: "#/definitions/url_base"
    # The original listed page_token_option twice. YAML loaders such as PyYAML
    # keep the last duplicate key, so only this definition took effect.
    page_token_option:
      type: RequestPath
    pagination_strategy:
      type: "CursorPagination"
      cursor_value: "{{ response.next }}"

  base_requester:
    type: HttpRequester
    name: Base retriever
    url_base: "#/definitions/url_base"
    http_method: GET
    authenticator:
      type: ApiKeyAuthenticator
      api_token: 'Token {{ config[''api_key''] }}'
      header: Authorization

  base_record_selector:
    type: RecordSelector
    extractor:
      type: DpathExtractor
      field_path:
        - results
      field_pointer:
        - results
  base_incremental_sync:
    type: DatetimeBasedCursor
    datetime_format: '%Y-%m-%d'
    cursor_granularity: P1D
    end_datetime:
      datetime: "{{ now_utc() }}"
      datetime_format: "%Y-%m-%d %H:%M:%S.%f+00:00"
    start_datetime:
      datetime: "{{ config['start_date'] }}"
      datetime_format: "%Y-%m-%d"
    step: P1D
    start_time_option:
      inject_into: request_parameter
      type: RequestOption

check:
  type: CheckStream
  stream_names:
    - Consultations
streams:
  - type: DeclarativeStream
    name: Consultations
    primary_key: [ "id" ]
    schema_loader: "#/definitions/base_schema_loader"
    retriever:
      type: SimpleRetriever
      requester:
        $ref: "#/definitions/base_requester"
        path: /consultation
      record_selector: "#/definitions/base_record_selector"
      paginator: "#/definitions/base_paginator"
    incremental_sync:
      type: DatetimeBasedCursor
      cursor_field: admitted_time
      datetime_format: '%Y-%m-%d'
      cursor_granularity: P1D
      end_datetime:
        datetime: "{{ now_utc() }}"
        datetime_format: "%Y-%m-%d %H:%M:%S"
      start_datetime:
        datetime: "{{ config['start_date'] }}"
        datetime_format: "%Y-%m-%d %H:%M:%S"
      step: P1D
      start_time_option:
        inject_into: request_parameter
        type: RequestOption
        field_name: admitted_time__gte

spec:
  connection_specification:
    $schema: http://json-schema.org/draft-07/schema#
    type: object
    required:
      - clinic_id
      - start_date
      - api_key
    properties:
      clinic_id:
        title: Clinic ID
        airbyte_secret: true
        type: string
      start_date:
        title: Start date
        type: string
      api_key:
        type: string
        title: API Key
        airbyte_secret: true
    additionalProperties: true
  documentation_url: https://example.org
  type: Spec
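
The difference between the two manifests above is the reference notation: "*ref(definitions.x)" in the failing one, "#/definitions/x" in the working one. The following is a minimal sketch of how that rewrite could be automated on a manifest that has already been loaded into Python dicts and lists. convert_legacy_refs is a hypothetical helper, not part of the Airbyte CDK, and it only rewrites reference strings; it does not touch $options or anything else.

```python
import re
from typing import Any

# Matches the older "*ref(definitions.some_name)" string notation.
LEGACY_REF = re.compile(r"^\*ref\(([^)]+)\)$")


def convert_legacy_refs(node: Any) -> Any:
    """Recursively rewrite '*ref(a.b)' strings as '#/a/b' strings."""
    if isinstance(node, dict):
        return {key: convert_legacy_refs(value) for key, value in node.items()}
    if isinstance(node, list):
        return [convert_legacy_refs(item) for item in node]
    if isinstance(node, str):
        match = LEGACY_REF.match(node.strip())
        if match:
            # Dots in the old path become slashes in the new one.
            return "#/" + match.group(1).replace(".", "/")
    return node


# Small fragment taken from the failing manifest, written as Python data.
manifest_fragment = {
    "streams": [{"$ref": "*ref(definitions.consultations_stream)"}],
    "definitions": {
        "base_requester": {
            "url_base": "*ref(definitions.url_base)",
            "http_method": "GET",
        },
    },
}

converted = convert_legacy_refs(manifest_fragment)
print(converted["streams"][0]["$ref"])  # prints "#/definitions/consultations_stream"
```

Strings that are not references, such as the Jinja templates, pass through unchanged.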

I have not figured out how to break out the incremental_sync part, because I need a dynamic variable on two different node levels. I cannot use the $options node to inject variables into nodes further down; I get an error saying that options is not defined, or something similar.

Now I have a different problem with the incremental_sync and datetime_format. It says:

File "/usr/local/lib/python3.9/_strptime.py", line 352, in _strptime
    raise ValueError("unconverted data remains: %s" %
ValueError: unconverted data remains: .962074+00:00

But I will post that in a different thread.
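
The traceback can be reproduced outside Airbyte with plain strptime. This is a sketch under two assumptions: that now_utc() renders as text with microseconds and a +00:00 offset (which the leftover ".962074+00:00" suggests), and that the value is parsed with the end_datetime format "%Y-%m-%d %H:%M:%S" from the stream definition above. The timestamp itself is made up for illustration, and try_parse is a hypothetical helper.

```python
from datetime import datetime

# Illustrative value shaped like the one implied by the traceback.
now_utc_text = "2023-02-25 23:38:14.962074+00:00"

short_format = "%Y-%m-%d %H:%M:%S"          # end_datetime format in the stream
full_format = "%Y-%m-%d %H:%M:%S.%f+00:00"  # end_datetime format in base_incremental_sync


def try_parse(text, fmt):
    """Return the parsed datetime, or the error message if parsing fails."""
    try:
        return datetime.strptime(text, fmt)
    except ValueError as error:
        return str(error)


short_result = try_parse(now_utc_text, short_format)
full_result = try_parse(now_utc_text, full_format)

print(short_result)  # unconverted data remains: .962074+00:00
print(full_result)   # 2023-02-25 23:38:14.962074
```

If those assumptions hold, a format that accounts for the fractional seconds and the offset, like the one already used in base_incremental_sync, avoids the error.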

Hey, @martindlarsson! Thanks for your patience and so glad you were able to figure this one out! Our team does not currently have the capacity to support custom connector development, so I encourage you to post in Slack and/or find other community members who have gone through the process!