Hi,
I have set up Airbyte to sync data from PostgreSQL to BigQuery.
I have noticed that the BigQuery cost is quite high, so I checked the SQL history log on BigQuery and found that Airbyte’s auto-generated BigQuery SQL script is not optimized.
Below is the auto-generated SQL:
/* {"app": "dbt", "dbt_version": "1.0.0", "profile_name": "normalize", "target_name": "prod", "node_id": "model.airbyte_utils.lazada_raw_orders_scd"} */
-- (Re)creates the temporary table `lazada_raw_orders_scd__dbt_tmp` from the query that follows `as (`.
create or replace table `marketplace-reporting`.airbyte.`lazada_raw_orders_scd__dbt_tmp`
-- generate_array(0, 1, 1) yields [0, 1], so rows are bucketed by their _airbyte_active_row flag
partition by range_bucket(
_airbyte_active_row,
generate_array(0, 1, 1)
)
cluster by _airbyte_unique_key_scd, _airbyte_emitted_at
OPTIONS(
-- the temporary table expires 12 hours after it is created
expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 12 hour)
)
as (
-- depends_on: ref('lazada_raw_orders_stg')
with
new_data as (
-- retrieve incremental "new" data
-- Keeps staging rows whose _airbyte_emitted_at is at or after the newest
-- _airbyte_emitted_at already stored in lazada_raw_orders_scd.
select
*
from `marketplace-reporting`._airbyte_airbyte.`lazada_raw_orders_stg`
-- lazada_raw_orders from `marketplace-reporting`.airbyte._airbyte_raw_lazada_raw_orders
where 1 = 1
-- coalesce(..., true) keeps the row whenever the comparison evaluates to NULL,
-- e.g. when the SCD table has no rows yet and max() returns NULL.
-- NOTE(review): the cutoff is a scalar subquery, not a constant expression, so
-- BigQuery may be unable to prune partitions of the underlying table here;
-- confirm against the bytes billed for this job.
and coalesce(
cast(_airbyte_emitted_at as
timestamp
) >= (select max(cast(_airbyte_emitted_at as
timestamp
)) from `marketplace-reporting`.airbyte.`lazada_raw_orders_scd`),
true)
),
new_data_ids as (
-- build a subset of _airbyte_unique_key from rows that are new
-- The key is the hex-encoded md5 of `id` cast to string; a NULL id is hashed as
-- the empty string. `distinct` leaves one row per key.
select distinct
to_hex(md5(cast(concat(coalesce(cast(id as
string
), '')) as
string
))) as _airbyte_unique_key
from new_data
),
empty_new_data as (
-- build an empty table to only keep the table's column types
-- `where 1 = 0` is never true, so this returns zero rows with new_data's columns.
select * from new_data where 1 = 0
),
previous_active_scd_data as (
-- retrieve "incomplete old" data that needs to be updated with an end date because of new changes
-- Reads rows from the existing SCD table that are flagged active and whose key
-- appears in new_data_ids. Every selected column comes from this_data.
select
this_data.`_airbyte_lazada_raw_orders_hashid`,
this_data.`id`,
this_data.`p_uid`,
this_data.`editor`,
this_data.`reason`,
this_data.`status`,
this_data.`premium`,
this_data.`brand_id`,
this_data.`currency`,
this_data.`tax_code`,
this_data.`bundle_id`,
this_data.`item_name`,
this_data.`lazada_id`,
this_data.`variation`,
this_data.`created_at`,
this_data.`lazada_sku`,
this_data.`order_flag`,
this_data.`order_type`,
this_data.`paid_price`,
this_data.`seller_sku`,
this_data.`tax_amount`,
this_data.`unit_price`,
this_data.`updated_at`,
this_data.`billing_city`,
this_data.`billing_name`,
this_data.`crea_country`,
this_data.`order_number`,
this_data.`payment_time`,
this_data.`shipping_fee`,
this_data.`tracking_url`,
this_data.`voucher_code`,
this_data.`branch_number`,
this_data.`crea_currency`,
this_data.`customer_name`,
this_data.`delivery_time`,
this_data.`order_item_id`,
this_data.`reason_detail`,
this_data.`refund_amount`,
this_data.`return_status`,
this_data.`shipping_city`,
this_data.`shipping_name`,
this_data.`tracking_code`,
this_data.`crea_status_id`,
this_data.`customer_email`,
this_data.`invoice_number`,
this_data.`payment_method`,
this_data.`sla_time_stamp`,
this_data.`voucher_amount`,
this_data.`voucher_seller`,
this_data.`wallet_credits`,
this_data.`billing_address`,
this_data.`billing_country`,
this_data.`bundle_discount`,
this_data.`is_synced_to_bq`,
this_data.`sent_to_cprv_at`,
this_data.`shipping_amount`,
this_data.`shipping_region`,
this_data.`billing_address2`,
this_data.`billing_address3`,
this_data.`billing_address4`,
this_data.`billing_address5`,
this_data.`billing_postcode`,
this_data.`cd_tracking_code`,
this_data.`invoice_required`,
this_data.`p_uid_updated_at`,
this_data.`quantity_ordered`,
this_data.`reversal__is_rtm`,
this_data.`shipping_address`,
this_data.`shipping_country`,
this_data.`voucher_platform`,
this_data.`lazada_created_at`,
this_data.`lazada_updated_at`,
this_data.`purchase_order_id`,
this_data.`shipping_address2`,
this_data.`shipping_address3`,
this_data.`shipping_address4`,
this_data.`shipping_address5`,
this_data.`shipping_postcode`,
this_data.`shipping_provider`,
this_data.`shipment_type_name`,
this_data.`billing_phone_number`,
this_data.`cd_shipping_provider`,
this_data.`reversal__ofc_status`,
this_data.`billing_phone_number2`,
this_data.`purchase_order_number`,
this_data.`shipping_fee_original`,
this_data.`shipping_phone_number`,
this_data.`shipping_service_cost`,
this_data.`tax_invoice_requested`,
this_data.`promised_shipping_time`,
this_data.`reversal__request_type`,
this_data.`shipping_phone_number2`,
this_data.`shipping_provider_type`,
this_data.`cancel_return_initiator`,
this_data.`is_tax_invoice_exported`,
this_data.`reversal__shipping_type`,
this_data.`tracking_url_first_mile`,
this_data.`reversal__is_need_refund`,
this_data.`reversal__reverse_status`,
this_data.`tracking_code_first_mile`,
this_data.`derived_order_time_failed`,
this_data.`warehouse_packed_datetime`,
this_data.`derived_order_time_shipped`,
this_data.`reversal__reverse_order_id`,
this_data.`warehouse_shipped_datetime`,
this_data.`crea_send_shipped_to_client`,
this_data.`derived_order_time_refunded`,
this_data.`derived_order_time_returned`,
this_data.`warehouse_received_datetime`,
this_data.`crea_send_returned_to_client`,
this_data.`derived_order_time_cancelled`,
this_data.`derived_order_time_delivered`,
this_data.`national_registration_number`,
this_data.`shipping_fee_discount_seller`,
this_data.`shipping_provider_first_mile`,
this_data.`crea_warehouse_order_status_id`,
this_data.`shipping_fee_discount_platform`,
this_data.`reversal__reverse_order_line_id`,
this_data.`derived_order_time_ready_to_ship`,
this_data.`_airbyte_ab_id`,
this_data.`_airbyte_emitted_at`,
this_data.`_airbyte_normalized_at`
from `marketplace-reporting`.airbyte.`lazada_raw_orders_scd` as this_data
-- make a join with new_data using primary key to filter active data that need to be updated only
join new_data_ids on this_data._airbyte_unique_key = new_data_ids._airbyte_unique_key
-- force left join to NULL values (we just need to transfer column types only for the star_intersect macro on schema changes)
left join empty_new_data as inc_data on this_data._airbyte_ab_id = inc_data._airbyte_ab_id
-- only rows currently flagged as the active version are re-read
where _airbyte_active_row = 1
),
input_data as (
-- Stacks the incremental rows (new_data) on top of the previously active SCD rows
-- (previous_active_scd_data), using the same explicit column list for both sides.
select `_airbyte_lazada_raw_orders_hashid`,
`id`,
`p_uid`,
`editor`,
`reason`,
`status`,
`premium`,
`brand_id`,
`currency`,
`tax_code`,
`bundle_id`,
`item_name`,
`lazada_id`,
`variation`,
`created_at`,
`lazada_sku`,
`order_flag`,
`order_type`,
`paid_price`,
`seller_sku`,
`tax_amount`,
`unit_price`,
`updated_at`,
`billing_city`,
`billing_name`,
`crea_country`,
`order_number`,
`payment_time`,
`shipping_fee`,
`tracking_url`,
`voucher_code`,
`branch_number`,
`crea_currency`,
`customer_name`,
`delivery_time`,
`order_item_id`,
`reason_detail`,
`refund_amount`,
`return_status`,
`shipping_city`,
`shipping_name`,
`tracking_code`,
`crea_status_id`,
`customer_email`,
`invoice_number`,
`payment_method`,
`sla_time_stamp`,
`voucher_amount`,
`voucher_seller`,
`wallet_credits`,
`billing_address`,
`billing_country`,
`bundle_discount`,
`is_synced_to_bq`,
`sent_to_cprv_at`,
`shipping_amount`,
`shipping_region`,
`billing_address2`,
`billing_address3`,
`billing_address4`,
`billing_address5`,
`billing_postcode`,
`cd_tracking_code`,
`invoice_required`,
`p_uid_updated_at`,
`quantity_ordered`,
`reversal__is_rtm`,
`shipping_address`,
`shipping_country`,
`voucher_platform`,
`lazada_created_at`,
`lazada_updated_at`,
`purchase_order_id`,
`shipping_address2`,
`shipping_address3`,
`shipping_address4`,
`shipping_address5`,
`shipping_postcode`,
`shipping_provider`,
`shipment_type_name`,
`billing_phone_number`,
`cd_shipping_provider`,
`reversal__ofc_status`,
`billing_phone_number2`,
`purchase_order_number`,
`shipping_fee_original`,
`shipping_phone_number`,
`shipping_service_cost`,
`tax_invoice_requested`,
`promised_shipping_time`,
`reversal__request_type`,
`shipping_phone_number2`,
`shipping_provider_type`,
`cancel_return_initiator`,
`is_tax_invoice_exported`,
`reversal__shipping_type`,
`tracking_url_first_mile`,
`reversal__is_need_refund`,
`reversal__reverse_status`,
`tracking_code_first_mile`,
`derived_order_time_failed`,
`warehouse_packed_datetime`,
`derived_order_time_shipped`,
`reversal__reverse_order_id`,
`warehouse_shipped_datetime`,
`crea_send_shipped_to_client`,
`derived_order_time_refunded`,
`derived_order_time_returned`,
`warehouse_received_datetime`,
`crea_send_returned_to_client`,
`derived_order_time_cancelled`,
`derived_order_time_delivered`,
`national_registration_number`,
`shipping_fee_discount_seller`,
`shipping_provider_first_mile`,
`crea_warehouse_order_status_id`,
`shipping_fee_discount_platform`,
`reversal__reverse_order_line_id`,
`derived_order_time_ready_to_ship`,
`_airbyte_ab_id`,
`_airbyte_emitted_at`,
`_airbyte_normalized_at` from new_data
-- union all: rows from both sides are kept as-is; duplicates are handled later in dedup_data
union all
select `_airbyte_lazada_raw_orders_hashid`,
`id`,
`p_uid`,
`editor`,
`reason`,
`status`,
`premium`,
`brand_id`,
`currency`,
`tax_code`,
`bundle_id`,
`item_name`,
`lazada_id`,
`variation`,
`created_at`,
`lazada_sku`,
`order_flag`,
`order_type`,
`paid_price`,
`seller_sku`,
`tax_amount`,
`unit_price`,
`updated_at`,
`billing_city`,
`billing_name`,
`crea_country`,
`order_number`,
`payment_time`,
`shipping_fee`,
`tracking_url`,
`voucher_code`,
`branch_number`,
`crea_currency`,
`customer_name`,
`delivery_time`,
`order_item_id`,
`reason_detail`,
`refund_amount`,
`return_status`,
`shipping_city`,
`shipping_name`,
`tracking_code`,
`crea_status_id`,
`customer_email`,
`invoice_number`,
`payment_method`,
`sla_time_stamp`,
`voucher_amount`,
`voucher_seller`,
`wallet_credits`,
`billing_address`,
`billing_country`,
`bundle_discount`,
`is_synced_to_bq`,
`sent_to_cprv_at`,
`shipping_amount`,
`shipping_region`,
`billing_address2`,
`billing_address3`,
`billing_address4`,
`billing_address5`,
`billing_postcode`,
`cd_tracking_code`,
`invoice_required`,
`p_uid_updated_at`,
`quantity_ordered`,
`reversal__is_rtm`,
`shipping_address`,
`shipping_country`,
`voucher_platform`,
`lazada_created_at`,
`lazada_updated_at`,
`purchase_order_id`,
`shipping_address2`,
`shipping_address3`,
`shipping_address4`,
`shipping_address5`,
`shipping_postcode`,
`shipping_provider`,
`shipment_type_name`,
`billing_phone_number`,
`cd_shipping_provider`,
`reversal__ofc_status`,
`billing_phone_number2`,
`purchase_order_number`,
`shipping_fee_original`,
`shipping_phone_number`,
`shipping_service_cost`,
`tax_invoice_requested`,
`promised_shipping_time`,
`reversal__request_type`,
`shipping_phone_number2`,
`shipping_provider_type`,
`cancel_return_initiator`,
`is_tax_invoice_exported`,
`reversal__shipping_type`,
`tracking_url_first_mile`,
`reversal__is_need_refund`,
`reversal__reverse_status`,
`tracking_code_first_mile`,
`derived_order_time_failed`,
`warehouse_packed_datetime`,
`derived_order_time_shipped`,
`reversal__reverse_order_id`,
`warehouse_shipped_datetime`,
`crea_send_shipped_to_client`,
`derived_order_time_refunded`,
`derived_order_time_returned`,
`warehouse_received_datetime`,
`crea_send_returned_to_client`,
`derived_order_time_cancelled`,
`derived_order_time_delivered`,
`national_registration_number`,
`shipping_fee_discount_seller`,
`shipping_provider_first_mile`,
`crea_warehouse_order_status_id`,
`shipping_fee_discount_platform`,
`reversal__reverse_order_line_id`,
`derived_order_time_ready_to_ship`,
`_airbyte_ab_id`,
`_airbyte_emitted_at`,
`_airbyte_normalized_at` from previous_active_scd_data
),
scd_data as (
-- SQL model to build a Type 2 Slowly Changing Dimension (SCD) table for each record identified by their primary key
-- Adds, per input row: the hashed key, the version start (_airbyte_start_at),
-- the version end (_airbyte_end_at) and the active flag (_airbyte_active_row).
select
-- same key expression as in new_data_ids: hex md5 of `id` cast to string ('' when NULL)
to_hex(md5(cast(concat(coalesce(cast(id as
string
), '')) as
string
))) as _airbyte_unique_key,
id,
p_uid,
editor,
reason,
status,
premium,
brand_id,
currency,
tax_code,
bundle_id,
item_name,
lazada_id,
variation,
created_at,
lazada_sku,
order_flag,
order_type,
paid_price,
seller_sku,
tax_amount,
unit_price,
updated_at,
billing_city,
billing_name,
crea_country,
order_number,
payment_time,
shipping_fee,
tracking_url,
voucher_code,
branch_number,
crea_currency,
customer_name,
delivery_time,
order_item_id,
reason_detail,
refund_amount,
return_status,
shipping_city,
shipping_name,
tracking_code,
crea_status_id,
customer_email,
invoice_number,
payment_method,
sla_time_stamp,
voucher_amount,
voucher_seller,
wallet_credits,
billing_address,
billing_country,
bundle_discount,
is_synced_to_bq,
sent_to_cprv_at,
shipping_amount,
shipping_region,
billing_address2,
billing_address3,
billing_address4,
billing_address5,
billing_postcode,
cd_tracking_code,
invoice_required,
p_uid_updated_at,
quantity_ordered,
reversal__is_rtm,
shipping_address,
shipping_country,
voucher_platform,
lazada_created_at,
lazada_updated_at,
purchase_order_id,
shipping_address2,
shipping_address3,
shipping_address4,
shipping_address5,
shipping_postcode,
shipping_provider,
shipment_type_name,
billing_phone_number,
cd_shipping_provider,
reversal__ofc_status,
billing_phone_number2,
purchase_order_number,
shipping_fee_original,
shipping_phone_number,
shipping_service_cost,
tax_invoice_requested,
promised_shipping_time,
reversal__request_type,
shipping_phone_number2,
shipping_provider_type,
cancel_return_initiator,
is_tax_invoice_exported,
reversal__shipping_type,
tracking_url_first_mile,
reversal__is_need_refund,
reversal__reverse_status,
tracking_code_first_mile,
derived_order_time_failed,
warehouse_packed_datetime,
derived_order_time_shipped,
reversal__reverse_order_id,
warehouse_shipped_datetime,
crea_send_shipped_to_client,
derived_order_time_refunded,
derived_order_time_returned,
warehouse_received_datetime,
crea_send_returned_to_client,
derived_order_time_cancelled,
derived_order_time_delivered,
national_registration_number,
shipping_fee_discount_seller,
shipping_provider_first_mile,
crea_warehouse_order_status_id,
shipping_fee_discount_platform,
reversal__reverse_order_line_id,
derived_order_time_ready_to_ship,
-- a version starts at its own updated_at
updated_at as _airbyte_start_at,
-- Rows of one id are ordered newest first (non-NULL updated_at before NULL), so
-- lag() returns the updated_at of the next newer version; the newest row gets NULL.
lag(updated_at) over (
partition by cast(id as
string
)
order by
updated_at is null asc,
updated_at desc,
_airbyte_emitted_at desc
) as _airbyte_end_at,
-- same window ordering as above: the first (newest) row per id is flagged 1, all others 0
case when row_number() over (
partition by cast(id as
string
)
order by
updated_at is null asc,
updated_at desc,
_airbyte_emitted_at desc
) = 1 then 1 else 0 end as _airbyte_active_row,
_airbyte_ab_id,
_airbyte_emitted_at,
_airbyte_lazada_raw_orders_hashid
from input_data
),
dedup_data as (
-- Numbers rows that share the same (key, start, emitted_at) triple and derives the
-- per-version key _airbyte_unique_key_scd; the final select keeps row number 1 only.
select
-- we need to ensure de-duplicated rows for merge/update queries
-- additionally, we generate a unique key for the scd table
-- within a group of duplicates, the active row (then the lowest _airbyte_ab_id) is ranked first
row_number() over (
partition by
_airbyte_unique_key,
_airbyte_start_at,
_airbyte_emitted_at
order by _airbyte_active_row desc, _airbyte_ab_id
) as _airbyte_row_num,
-- hex md5 of "<unique_key>-<start_at>-<emitted_at>", each part cast to string ('' when NULL)
to_hex(md5(cast(concat(coalesce(cast(_airbyte_unique_key as
string
), ''), '-', coalesce(cast(_airbyte_start_at as
string
), ''), '-', coalesce(cast(_airbyte_emitted_at as
string
), '')) as
string
))) as _airbyte_unique_key_scd,
scd_data.*
from scd_data
)
-- Final projection written to the table: one row per (key, start, emitted_at) group
-- from dedup_data, with _airbyte_normalized_at stamped at query time.
select
_airbyte_unique_key,
_airbyte_unique_key_scd,
id,
p_uid,
editor,
reason,
status,
premium,
brand_id,
currency,
tax_code,
bundle_id,
item_name,
lazada_id,
variation,
created_at,
lazada_sku,
order_flag,
order_type,
paid_price,
seller_sku,
tax_amount,
unit_price,
updated_at,
billing_city,
billing_name,
crea_country,
order_number,
payment_time,
shipping_fee,
tracking_url,
voucher_code,
branch_number,
crea_currency,
customer_name,
delivery_time,
order_item_id,
reason_detail,
refund_amount,
return_status,
shipping_city,
shipping_name,
tracking_code,
crea_status_id,
customer_email,
invoice_number,
payment_method,
sla_time_stamp,
voucher_amount,
voucher_seller,
wallet_credits,
billing_address,
billing_country,
bundle_discount,
is_synced_to_bq,
sent_to_cprv_at,
shipping_amount,
shipping_region,
billing_address2,
billing_address3,
billing_address4,
billing_address5,
billing_postcode,
cd_tracking_code,
invoice_required,
p_uid_updated_at,
quantity_ordered,
reversal__is_rtm,
shipping_address,
shipping_country,
voucher_platform,
lazada_created_at,
lazada_updated_at,
purchase_order_id,
shipping_address2,
shipping_address3,
shipping_address4,
shipping_address5,
shipping_postcode,
shipping_provider,
shipment_type_name,
billing_phone_number,
cd_shipping_provider,
reversal__ofc_status,
billing_phone_number2,
purchase_order_number,
shipping_fee_original,
shipping_phone_number,
shipping_service_cost,
tax_invoice_requested,
promised_shipping_time,
reversal__request_type,
shipping_phone_number2,
shipping_provider_type,
cancel_return_initiator,
is_tax_invoice_exported,
reversal__shipping_type,
tracking_url_first_mile,
reversal__is_need_refund,
reversal__reverse_status,
tracking_code_first_mile,
derived_order_time_failed,
warehouse_packed_datetime,
derived_order_time_shipped,
reversal__reverse_order_id,
warehouse_shipped_datetime,
crea_send_shipped_to_client,
derived_order_time_refunded,
derived_order_time_returned,
warehouse_received_datetime,
crea_send_returned_to_client,
derived_order_time_cancelled,
derived_order_time_delivered,
national_registration_number,
shipping_fee_discount_seller,
shipping_provider_first_mile,
crea_warehouse_order_status_id,
shipping_fee_discount_platform,
reversal__reverse_order_line_id,
derived_order_time_ready_to_ship,
_airbyte_start_at,
_airbyte_end_at,
_airbyte_active_row,
_airbyte_ab_id,
_airbyte_emitted_at,
-- normalization time is the time this query runs, not a value carried over from the input
CURRENT_TIMESTAMP() as _airbyte_normalized_at,
_airbyte_lazada_raw_orders_hashid
-- keep only the first-ranked row of each duplicate group
from dedup_data where _airbyte_row_num = 1
);
Note that the part below, which selects the max `_airbyte_emitted_at` in a subquery, hits a known BigQuery issue: the value needs to be assigned to a variable first; otherwise the BigQuery SQL optimizer will still do a full table scan.
where 1 = 1
and coalesce(
cast(_airbyte_emitted_at as
timestamp
)>= (select max(cast(_airbyte_emitted_at as
timestamp
)) from `marketplace-reporting`.airbyte.`lazada_raw_orders_scd`)
How do I edit this query? Do I export it from the Docker image, edit the query, and then put the file back where I exported it from in the Docker image?
Airbyte version: 0.38.3-alpha