Summary
This message is asking how to get the status of an Airbyte job when using Airflow to orchestrate Airbyte. The user wants to perform follow-up logic based on the job status, such as doing transformations if the job passes or sending a notification if the job fails.
Question
Hi
Im trying to use airflow to orchestrate airbyte.
when using the asynchronous
parameter, like in the example https://docs.airbyte.com/operator-guides/using-the-airflow-airbyte-operator#using-the-asynchronous-parameter|here, how can I get the status of the airbyte job?
my idea is to do some follow-up logic after the job is done, something like:
if airbyte job pass, do some transformations.
if airbyte job failed, send a notification.
I tried to do something like the below, but that didnt work (it prints None
)
from airflow.utils.dates import days_ago
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor
def print_sync_status(**context):
sync_status = context["task_instance"].xcom_pull(task_ids="airbyte_check_sync")
print(f"Airbyte sync status: {sync_status}")
with DAG(dag_id='airbyte_trigger_job_example_async',
default_args={'owner': 'airflow'},
schedule_interval='@daily',
start_date=days_ago(1)
) as dag:
async_money_to_json = AirbyteTriggerSyncOperator(
task_id='airbyte_async_money_json_example',
airbyte_conn_id='airbyte_conn_example',
connection_id='1e3b5a72-7bfd-4808-a13c-204505490110',
asynchronous=True,
)
airbyte_sensor = AirbyteJobSensor(
task_id='airbyte_sensor_money_json_example',
airbyte_conn_id='airbyte_conn_example',
airbyte_job_id=async_money_to_json.output
)
print_status = PythonOperator(
task_id="print_sync_status",
python_callable=print_sync_status,
provide_context=True,
)
async_money_to_json >> airbyte_sensor >> print_status```
<br>
---
This topic has been created from a Slack thread to give it more visibility.
It will be on Read-Only mode here. [Click here](https://airbytehq.slack.com/archives/C021JANJ6TY/p1722949552588229) if you want
to access the original thread.
[Join the conversation on Slack](https://slack.airbyte.com)
<sub>
["airflow", "airbyte", "orchestration", "job status", "follow-up logic", "transformation", "notification"]
</sub>