BigQuery SQL optimization

Hi,

I have set up Airbyte to sync data from PostgreSQL to BigQuery.
I have noticed that the BigQuery cost is quite high, so I checked the SQL history log on BigQuery and found that Airbyte’s auto-generated BigQuery SQL script is not optimized.

Below is the auto-generated SQL

/* {"app": "dbt", "dbt_version": "1.0.0", "profile_name": "normalize", "target_name": "prod", "node_id": "model.airbyte_utils.lazada_raw_orders_scd"} */

  create or replace table `marketplace-reporting`.airbyte.`lazada_raw_orders_scd__dbt_tmp`
  partition by range_bucket(
            _airbyte_active_row,
            generate_array(0, 1, 1)
        )
  cluster by _airbyte_unique_key_scd, _airbyte_emitted_at
  OPTIONS(
      expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 12 hour)
    )
  as (
    
-- depends_on: ref('lazada_raw_orders_stg')
with

new_data as (
    -- retrieve incremental "new" data
    select
        *
    from `marketplace-reporting`._airbyte_airbyte.`lazada_raw_orders_stg`
    -- lazada_raw_orders from `marketplace-reporting`.airbyte._airbyte_raw_lazada_raw_orders
    where 1 = 1
    
and coalesce(
    cast(_airbyte_emitted_at as 
    timestamp
) >= (select max(cast(_airbyte_emitted_at as 
    timestamp
)) from `marketplace-reporting`.airbyte.`lazada_raw_orders_scd`),
    
    
    true)

),
new_data_ids as (
    -- build a subset of _airbyte_unique_key from rows that are new
    select distinct
        to_hex(md5(cast(concat(coalesce(cast(id as 
    string
), '')) as 
    string
))) as _airbyte_unique_key
    from new_data
),
empty_new_data as (
    -- build an empty table to only keep the table's column types
    select * from new_data where 1 = 0
),
previous_active_scd_data as (
    -- retrieve "incomplete old" data that needs to be updated with an end date because of new changes
    select
        this_data.`_airbyte_lazada_raw_orders_hashid`,
  this_data.`id`,
  this_data.`p_uid`,
  this_data.`editor`,
  this_data.`reason`,
  this_data.`status`,
  this_data.`premium`,
  this_data.`brand_id`,
  this_data.`currency`,
  this_data.`tax_code`,
  this_data.`bundle_id`,
  this_data.`item_name`,
  this_data.`lazada_id`,
  this_data.`variation`,
  this_data.`created_at`,
  this_data.`lazada_sku`,
  this_data.`order_flag`,
  this_data.`order_type`,
  this_data.`paid_price`,
  this_data.`seller_sku`,
  this_data.`tax_amount`,
  this_data.`unit_price`,
  this_data.`updated_at`,
  this_data.`billing_city`,
  this_data.`billing_name`,
  this_data.`crea_country`,
  this_data.`order_number`,
  this_data.`payment_time`,
  this_data.`shipping_fee`,
  this_data.`tracking_url`,
  this_data.`voucher_code`,
  this_data.`branch_number`,
  this_data.`crea_currency`,
  this_data.`customer_name`,
  this_data.`delivery_time`,
  this_data.`order_item_id`,
  this_data.`reason_detail`,
  this_data.`refund_amount`,
  this_data.`return_status`,
  this_data.`shipping_city`,
  this_data.`shipping_name`,
  this_data.`tracking_code`,
  this_data.`crea_status_id`,
  this_data.`customer_email`,
  this_data.`invoice_number`,
  this_data.`payment_method`,
  this_data.`sla_time_stamp`,
  this_data.`voucher_amount`,
  this_data.`voucher_seller`,
  this_data.`wallet_credits`,
  this_data.`billing_address`,
  this_data.`billing_country`,
  this_data.`bundle_discount`,
  this_data.`is_synced_to_bq`,
  this_data.`sent_to_cprv_at`,
  this_data.`shipping_amount`,
  this_data.`shipping_region`,
  this_data.`billing_address2`,
  this_data.`billing_address3`,
  this_data.`billing_address4`,
  this_data.`billing_address5`,
  this_data.`billing_postcode`,
  this_data.`cd_tracking_code`,
  this_data.`invoice_required`,
  this_data.`p_uid_updated_at`,
  this_data.`quantity_ordered`,
  this_data.`reversal__is_rtm`,
  this_data.`shipping_address`,
  this_data.`shipping_country`,
  this_data.`voucher_platform`,
  this_data.`lazada_created_at`,
  this_data.`lazada_updated_at`,
  this_data.`purchase_order_id`,
  this_data.`shipping_address2`,
  this_data.`shipping_address3`,
  this_data.`shipping_address4`,
  this_data.`shipping_address5`,
  this_data.`shipping_postcode`,
  this_data.`shipping_provider`,
  this_data.`shipment_type_name`,
  this_data.`billing_phone_number`,
  this_data.`cd_shipping_provider`,
  this_data.`reversal__ofc_status`,
  this_data.`billing_phone_number2`,
  this_data.`purchase_order_number`,
  this_data.`shipping_fee_original`,
  this_data.`shipping_phone_number`,
  this_data.`shipping_service_cost`,
  this_data.`tax_invoice_requested`,
  this_data.`promised_shipping_time`,
  this_data.`reversal__request_type`,
  this_data.`shipping_phone_number2`,
  this_data.`shipping_provider_type`,
  this_data.`cancel_return_initiator`,
  this_data.`is_tax_invoice_exported`,
  this_data.`reversal__shipping_type`,
  this_data.`tracking_url_first_mile`,
  this_data.`reversal__is_need_refund`,
  this_data.`reversal__reverse_status`,
  this_data.`tracking_code_first_mile`,
  this_data.`derived_order_time_failed`,
  this_data.`warehouse_packed_datetime`,
  this_data.`derived_order_time_shipped`,
  this_data.`reversal__reverse_order_id`,
  this_data.`warehouse_shipped_datetime`,
  this_data.`crea_send_shipped_to_client`,
  this_data.`derived_order_time_refunded`,
  this_data.`derived_order_time_returned`,
  this_data.`warehouse_received_datetime`,
  this_data.`crea_send_returned_to_client`,
  this_data.`derived_order_time_cancelled`,
  this_data.`derived_order_time_delivered`,
  this_data.`national_registration_number`,
  this_data.`shipping_fee_discount_seller`,
  this_data.`shipping_provider_first_mile`,
  this_data.`crea_warehouse_order_status_id`,
  this_data.`shipping_fee_discount_platform`,
  this_data.`reversal__reverse_order_line_id`,
  this_data.`derived_order_time_ready_to_ship`,
  this_data.`_airbyte_ab_id`,
  this_data.`_airbyte_emitted_at`,
  this_data.`_airbyte_normalized_at`
    from `marketplace-reporting`.airbyte.`lazada_raw_orders_scd` as this_data
    -- make a join with new_data using primary key to filter active data that need to be updated only
    join new_data_ids on this_data._airbyte_unique_key = new_data_ids._airbyte_unique_key
    -- force left join to NULL values (we just need to transfer column types only for the star_intersect macro on schema changes)
    left join empty_new_data as inc_data on this_data._airbyte_ab_id = inc_data._airbyte_ab_id
    where _airbyte_active_row = 1
),
input_data as (
    select `_airbyte_lazada_raw_orders_hashid`,
  `id`,
  `p_uid`,
  `editor`,
  `reason`,
  `status`,
  `premium`,
  `brand_id`,
  `currency`,
  `tax_code`,
  `bundle_id`,
  `item_name`,
  `lazada_id`,
  `variation`,
  `created_at`,
  `lazada_sku`,
  `order_flag`,
  `order_type`,
  `paid_price`,
  `seller_sku`,
  `tax_amount`,
  `unit_price`,
  `updated_at`,
  `billing_city`,
  `billing_name`,
  `crea_country`,
  `order_number`,
  `payment_time`,
  `shipping_fee`,
  `tracking_url`,
  `voucher_code`,
  `branch_number`,
  `crea_currency`,
  `customer_name`,
  `delivery_time`,
  `order_item_id`,
  `reason_detail`,
  `refund_amount`,
  `return_status`,
  `shipping_city`,
  `shipping_name`,
  `tracking_code`,
  `crea_status_id`,
  `customer_email`,
  `invoice_number`,
  `payment_method`,
  `sla_time_stamp`,
  `voucher_amount`,
  `voucher_seller`,
  `wallet_credits`,
  `billing_address`,
  `billing_country`,
  `bundle_discount`,
  `is_synced_to_bq`,
  `sent_to_cprv_at`,
  `shipping_amount`,
  `shipping_region`,
  `billing_address2`,
  `billing_address3`,
  `billing_address4`,
  `billing_address5`,
  `billing_postcode`,
  `cd_tracking_code`,
  `invoice_required`,
  `p_uid_updated_at`,
  `quantity_ordered`,
  `reversal__is_rtm`,
  `shipping_address`,
  `shipping_country`,
  `voucher_platform`,
  `lazada_created_at`,
  `lazada_updated_at`,
  `purchase_order_id`,
  `shipping_address2`,
  `shipping_address3`,
  `shipping_address4`,
  `shipping_address5`,
  `shipping_postcode`,
  `shipping_provider`,
  `shipment_type_name`,
  `billing_phone_number`,
  `cd_shipping_provider`,
  `reversal__ofc_status`,
  `billing_phone_number2`,
  `purchase_order_number`,
  `shipping_fee_original`,
  `shipping_phone_number`,
  `shipping_service_cost`,
  `tax_invoice_requested`,
  `promised_shipping_time`,
  `reversal__request_type`,
  `shipping_phone_number2`,
  `shipping_provider_type`,
  `cancel_return_initiator`,
  `is_tax_invoice_exported`,
  `reversal__shipping_type`,
  `tracking_url_first_mile`,
  `reversal__is_need_refund`,
  `reversal__reverse_status`,
  `tracking_code_first_mile`,
  `derived_order_time_failed`,
  `warehouse_packed_datetime`,
  `derived_order_time_shipped`,
  `reversal__reverse_order_id`,
  `warehouse_shipped_datetime`,
  `crea_send_shipped_to_client`,
  `derived_order_time_refunded`,
  `derived_order_time_returned`,
  `warehouse_received_datetime`,
  `crea_send_returned_to_client`,
  `derived_order_time_cancelled`,
  `derived_order_time_delivered`,
  `national_registration_number`,
  `shipping_fee_discount_seller`,
  `shipping_provider_first_mile`,
  `crea_warehouse_order_status_id`,
  `shipping_fee_discount_platform`,
  `reversal__reverse_order_line_id`,
  `derived_order_time_ready_to_ship`,
  `_airbyte_ab_id`,
  `_airbyte_emitted_at`,
  `_airbyte_normalized_at` from new_data
    union all
    select `_airbyte_lazada_raw_orders_hashid`,
  `id`,
  `p_uid`,
  `editor`,
  `reason`,
  `status`,
  `premium`,
  `brand_id`,
  `currency`,
  `tax_code`,
  `bundle_id`,
  `item_name`,
  `lazada_id`,
  `variation`,
  `created_at`,
  `lazada_sku`,
  `order_flag`,
  `order_type`,
  `paid_price`,
  `seller_sku`,
  `tax_amount`,
  `unit_price`,
  `updated_at`,
  `billing_city`,
  `billing_name`,
  `crea_country`,
  `order_number`,
  `payment_time`,
  `shipping_fee`,
  `tracking_url`,
  `voucher_code`,
  `branch_number`,
  `crea_currency`,
  `customer_name`,
  `delivery_time`,
  `order_item_id`,
  `reason_detail`,
  `refund_amount`,
  `return_status`,
  `shipping_city`,
  `shipping_name`,
  `tracking_code`,
  `crea_status_id`,
  `customer_email`,
  `invoice_number`,
  `payment_method`,
  `sla_time_stamp`,
  `voucher_amount`,
  `voucher_seller`,
  `wallet_credits`,
  `billing_address`,
  `billing_country`,
  `bundle_discount`,
  `is_synced_to_bq`,
  `sent_to_cprv_at`,
  `shipping_amount`,
  `shipping_region`,
  `billing_address2`,
  `billing_address3`,
  `billing_address4`,
  `billing_address5`,
  `billing_postcode`,
  `cd_tracking_code`,
  `invoice_required`,
  `p_uid_updated_at`,
  `quantity_ordered`,
  `reversal__is_rtm`,
  `shipping_address`,
  `shipping_country`,
  `voucher_platform`,
  `lazada_created_at`,
  `lazada_updated_at`,
  `purchase_order_id`,
  `shipping_address2`,
  `shipping_address3`,
  `shipping_address4`,
  `shipping_address5`,
  `shipping_postcode`,
  `shipping_provider`,
  `shipment_type_name`,
  `billing_phone_number`,
  `cd_shipping_provider`,
  `reversal__ofc_status`,
  `billing_phone_number2`,
  `purchase_order_number`,
  `shipping_fee_original`,
  `shipping_phone_number`,
  `shipping_service_cost`,
  `tax_invoice_requested`,
  `promised_shipping_time`,
  `reversal__request_type`,
  `shipping_phone_number2`,
  `shipping_provider_type`,
  `cancel_return_initiator`,
  `is_tax_invoice_exported`,
  `reversal__shipping_type`,
  `tracking_url_first_mile`,
  `reversal__is_need_refund`,
  `reversal__reverse_status`,
  `tracking_code_first_mile`,
  `derived_order_time_failed`,
  `warehouse_packed_datetime`,
  `derived_order_time_shipped`,
  `reversal__reverse_order_id`,
  `warehouse_shipped_datetime`,
  `crea_send_shipped_to_client`,
  `derived_order_time_refunded`,
  `derived_order_time_returned`,
  `warehouse_received_datetime`,
  `crea_send_returned_to_client`,
  `derived_order_time_cancelled`,
  `derived_order_time_delivered`,
  `national_registration_number`,
  `shipping_fee_discount_seller`,
  `shipping_provider_first_mile`,
  `crea_warehouse_order_status_id`,
  `shipping_fee_discount_platform`,
  `reversal__reverse_order_line_id`,
  `derived_order_time_ready_to_ship`,
  `_airbyte_ab_id`,
  `_airbyte_emitted_at`,
  `_airbyte_normalized_at` from previous_active_scd_data
),

scd_data as (
    -- SQL model to build a Type 2 Slowly Changing Dimension (SCD) table for each record identified by their primary key
    select
      to_hex(md5(cast(concat(coalesce(cast(id as 
    string
), '')) as 
    string
))) as _airbyte_unique_key,
      id,
      p_uid,
      editor,
      reason,
      status,
      premium,
      brand_id,
      currency,
      tax_code,
      bundle_id,
      item_name,
      lazada_id,
      variation,
      created_at,
      lazada_sku,
      order_flag,
      order_type,
      paid_price,
      seller_sku,
      tax_amount,
      unit_price,
      updated_at,
      billing_city,
      billing_name,
      crea_country,
      order_number,
      payment_time,
      shipping_fee,
      tracking_url,
      voucher_code,
      branch_number,
      crea_currency,
      customer_name,
      delivery_time,
      order_item_id,
      reason_detail,
      refund_amount,
      return_status,
      shipping_city,
      shipping_name,
      tracking_code,
      crea_status_id,
      customer_email,
      invoice_number,
      payment_method,
      sla_time_stamp,
      voucher_amount,
      voucher_seller,
      wallet_credits,
      billing_address,
      billing_country,
      bundle_discount,
      is_synced_to_bq,
      sent_to_cprv_at,
      shipping_amount,
      shipping_region,
      billing_address2,
      billing_address3,
      billing_address4,
      billing_address5,
      billing_postcode,
      cd_tracking_code,
      invoice_required,
      p_uid_updated_at,
      quantity_ordered,
      reversal__is_rtm,
      shipping_address,
      shipping_country,
      voucher_platform,
      lazada_created_at,
      lazada_updated_at,
      purchase_order_id,
      shipping_address2,
      shipping_address3,
      shipping_address4,
      shipping_address5,
      shipping_postcode,
      shipping_provider,
      shipment_type_name,
      billing_phone_number,
      cd_shipping_provider,
      reversal__ofc_status,
      billing_phone_number2,
      purchase_order_number,
      shipping_fee_original,
      shipping_phone_number,
      shipping_service_cost,
      tax_invoice_requested,
      promised_shipping_time,
      reversal__request_type,
      shipping_phone_number2,
      shipping_provider_type,
      cancel_return_initiator,
      is_tax_invoice_exported,
      reversal__shipping_type,
      tracking_url_first_mile,
      reversal__is_need_refund,
      reversal__reverse_status,
      tracking_code_first_mile,
      derived_order_time_failed,
      warehouse_packed_datetime,
      derived_order_time_shipped,
      reversal__reverse_order_id,
      warehouse_shipped_datetime,
      crea_send_shipped_to_client,
      derived_order_time_refunded,
      derived_order_time_returned,
      warehouse_received_datetime,
      crea_send_returned_to_client,
      derived_order_time_cancelled,
      derived_order_time_delivered,
      national_registration_number,
      shipping_fee_discount_seller,
      shipping_provider_first_mile,
      crea_warehouse_order_status_id,
      shipping_fee_discount_platform,
      reversal__reverse_order_line_id,
      derived_order_time_ready_to_ship,
      updated_at as _airbyte_start_at,
      lag(updated_at) over (
        partition by cast(id as 
    string
)
        order by
            updated_at is null asc,
            updated_at desc,
            _airbyte_emitted_at desc
      ) as _airbyte_end_at,
      case when row_number() over (
        partition by cast(id as 
    string
)
        order by
            updated_at is null asc,
            updated_at desc,
            _airbyte_emitted_at desc
      ) = 1 then 1 else 0 end as _airbyte_active_row,
      _airbyte_ab_id,
      _airbyte_emitted_at,
      _airbyte_lazada_raw_orders_hashid
    from input_data
),
dedup_data as (
    select
        -- we need to ensure de-duplicated rows for merge/update queries
        -- additionally, we generate a unique key for the scd table
        row_number() over (
            partition by
                _airbyte_unique_key,
                _airbyte_start_at,
                _airbyte_emitted_at
            order by _airbyte_active_row desc, _airbyte_ab_id
        ) as _airbyte_row_num,
        to_hex(md5(cast(concat(coalesce(cast(_airbyte_unique_key as 
    string
), ''), '-', coalesce(cast(_airbyte_start_at as 
    string
), ''), '-', coalesce(cast(_airbyte_emitted_at as 
    string
), '')) as 
    string
))) as _airbyte_unique_key_scd,
        scd_data.*
    from scd_data
)
select
    _airbyte_unique_key,
    _airbyte_unique_key_scd,
    id,
    p_uid,
    editor,
    reason,
    status,
    premium,
    brand_id,
    currency,
    tax_code,
    bundle_id,
    item_name,
    lazada_id,
    variation,
    created_at,
    lazada_sku,
    order_flag,
    order_type,
    paid_price,
    seller_sku,
    tax_amount,
    unit_price,
    updated_at,
    billing_city,
    billing_name,
    crea_country,
    order_number,
    payment_time,
    shipping_fee,
    tracking_url,
    voucher_code,
    branch_number,
    crea_currency,
    customer_name,
    delivery_time,
    order_item_id,
    reason_detail,
    refund_amount,
    return_status,
    shipping_city,
    shipping_name,
    tracking_code,
    crea_status_id,
    customer_email,
    invoice_number,
    payment_method,
    sla_time_stamp,
    voucher_amount,
    voucher_seller,
    wallet_credits,
    billing_address,
    billing_country,
    bundle_discount,
    is_synced_to_bq,
    sent_to_cprv_at,
    shipping_amount,
    shipping_region,
    billing_address2,
    billing_address3,
    billing_address4,
    billing_address5,
    billing_postcode,
    cd_tracking_code,
    invoice_required,
    p_uid_updated_at,
    quantity_ordered,
    reversal__is_rtm,
    shipping_address,
    shipping_country,
    voucher_platform,
    lazada_created_at,
    lazada_updated_at,
    purchase_order_id,
    shipping_address2,
    shipping_address3,
    shipping_address4,
    shipping_address5,
    shipping_postcode,
    shipping_provider,
    shipment_type_name,
    billing_phone_number,
    cd_shipping_provider,
    reversal__ofc_status,
    billing_phone_number2,
    purchase_order_number,
    shipping_fee_original,
    shipping_phone_number,
    shipping_service_cost,
    tax_invoice_requested,
    promised_shipping_time,
    reversal__request_type,
    shipping_phone_number2,
    shipping_provider_type,
    cancel_return_initiator,
    is_tax_invoice_exported,
    reversal__shipping_type,
    tracking_url_first_mile,
    reversal__is_need_refund,
    reversal__reverse_status,
    tracking_code_first_mile,
    derived_order_time_failed,
    warehouse_packed_datetime,
    derived_order_time_shipped,
    reversal__reverse_order_id,
    warehouse_shipped_datetime,
    crea_send_shipped_to_client,
    derived_order_time_refunded,
    derived_order_time_returned,
    warehouse_received_datetime,
    crea_send_returned_to_client,
    derived_order_time_cancelled,
    derived_order_time_delivered,
    national_registration_number,
    shipping_fee_discount_seller,
    shipping_provider_first_mile,
    crea_warehouse_order_status_id,
    shipping_fee_discount_platform,
    reversal__reverse_order_line_id,
    derived_order_time_ready_to_ship,
    _airbyte_start_at,
    _airbyte_end_at,
    _airbyte_active_row,
    _airbyte_ab_id,
    _airbyte_emitted_at,
    CURRENT_TIMESTAMP() as _airbyte_normalized_at,
    _airbyte_lazada_raw_orders_hashid
from dedup_data where _airbyte_row_num = 1
  );

