Using Airflow to orchestrate Airbyte and get job status

Summary

This topic asks how to get the status of an Airbyte job when Airflow is used to orchestrate Airbyte. The user wants to run follow-up logic based on that status: run transformations if the job succeeds, or send a notification if it fails.


Question

Hi,

I'm trying to use Airflow to orchestrate Airbyte.
When using the asynchronous parameter, as in the example [here](https://docs.airbyte.com/operator-guides/using-the-airflow-airbyte-operator#using-the-asynchronous-parameter), how can I get the status of the Airbyte job?

My idea is to run some follow-up logic after the job is done, something like:
- if the Airbyte job passes, do some transformations;
- if the Airbyte job fails, send a notification.
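The follow-up logic above amounts to mapping the job's final status to the next task. A minimal sketch, assuming the status arrives as a lowercase string such as `succeeded`, `failed` or `cancelled` (the values I recall from Airbyte's job API; treat them as an assumption), and using hypothetical task ids `run_transformations` and `send_failure_notification`. A callable like this could back Airflow's `BranchPythonOperator`, which runs whichever downstream task id the callable returns.

```python
# Hypothetical task ids; rename them to match the real DAG.
TRANSFORM_TASK_ID = "run_transformations"
NOTIFY_TASK_ID = "send_failure_notification"

# Assumed Airbyte job status values; verify them against your Airbyte version.
# "incomplete" is deliberately left out: Airbyte may still retry such a job.
SUCCESS_STATUSES = {"succeeded"}
FAILURE_STATUSES = {"failed", "cancelled"}


def choose_next_task(status: str) -> str:
    """Return the id of the task to run after the sync, given its final status."""
    normalized = status.strip().lower()
    if normalized in SUCCESS_STATUSES:
        return TRANSFORM_TASK_ID
    if normalized in FAILURE_STATUSES:
        return NOTIFY_TASK_ID
    # Anything else (pending, running, incomplete, ...) is not a final state.
    raise ValueError(f"job is not in a final state: {status!r}")
```

Failing loudly on a non-final status keeps the branch from silently picking a path while the sync is still running.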

I tried something like the code below, but it didn't work (it prints `None`):

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor


def print_sync_status(**context):
    # "airbyte_check_sync" stands for the sensor task
    # (airbyte_sensor_money_json_example) in the real DAG
    sync_status = context["task_instance"].xcom_pull(task_ids="airbyte_check_sync")
    print(f"Airbyte sync status: {sync_status}")


with DAG(dag_id='airbyte_trigger_job_example_async',
         default_args={'owner': 'airflow'},
         schedule_interval='@daily',
         start_date=days_ago(1)
    ) as dag:

    # Trigger the sync without waiting for it; the operator returns the Airbyte job id
    async_money_to_json = AirbyteTriggerSyncOperator(
        task_id='airbyte_async_money_json_example',
        airbyte_conn_id='airbyte_conn_example',
        connection_id='1e3b5a72-7bfd-4808-a13c-204505490110',
        asynchronous=True,
    )

    # Wait until the job triggered above finishes
    airbyte_sensor = AirbyteJobSensor(
        task_id='airbyte_sensor_money_json_example',
        airbyte_conn_id='airbyte_conn_example',
        airbyte_job_id=async_money_to_json.output
    )

    # In Airflow 2 the context is passed to the callable automatically,
    # so provide_context=True is not needed
    print_status = PythonOperator(
        task_id="print_sync_status",
        python_callable=print_sync_status,
    )

    async_money_to_json >> airbyte_sensor >> print_status
```
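As far as I can tell, `AirbyteJobSensor` only signals success or failure (it fails the task when the job fails) and does not push the job status to XCom, so pulling from the sensor yields `None`. The trigger task, on the other hand, returns the job id when `asynchronous=True`, which is what `async_money_to_json.output` refers to. A minimal sketch of a callable that pulls the job id from the trigger task instead; the task id is the one from the DAG above, and `FakeTaskInstance` is a hypothetical stand-in used only to exercise the function outside Airflow.

```python
from typing import Any, Dict, Optional

# Task id of the AirbyteTriggerSyncOperator in the DAG above.
TRIGGER_TASK_ID = "airbyte_async_money_json_example"


def pull_job_id(**context: Any) -> Optional[int]:
    """Pull the Airbyte job id that the trigger task returned (its return_value XCom)."""
    job_id = context["task_instance"].xcom_pull(task_ids=TRIGGER_TASK_ID)
    print(f"Airbyte job id: {job_id}")
    return job_id


class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance, for local testing only."""

    def __init__(self, xcoms: Dict[str, Any]) -> None:
        self._xcoms = xcoms

    def xcom_pull(self, task_ids: str) -> Any:
        # Mimics pulling the default "return_value" key for one task id.
        return self._xcoms.get(task_ids)
```

In the DAG, `python_callable=pull_job_id` would take the place of `print_sync_status`; the job id it returns can then be used to look up the job's status.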


---

This topic was created from a Slack thread to give it more visibility.
It is read-only here. [Click here](https://airbytehq.slack.com/archives/C021JANJ6TY/p1722949552588229) if you want to access the original thread.

[Join the conversation on Slack](https://slack.airbyte.com)

<sub>
["airflow", "airbyte", "orchestration", "job status", "follow-up logic", "transformation", "notification"]
</sub>

Did you try printing the whole context? Also, where are you defining `airbyte_check_sync`?

<@U01MMSDJGC9>

Yes, I did. I don't see anything interesting, which leads me to conclude that I need a third step that sends a request to Airbyte to get the status of the job. I can get the job id from the `async_money_to_json.output` variable (an int representing the job id, I believe).
The problem is that I tried to curl the job status without success.
I was using the docs [here](https://reference.airbyte.com/reference/getjob), and I also tried asking <@U04SE163WC8> (thread [here](https://airbytehq.slack.com/archives/C01AHCD885S/p1723112947718719)).
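Once the job id is known, its status can be fetched over HTTP. A minimal sketch using only the standard library, assuming the public API's get-job endpoint is `GET {base_url}/v1/jobs/{job_id}`, that it accepts a bearer token, and that the JSON response carries a top-level `status` field. The base URL and auth scheme differ between Airbyte Cloud and self-hosted deployments, so check the reference docs linked above for your version; the helper names are hypothetical.

```python
import json
import urllib.request


def build_job_request(base_url: str, job_id: int, token: str) -> urllib.request.Request:
    """Build a GET request for one job (endpoint path and auth scheme are assumptions)."""
    url = f"{base_url.rstrip('/')}/v1/jobs/{job_id}"
    return urllib.request.Request(
        url,
        headers={"Authorization": f"Bearer {token}", "Accept": "application/json"},
        method="GET",
    )


def parse_job_status(body: bytes) -> str:
    """Extract the job status from the JSON body (assumes a top-level 'status' key)."""
    payload = json.loads(body)
    return payload["status"]


def get_job_status(base_url: str, job_id: int, token: str, timeout: float = 30.0) -> str:
    """Fetch one job and return its status; urllib raises HTTPError on 4xx/5xx responses."""
    request = build_job_request(base_url, job_id, token)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return parse_job_status(response.read())
```

For example, `get_job_status("https://api.airbyte.com", job_id, token)` would, under these assumptions, return a string such as `succeeded` for an Airbyte Cloud job; keeping the request building and parsing in separate functions makes both easy to check without network access.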

Any suggestions?

Regarding:
> Also airbyte_check_sync where are you defining this?
That was my mistake; I left it out. The DAG I attached is a mock of the real one.
Essentially, `airbyte_check_sync` is `airbyte_sensor_money_json_example`.