Airbyte-Airflow Integration Issue: 'Object not found' Error When Updating Source

Summary

When updating a source through Airbyte’s API from an Airflow DAG task, the request consistently fails with a ‘404 Object not found’ error.


Question

Help Needed: Airbyte-Airflow Integration Issue - “Object not found” Error When Updating Source

Hi everyone,
I’m relatively new to working with Airbyte and currently facing an issue while integrating it with Airflow. Specifically, when attempting to update a source using Airbyte’s API through Airflow’s DAG task, I consistently receive a “404 Object not found” error.

Context:
API Endpoint: I’m using the IP address http://192.168.100.17:8000/api/v1 to connect to my local Airbyte instance.
Successful Listing: When running the DAG task on Airflow’s UI, I can successfully list sources using the API endpoint (http://192.168.100.17:8000/api/v1).
Setup Details: Airbyte is locally hosted, and Airflow is running within a development container.
Code Snippets:

Method (AirbyteApiClient):
import os

import requests


# CRUDMixin and ApiConnection are defined elsewhere in the project (not shown).
class AirbyteApiClient(CRUDMixin):
    def __init__(
        self,
        connection: ApiConnection,
        base_url: str = "http://192.168.100.17:8000/api/v1",
    ) -> None:
        self.connection = connection
        self.base_url = base_url
        self.workspace_id = os.getenv("AIRBYTE_WORKSPACE_ID")
        self.source_id = os.getenv("AIRBYTE_SOURCE_ID")

    def update_source(self, source_id, name, workspace_id, configuration) -> None:
        # PATCH {base_url}/sources/{source_id}: this is the request that returns 404.
        url = f"{self.base_url}/sources/{source_id}"
        payload = {
            "name": name,
            "workspaceId": workspace_id,
            "configuration": configuration,
        }
        headers = {"Content-Type": "application/json"}
        response = requests.patch(url, json=payload, headers=headers)
        if response.status_code == 200:
            print(f"Source {source_id} updated successfully")
        else:
            raise Exception(f"Failed to update source {source_id}: {response.content}")
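
For comparison with the PATCH call above, here is a minimal sketch of how the same update could be expressed in the style of Airbyte's internal Configuration API, which exposes source updates as POST {base_url}/sources/update with the sourceId in the body and the settings under connectionConfiguration. The PATCH /sources/{sourceId} route shape with a name/workspaceId/configuration body belongs to the separate public Airbyte API. Which of the two is actually served under /api/v1 on this instance is an assumption to verify, not something the post confirms. The helper name build_config_api_update is hypothetical, and the request is only built, never sent.

```python
import json
import urllib.request


def build_config_api_update(base_url, source_id, name, configuration):
    """Build (but do not send) a Configuration-API-style source update.

    Assumption: the server behind base_url exposes POST /sources/update
    and expects sourceId, name and connectionConfiguration in the body.
    """
    body = {
        "sourceId": source_id,
        "name": name,
        "connectionConfiguration": configuration,
    }
    return urllib.request.Request(
        url=f"{base_url}/sources/update",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Values taken from the post; the configuration is shortened for illustration.
request = build_config_api_update(
    "http://192.168.100.17:8000/api/v1",
    "e7066349-ea5b-4ec5-99ee-1a936c462c62",
    "Exchange_Rates_Api",
    {"start_date": "2020-05-01", "base": "EUR"},
)
print(request.get_method(), request.full_url)
```

Building the request without sending it makes it easy to compare the two route shapes side by side before trying either against the server.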

DAG Task:

from airflow.operators.python import PythonOperator

# airbyte_client, source_id, name, workspace_id and dag are defined elsewhere (not shown).

# Define the new configuration
new_configuration = {
    "start_date": "2020-05-01",
    "access_key": "Exchange_rates_api_access_key",
    "base": "EUR",
    "ignore_weekends": True,
    "sourceType": "exchange-rates",
}


def update_source(source_id, name, workspace_id, configuration) -> None:
    # name must be a parameter here because op_kwargs passes it in.
    airbyte_client.update_source(source_id, name, workspace_id, configuration)


update_source_task = PythonOperator(
    task_id="update_source_task",
    python_callable=update_source,
    op_kwargs={
        "source_id": source_id,
        "name": name,
        "workspace_id": workspace_id,
        "configuration": new_configuration,
    },
    dag=dag,
)

Error Log from Airflow UI:
[2024-06-18, 09:30:36 UTC] {logging_mixin.py:188} INFO - Sending PATCH request to URL: http://192.168.100.17:8000/api/v1/sources/e7066349-ea5b-4ec5-99ee-1a936c462c62
[2024-06-18, 09:30:36 UTC] {logging_mixin.py:188} INFO - Payload: {
    "name": "Exchange_Rates_Api",
    "workspaceId": "64cf55a5-7a9c-4a15-8928-e3d2eddfaeb7",
    "configuration": {
        "start_date": "2020-05-01",
        "access_key": "Exchange_rates_API_access_key",
        "base": "EUR",
        "ignore_weekends": true,
        "sourceType": "exchange-rates"
    }
}
[2024-06-18, 09:30:36 UTC] {logging_mixin.py:188} INFO - Headers: {'Content-Type': 'application/json'}
[2024-06-18, 09:30:36 UTC] {logging_mixin.py:188} INFO - Response Status Code: 404
[2024-06-18, 09:30:36 UTC] {logging_mixin.py:188} INFO - Response Content: b'Object not found.'
Exception: Failed to update source e7066349-ea5b-4ec5-99ee-1a936c462c62: b'Object not found.'
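
The URL, payload, and header lines in this log are not printed by the update_source snippet as posted, so some extra debug output was presumably added before the request was sent. A minimal sketch of that kind of output, assuming a hypothetical helper describe_request that only formats the request and makes no network call:

```python
import json


def describe_request(method, url, payload, headers):
    """Return debug lines describing a request, without sending it."""
    return [
        f"Sending {method} request to URL: {url}",
        f"Payload: {json.dumps(payload, indent=4)}",
        f"Headers: {headers}",
    ]


# Values copied from the log above; the payload is shortened for illustration.
lines = describe_request(
    "PATCH",
    "http://192.168.100.17:8000/api/v1/sources/e7066349-ea5b-4ec5-99ee-1a936c462c62",
    {
        "name": "Exchange_Rates_Api",
        "workspaceId": "64cf55a5-7a9c-4a15-8928-e3d2eddfaeb7",
    },
    {"Content-Type": "application/json"},
)
for line in lines:
    print(line)
```

Having the exact method and URL in the log makes it possible to replay the same request outside Airflow and check whether the 404 comes from the route itself or from the source ID.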

Despite multiple attempts to resolve this issue, including asking the community AI (kapa.ai) and checking the Airbyte documentation, the problem persists. Any advice or insights would be greatly appreciated.

Thank you for your assistance.



This topic has been created from a Slack thread to give it more visibility.

["airbyte", "airflow", "integration", "api", "object-not-found", "dag-task", "404-error"]