Note that this part where it selects max _airbyte_emitted_at is a known BQ issue that you need to assign it to a variable instead; otherwise the BQ SQL optimizer will still do full table scan.

where 1 = 1
    and coalesce(
    cast(_airbyte_emitted_at as
timestamp
            )>= (select max(cast(_airbyte_emitted_at as
timestamp
            )) from `marketplace-reporting`.airbyte.`lazada_raw_orders_scd`)

How do I edit this query? Do I export it from Docker image, edit the query, then put the file back where I export it from in the Docker image?

Airbyte version: 0.38.3-alpha

Thank you @naphat.t for reporting this.
These SQL query are generated by our normalization process. This process is using DBT to create destination specific SQL from a common DBT project.
You won’t be able to patch it easily unless you completely rebuild a custom version of Airbyte. The normalization is packaged into a docker image whose version is hardcoded in Airbyte.
I think you will find the logic generating the SQL you shared in airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py. and more globally in airbyte-integrations/bases/base-normalization.
Feel free to open an issue on our GitHub repo for this enhancement request. You are also welcome to suggest the change you’d make by opening a pull request.

I had the opportunity to chat with one of our main normalization contributor.
It seems that the optimization you suggest is already implemented for Snowflake here .
We were under the impression that this optimization was not required for BigQuery as the raw table are partionned by the emmited_at column which would allow BigQuery to make the max operation quite easily on incremental tables.

Note that this part where it selects max _airbyte_emitted_at is a known BQ issue that you need to assign it to a variable instead; otherwise the BQ SQL optimizer will still do full table scan.

Do you have some references to share that are explaining this limitation?

Hi @alafanechere,

Below links and quotes would be the references.

References:

However, the following query doesn’t prune partitions, because the filter, WHERE t1.ts = (SELECT timestamp from table where key = 2), is not a constant expression; it depends on the dynamic values of the timestamp and key fields:

SELECT
t1.name,
t2.category
FROM
table1 AS t1
INNER JOIN
table2 AS t2
ON
t1.id_field = t2.field2
WHERE
t1.ts = (SELECT timestamp from table3 where key = 2)

In general, partition pruning will reduce query cost when the filters can be evaluated at the outset of the query without requiring any subquery evaluations or data scans.

After you have read through the links, kindly

  1. Provide your suggestions/thoughts. If you agree that it is an issue that shall be fixed, kindly advise the earliest timeline that the issue will be prioritized and when the fix will be rolled out.
  2. Based on your response, if you agree that is an issue, I shall then create a Github issue at https://github.com/airbytehq/airbyte/issues.
  3. Kindly advise if there any hotfix that I can do in order to export the SQL, patch the query, then import the SQL back into Airbyte, so I can lower the cost on my end for the time being while waiting for the fix?

Thank you and best regards,
Naphat T.

Provide your suggestions/thoughts. If you agree that it is an issue that shall be fixed, kindly advise the earliest timeline that the issue will be prioritized and when the fix will be rolled out.

I opened the following issue I invite you to subscribe in order to receive updates. Updates will also be posted on this ticket. I can’t share a timeline, I triaged the issue to make this issue reach our database team backlog, but unfortunately this kind of optimization is not part of the immediate roadmap.

  1. Kindly advise if there any hotfix that I can do in order to export the SQL, patch the query, then import the SQL back into Airbyte, so I can lower the cost on my end for the time being while waiting for the fix?

You can import the existing generated DBT models for your connection and tweak this as you wish and then declare it as a custom transformation. We have a guide here on this topic.
It’d be very interesting if you could share your results and eventually suggests a change to our normalization DBT code by opening a PR on our repo.