Ensuring Data Alignment with Declared Schema in Airbyte Connector

Summary

The user developed a new connector with Airbyte's low-code framework and found that the id property is treated as a number rather than the integer declared in the schema files, which produces a double in the AVRO GCS destination. The user is seeking guidance on how to fix this issue.


Question

Hi community :wave:
I developed a new connector using low-code, then I have:
• manifest.yaml
• integration_tests/configured_catalog.json => screenshot 1
• schemas/xxx.json files for each stream describing its schema => screenshot 2
My question is: how can I ensure the output data is aligned with the schema declared in the .json files?
Some tests showed that the id property is not treated as an integer, as declared, but as a number (leading to a double in the AVRO GCS destination).
How can I fix it?
Thanks a lot :pray:
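
One way to check whether records match the declared schema before they reach the destination is a small type check on sample output. The following is a minimal sketch using only the Python standard library; it is not part of the Airbyte CDK, and the helper names `matches_type` and `find_type_mismatches`, as well as the sample schema and records, are hypothetical. It is deliberately stricter than JSON Schema (recent drafts accept 42.0 as an integer), so that a float arriving in an integer field is flagged.

```python
import json


def matches_type(value, type_name):
    """Check a Python value against a JSON Schema primitive type name."""
    # bool is a subclass of int in Python, so exclude it from numeric checks.
    if type_name == "null":
        return value is None
    if type_name == "boolean":
        return isinstance(value, bool)
    if type_name == "integer":
        return isinstance(value, int) and not isinstance(value, bool)
    if type_name == "number":
        return isinstance(value, (int, float)) and not isinstance(value, bool)
    if type_name == "string":
        return isinstance(value, str)
    if type_name == "array":
        return isinstance(value, list)
    if type_name == "object":
        return isinstance(value, dict)
    return False


def find_type_mismatches(record, schema):
    """Return (field, value, declared_types) for top-level fields that do not match."""
    mismatches = []
    for field, field_schema in schema.get("properties", {}).items():
        if field not in record:
            continue
        declared = field_schema.get("type", [])
        if isinstance(declared, str):
            declared = [declared]
        if not declared:
            continue  # no type declared, nothing to check
        if not any(matches_type(record[field], t) for t in declared):
            mismatches.append((field, record[field], declared))
    return mismatches


# Hypothetical schema and records, for illustration only.
schema = json.loads(
    '{"properties": {"id": {"type": ["null", "integer"]},'
    ' "name": {"type": ["null", "string"]}}}'
)
good = json.loads('{"id": 42, "name": "form"}')
bad = json.loads('{"id": 42.0, "name": "form"}')

print(find_type_mismatches(good, schema))
print(find_type_mismatches(bad, schema))
```

Running a check like this on the records read from the source can help tell whether the float is produced by the API itself or introduced later in the pipeline.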




["new-connector", "data-alignment", "declared-schema", "avro", "gcs-destination"]

Hello, can you please help me understand what is meant by the JSON schema in the code snippet in the attached photo?

hello <@U06H60WU894>, it is the expected output schema returned by the LinkedIn source API (here for lead_form stream)

In low-code, is it mandatory to define the stream schemas in the YAML file?

Well, I am not aware of the issue that you are facing. I am actually facing an issue related to JSON in Airbyte myself, so I was curious to know whether we can add a JSON schema in the stream configuration.
Thanks for responding, and sorry that I am not aware of your issue.

I tried adding the code below to the YAML manifest:

```
schema_loader:
  type: JsonFileSchemaLoader
  file_path: "./source_linkedin_leads/schemas/lead_form.json"
```
and encountered the error below at run time:
```
2024-07-24 09:17:13 destination > ERROR i.a.i.b.FailureTrackingAirbyteMessageConsumer(accept):67 Exception while accepting message tech.allegro.schema.json2avro.converter.AvroConversionException: Failed to convert JSON to Avro: Could not evaluate union, field reviewInfo is expected to be one of these: NULL, RECORD. If this is a complex type, check if offending field (path: reviewInfo.lastUpdated) adheres to schema: {rejectionReasons=[], lastUpdated=1720710761453, reviewStatus=AUTO_APPROVED}
        at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:131) ~[converter-1.1.0.jar:?]
        at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:120) ~[converter-1.1.0.jar:?]
        at tech.allegro.schema.json2avro.converter.JsonAvroConverter.convertToGenericDataRecord(JsonAvroConverter.java:95) ~[converter-1.1.0.jar:?]
        at io.airbyte.integrations.destination.s3.avro.AvroRecordFactory.getAvroRecord(AvroRecordFactory.java:46) ~[io.airbyte.airbyte-integrations.bases-base-java-s3-24.0.2.jar:?]
        at io.airbyte.integrations.destination.s3.avro.AvroSerializedBuffer.writeRecord(AvroSerializedBuffer.java:52) ~[io.airbyte.airbyte-integrations.bases-base-java-s3-24.0.2.jar:?]
        at io.airbyte.integrations.destination.record_buffer.BaseSerializedBuffer.accept(BaseSerializedBuffer.java:106) ~[io.airbyte.airbyte-integrations.bases-base-java-24.0.2.jar:?]
        at io.airbyte.integrations.destination.record_buffer.SerializedBufferingStrategy.addRecord(SerializedBufferingStrategy.java:74) ~[io.airbyte.airbyte-integrations.bases-base-java-24.0.2.jar:?]
        at io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer.acceptTracked(BufferedStreamConsumer.java:205) ~[io.airbyte.airbyte-integrations.bases-base-java-24.0.2.jar:?]
        at io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer.accept(FailureTrackingAirbyteMessageConsumer.java:65) ~[io.airbyte.airbyte-integrations.bases-base-java-24.0.2.jar:?]
        at io.airbyte.integrations.base.Destination$ShimToSerializedAirbyteMessageConsumer.consumeMessage(Destination.java:114) ~[io.airbyte.airbyte-integrations.bases-base-java-24.0.2.jar:?]
        at io.airbyte.integrations.base.Destination$ShimToSerializedAirbyteMessageConsumer.accept(Destination.java:90) ~[io.airbyte.airbyte-integrations.bases-base-java-24.0.2.jar:?]
        at io.airbyte.integrations.base.IntegrationRunner.consumeWriteStream(IntegrationRunner.java:234) ~[io.airbyte.airbyte-integrations.bases-base-java-24.0.2.jar:?]
        at io.airbyte.integrations.base.IntegrationRunner.consumeWriteStream(IntegrationRunner.java:214) ~[io.airbyte.airbyte-integrations.bases-base-java-24.0.2.jar:?]
        at io.airbyte.integrations.base.IntegrationRunner.lambda$runInternal$0(IntegrationRunner.java:153) ~[io.airbyte.airbyte-integrations.bases-base-java-24.0.2.jar:?]
```

output API data looks fine regarding schema, but connector execution fails :confused:

Fixed. The order of the union types is important for AVRO, for instance:
• ["null", "string"] is valid
• ["string", "null"] is invalid and leads to an error
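
The reordering described above can be applied to a whole schema file rather than field by field. The following is a minimal sketch using only the Python standard library; the helper name `put_null_first` is hypothetical, and the types in the sample schema are assumptions (only the field names `reviewInfo` and `lastUpdated` come from the error log). It returns a copy of the schema in which every union `type` list puts "null" first.

```python
def put_null_first(schema):
    """Return a copy of a JSON schema with "null" first in every union type list."""
    if isinstance(schema, list):
        return [put_null_first(item) for item in schema]
    if not isinstance(schema, dict):
        return schema
    result = {}
    for key, value in schema.items():
        if key == "type" and isinstance(value, list) and "null" in value:
            # Move "null" to the front, keep the other types in their order.
            result[key] = ["null"] + [t for t in value if t != "null"]
        else:
            result[key] = put_null_first(value)
    return result


# Hypothetical schema fragment; the declared types are assumed for illustration.
schema = {
    "type": "object",
    "properties": {
        "reviewInfo": {
            "type": ["object", "null"],
            "properties": {"lastUpdated": {"type": ["integer", "null"]}},
        }
    },
}

fixed = put_null_first(schema)
print(fixed["properties"]["reviewInfo"]["type"])
```

The function builds a new dictionary instead of editing in place, so the loaded schema can still be compared with the result before anything is written back to the schemas/ directory.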

<@U05FB419LMR> Did that resolve the issue for you?

yep, just switched the union typing declaration
maybe useful to be added in the documentation :pray: