Sync to BigQuery destination stopped after changing sync frequency

  • Is this your first time deploying Airbyte?: No
  • Airbyte Version: 0.35.40-alpha
  • Source: Klaviyo (0.1.3)
  • Destination: BigQuery (0.6.7)
  • Step: At every sync job after increasing sync frequency
  • Description: Data ingestion into the main table stopped

From the logs, I can see that data was written to the staging table with no issues (I can query the temp table and the data is there), but the final update to the main table stopped after I increased the sync frequency from every 24h to every 6h.

Edit 1: After comparing past successful runs with the runs after the frequency change, I noticed that the normalization step never executed after the change, even though the raw data is accessible.

Can you share a sync log so we can check which steps failed or didn’t work as expected?

This is where it didn’t work as expected:

2022-05-11 01:30:48 source > Finished syncing SourceKlaviyo
2022-05-11 01:30:48 INFO i.a.w.DefaultReplicationWorker(lambda$getReplicationRunnable$5):304 - Total records read: 92482
2022-05-11 01:30:48 INFO i.a.w.DefaultReplicationWorker(run):162 - One of source or destination thread complete. Waiting on the other.
2022-05-11 01:30:48 destination > 2022-05-11 01:30:48 INFO i.a.i.b.FailureTrackingAirbyteMessageConsumer(close):63 - Airbyte message consumer: succeeded.
2022-05-11 01:30:48 destination > 2022-05-11 01:30:48 INFO i.a.i.d.b.BigQueryRecordConsumer(close):55 - Started closing all connections
2022-05-11 01:30:48 destination > 2022-05-11 01:30:48 INFO i.a.i.d.b.u.AbstractBigQueryUploader(close):76 - Field fails during format :
2022-05-11 01:30:48 destination > 2022-05-11 01:30:48 INFO i.a.i.d.b.f.BigQueryRecordFormatter(printAndCleanFieldFails):70 - No field fails during record format.
2022-05-11 01:30:48 destination > 2022-05-11 01:30:48 INFO i.a.i.d.b.u.AbstractBigQueryUploader(close):79 - Closing connector:AbstractBigQueryUploader{table=_airbyte_raw_jacob_test_events, tmpTable=_airbyte_tmp_bey_jacob_test_events, syncMode=WRITE_APPEND, writer=class io.airbyte.integrations.destination.bigquery.writer.BigQueryTableWriter, recordFormatter=class io.airbyte.integrations.destination.bigquery.formatter.DefaultBigQueryRecordFormatter}
2022-05-11 01:30:50 destination > 2022-05-11 01:30:50 INFO i.a.i.d.b.BigQueryUtils(waitForJobFinish):304 - Waiting for job finish Job{job=JobId{project=undergroundcellar-319016, job=b136ff73-94fe-4936-9527-ce84bc9843c0, location=US}, status=JobStatus{state=RUNNING, error=null, executionErrors=null}, statistics=LoadStatistics{creationTime=1652232649638, endTime=null, startTime=1652232649886, numChildJobs=null, parentJobId=null, scriptStatistics=null, reservationUsage=null, inputBytes=null, inputFiles=null, outputBytes=null, outputRows=null, badRecords=null}, userEmail=airbyte@undergroundcellar-319016.iam.gserviceaccount.com, etag=iRVQK9sRapORVBqbJchDCw==, generatedId=undergroundcellar-319016:US.b136ff73-94fe-4936-9527-ce84bc9843c0, selfLink=https://www.googleapis.com/bigquery/v2/projects/undergroundcellar-319016/jobs/b136ff73-94fe-4936-9527-ce84bc9843c0?location=US, configuration=LoadJobConfiguration{type=LOAD, destinationTable=GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=klaviyo_ab, projectId=undergroundcellar-319016, tableId=_airbyte_tmp_bey_jacob_test_events}}, decimalTargetTypes=null, destinationEncryptionConfiguration=null, createDisposition=CREATE_IF_NEEDED, writeDisposition=null, formatOptions=FormatOptions{format=NEWLINE_DELIMITED_JSON}, nullMarker=null, maxBadRecords=null, schema=Schema{fields=[Field{name=_airbyte_ab_id, type=STRING, mode=null, description=null, policyTags=null}, Field{name=_airbyte_emitted_at, type=TIMESTAMP, mode=null, description=null, policyTags=null}, Field{name=_airbyte_data, type=STRING, mode=null, description=null, policyTags=null}]}, ignoreUnknownValue=null, sourceUris=null, schemaUpdateOptions=null, autodetect=null, timePartitioning=null, clustering=null, useAvroLogicalTypes=null, labels=null, jobTimeoutMs=null, rangePartitioning=null, hivePartitioningOptions=null}}. Status: JobStatus{state=RUNNING, error=null, executionErrors=null}
2022-05-11 01:31:10 destination > 2022-05-11 01:31:10 INFO i.a.i.d.b.BigQueryUtils(waitForJobFinish):306 - Job finish Job{job=JobId{project=undergroundcellar-319016, job=b136ff73-94fe-4936-9527-ce84bc9843c0, location=US}, status=JobStatus{state=RUNNING, error=null, executionErrors=null}, statistics=LoadStatistics{creationTime=1652232649638, endTime=null, startTime=1652232649886, numChildJobs=null, parentJobId=null, scriptStatistics=null, reservationUsage=null, inputBytes=null, inputFiles=null, outputBytes=null, outputRows=null, badRecords=null}, userEmail=airbyte@undergroundcellar-319016.iam.gserviceaccount.com, etag=iRVQK9sRapORVBqbJchDCw==, generatedId=undergroundcellar-319016:US.b136ff73-94fe-4936-9527-ce84bc9843c0, selfLink=https://www.googleapis.com/bigquery/v2/projects/undergroundcellar-319016/jobs/b136ff73-94fe-4936-9527-ce84bc9843c0?location=US, configuration=LoadJobConfiguration{type=LOAD, destinationTable=GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=klaviyo_ab, projectId=undergroundcellar-319016, tableId=_airbyte_tmp_bey_jacob_test_events}}, decimalTargetTypes=null, destinationEncryptionConfiguration=null, createDisposition=CREATE_IF_NEEDED, writeDisposition=null, formatOptions=FormatOptions{format=NEWLINE_DELIMITED_JSON}, nullMarker=null, maxBadRecords=null, schema=Schema{fields=[Field{name=_airbyte_ab_id, type=STRING, mode=null, description=null, policyTags=null}, Field{name=_airbyte_emitted_at, type=TIMESTAMP, mode=null, description=null, policyTags=null}, Field{name=_airbyte_data, type=STRING, mode=null, description=null, policyTags=null}]}, ignoreUnknownValue=null, sourceUris=null, schemaUpdateOptions=null, autodetect=null, timePartitioning=null, clustering=null, useAvroLogicalTypes=null, labels=null, jobTimeoutMs=null, rangePartitioning=null, hivePartitioningOptions=null}} with status JobStatus{state=RUNNING, error=null, executionErrors=null}
2022-05-11 01:31:10 destination > 2022-05-11 01:31:10 INFO i.a.i.d.b.u.AbstractBigQueryUploader(uploadData):96 - Uploading data from the tmp table _airbyte_tmp_bey_jacob_test_events to the source table _airbyte_raw_jacob_test_events.
2022-05-11 01:31:10 destination > 2022-05-11 01:31:10 INFO i.a.i.d.b.u.AbstractBigQueryUploader(uploadDataToTableFromTmpTable):121 - Replication finished with no explicit errors. Copying data from tmp tables to permanent
2022-05-11 01:31:15 destination > 2022-05-11 01:31:15 INFO i.a.i.d.b.u.AbstractBigQueryUploader(copyTable):187 - successfully copied table: GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=klaviyo_ab, tableId=_airbyte_tmp_bey_jacob_test_events}} to table: GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=klaviyo_ab, tableId=_airbyte_raw_jacob_test_events}}
2022-05-11 01:31:15 destination > 2022-05-11 01:31:15 INFO i.a.i.d.b.u.AbstractBigQueryUploader(uploadData):98 - Data is successfully loaded to the source table _airbyte_raw_jacob_test_events!
2022-05-11 01:31:15 INFO i.a.w.DefaultReplicationWorker(lambda$getDestinationOutputRunnable$6):348 - State in DefaultReplicationWorker from destination: io.airbyte.protocol.models.AirbyteMessage@284064d9[type=STATE,log=<null>,spec=<null>,connectionStatus=<null>,catalog=<null>,record=<null>,state=io.airbyte.protocol.models.AirbyteStateMessage@26878fa1[data={"events":{"timestamp":1652232642}},additionalProperties={}],additionalProperties={}]
2022-05-11 01:31:15 destination > 2022-05-11 01:31:15 INFO i.a.i.d.b.u.AbstractBigQueryUploader(uploadData):100 - Final state message is accepted.
2022-05-11 01:31:15 destination > 2022-05-11 01:31:15 INFO i.a.i.d.b.u.AbstractBigQueryUploader(dropTmpTable):112 - Removing tmp tables...
2022-05-11 01:31:15 destination > 2022-05-11 01:31:15 INFO i.a.i.d.b.u.AbstractBigQueryUploader(dropTmpTable):114 - Finishing destination process...completed
2022-05-11 01:31:15 destination > 2022-05-11 01:31:15 INFO i.a.i.d.b.u.AbstractBigQueryUploader(close):86 - Closed connector:AbstractBigQueryUploader{table=_airbyte_raw_jacob_test_events, tmpTable=_airbyte_tmp_bey_jacob_test_events, syncMode=WRITE_APPEND, writer=class io.airbyte.integrations.destination.bigquery.writer.BigQueryTableWriter, recordFormatter=class io.airbyte.integrations.destination.bigquery.formatter.DefaultBigQueryRecordFormatter}
2022-05-11 01:31:15 destination > 2022-05-11 01:31:15 INFO i.a.i.b.IntegrationRunner(runInternal):153 - Completed integration: io.airbyte.integrations.destination.bigquery.BigQueryDestination
2022-05-11 01:31:16 INFO i.a.w.DefaultReplicationWorker(run):164 - Source and destination threads complete.
2022-05-11 01:31:16 INFO i.a.w.DefaultReplicationWorker(run):227 - sync summary: io.airbyte.config.ReplicationAttemptSummary@329af8e4[status=completed,recordsSynced=92481,bytesSynced=187008360,startTime=1652232312847,endTime=1652232676324,totalStats=io.airbyte.config.SyncStats@7b15fbb8[recordsEmitted=92481,bytesEmitted=187008360,stateMessagesEmitted=1,recordsCommitted=92481],streamStats=[io.airbyte.config.StreamSyncStats@53cda114[streamName=jacob_test_events,stats=io.airbyte.config.SyncStats@1a7a633d[recordsEmitted=92481,bytesEmitted=187008360,stateMessagesEmitted=<null>,recordsCommitted=92481]]]]
2022-05-11 01:31:16 INFO i.a.w.DefaultReplicationWorker(run):247 - Source output at least one state message
2022-05-11 01:31:16 INFO i.a.w.DefaultReplicationWorker(run):253 - State capture: Updated state to: Optional[io.airbyte.config.State@7fc2f54e[state={"events":{"timestamp":1652232642}}]]
2022-05-11 01:31:16 INFO i.a.w.t.TemporalAttemptExecution(get):131 - Stopping cancellation check scheduling...
2022-05-11 01:31:16 INFO i.a.w.t.s.ReplicationActivityImpl(lambda$replicate$1):147 - sync summary: io.airbyte.config.StandardSyncOutput@24b45c16[standardSyncSummary=io.airbyte.config.StandardSyncSummary@fb31e36[status=completed,recordsSynced=92481,bytesSynced=187008360,startTime=1652232312847,endTime=1652232676324,totalStats=io.airbyte.config.SyncStats@7b15fbb8[recordsEmitted=92481,bytesEmitted=187008360,stateMessagesEmitted=1,recordsCommitted=92481],streamStats=[io.airbyte.config.StreamSyncStats@53cda114[streamName=jacob_test_events,stats=io.airbyte.config.SyncStats@1a7a633d[recordsEmitted=92481,bytesEmitted=187008360,stateMessagesEmitted=<null>,recordsCommitted=92481]]]],state=io.airbyte.config.State@7fc2f54e[state={"events":{"timestamp":1652232642}}],outputCatalog=io.airbyte.protocol.models.ConfiguredAirbyteCatalog@5caa9c62[streams=[io.airbyte.protocol.models.ConfiguredAirbyteStream@5e687fa8[stream=io.airbyte.protocol.models.AirbyteStream@58bacc9f[name=jacob_test_events,jsonSchema={"type":"object","required":["object","id","uuid","event_name","timestamp","datetime","statistic_id","event_properties","person"],"properties":{"id":{"type":"string"},"uuid":{"type":"string"},"object":{"type":"string"},"person":{"type":"object"},"datetime":{"type":"string"},"timestamp":{"type":"integer"},"event_name":{"type":"string"},"statistic_id":{"type":"string"},"event_properties":{"type":"object"}}},supportedSyncModes=[full_refresh, incremental],sourceDefinedCursor=true,defaultCursorField=[timestamp],sourceDefinedPrimaryKey=[[id]],namespace=<null>,additionalProperties={}],syncMode=incremental,cursorField=[timestamp],destinationSyncMode=append_dedup,primaryKey=[[id]],additionalProperties={}]],additionalProperties={}],failures=[]]
2022-05-11 01:31:16 INFO i.a.w.t.TemporalUtils(withBackgroundHeartbeat):235 - Stopping temporal heartbeating...

Compare that with an older run, where normalization did run:

2022-04-13 08:50:26 source > Finished syncing SourceKlaviyo
2022-04-13 08:50:26 INFO i.a.w.DefaultReplicationWorker(lambda$getReplicationRunnable$5):304 - Total records read: 218160
2022-04-13 08:50:26 INFO i.a.w.DefaultReplicationWorker(run):162 - One of source or destination thread complete. Waiting on the other.
2022-04-13 08:50:26 destination > 2022-04-13 08:50:26 INFO i.a.i.b.FailureTrackingAirbyteMessageConsumer(close):63 - Airbyte message consumer: succeeded.
2022-04-13 08:50:26 destination > 2022-04-13 08:50:26 INFO i.a.i.d.b.BigQueryRecordConsumer(close):55 - Started closing all connections
2022-04-13 08:50:26 destination > 2022-04-13 08:50:26 INFO i.a.i.d.b.u.AbstractBigQueryUploader(close):76 - Field fails during format :
2022-04-13 08:50:26 destination > 2022-04-13 08:50:26 INFO i.a.i.d.b.f.BigQueryRecordFormatter(printAndCleanFieldFails):70 - No field fails during record format.
2022-04-13 08:50:26 destination > 2022-04-13 08:50:26 INFO i.a.i.d.b.u.AbstractBigQueryUploader(close):79 - Closing connector:AbstractBigQueryUploader{table=_airbyte_raw_jacob_test_events, tmpTable=_airbyte_tmp_mwr_jacob_test_events, syncMode=WRITE_APPEND, writer=class io.airbyte.integrations.destination.bigquery.writer.BigQueryTableWriter, recordFormatter=class io.airbyte.integrations.destination.bigquery.formatter.DefaultBigQueryRecordFormatter}
2022-04-13 08:50:28 destination > 2022-04-13 08:50:28 INFO i.a.i.d.b.BigQueryUtils(waitForJobFinish):304 - Waiting for job finish Job{job=JobId{project=undergroundcellar-319016, job=e1a7e3a4-48bb-4ef3-ad59-10cd6ef801d2, location=US}, status=JobStatus{state=RUNNING, error=null, executionErrors=null}, statistics=LoadStatistics{creationTime=1649839827677, endTime=null, startTime=1649839827906, numChildJobs=null, parentJobId=null, scriptStatistics=null, reservationUsage=null, inputBytes=null, inputFiles=null, outputBytes=null, outputRows=null, badRecords=null}, userEmail=airbyte@undergroundcellar-319016.iam.gserviceaccount.com, etag=Xy2ax7SFu7PS75M4sP/2EQ==, generatedId=undergroundcellar-319016:US.e1a7e3a4-48bb-4ef3-ad59-10cd6ef801d2, selfLink=https://www.googleapis.com/bigquery/v2/projects/undergroundcellar-319016/jobs/e1a7e3a4-48bb-4ef3-ad59-10cd6ef801d2?location=US, configuration=LoadJobConfiguration{type=LOAD, destinationTable=GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=klaviyo_ab, projectId=undergroundcellar-319016, tableId=_airbyte_tmp_mwr_jacob_test_events}}, decimalTargetTypes=null, destinationEncryptionConfiguration=null, createDisposition=CREATE_IF_NEEDED, writeDisposition=null, formatOptions=FormatOptions{format=NEWLINE_DELIMITED_JSON}, nullMarker=null, maxBadRecords=null, schema=Schema{fields=[Field{name=_airbyte_ab_id, type=STRING, mode=null, description=null, policyTags=null}, Field{name=_airbyte_emitted_at, type=TIMESTAMP, mode=null, description=null, policyTags=null}, Field{name=_airbyte_data, type=STRING, mode=null, description=null, policyTags=null}]}, ignoreUnknownValue=null, sourceUris=null, schemaUpdateOptions=null, autodetect=null, timePartitioning=null, clustering=null, useAvroLogicalTypes=null, labels=null, jobTimeoutMs=null, rangePartitioning=null, hivePartitioningOptions=null}}. Status: JobStatus{state=RUNNING, error=null, executionErrors=null}
2022-04-13 08:50:54 destination > 2022-04-13 08:50:54 INFO i.a.i.d.b.BigQueryUtils(waitForJobFinish):306 - Job finish Job{job=JobId{project=undergroundcellar-319016, job=e1a7e3a4-48bb-4ef3-ad59-10cd6ef801d2, location=US}, status=JobStatus{state=RUNNING, error=null, executionErrors=null}, statistics=LoadStatistics{creationTime=1649839827677, endTime=null, startTime=1649839827906, numChildJobs=null, parentJobId=null, scriptStatistics=null, reservationUsage=null, inputBytes=null, inputFiles=null, outputBytes=null, outputRows=null, badRecords=null}, userEmail=airbyte@undergroundcellar-319016.iam.gserviceaccount.com, etag=Xy2ax7SFu7PS75M4sP/2EQ==, generatedId=undergroundcellar-319016:US.e1a7e3a4-48bb-4ef3-ad59-10cd6ef801d2, selfLink=https://www.googleapis.com/bigquery/v2/projects/undergroundcellar-319016/jobs/e1a7e3a4-48bb-4ef3-ad59-10cd6ef801d2?location=US, configuration=LoadJobConfiguration{type=LOAD, destinationTable=GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=klaviyo_ab, projectId=undergroundcellar-319016, tableId=_airbyte_tmp_mwr_jacob_test_events}}, decimalTargetTypes=null, destinationEncryptionConfiguration=null, createDisposition=CREATE_IF_NEEDED, writeDisposition=null, formatOptions=FormatOptions{format=NEWLINE_DELIMITED_JSON}, nullMarker=null, maxBadRecords=null, schema=Schema{fields=[Field{name=_airbyte_ab_id, type=STRING, mode=null, description=null, policyTags=null}, Field{name=_airbyte_emitted_at, type=TIMESTAMP, mode=null, description=null, policyTags=null}, Field{name=_airbyte_data, type=STRING, mode=null, description=null, policyTags=null}]}, ignoreUnknownValue=null, sourceUris=null, schemaUpdateOptions=null, autodetect=null, timePartitioning=null, clustering=null, useAvroLogicalTypes=null, labels=null, jobTimeoutMs=null, rangePartitioning=null, hivePartitioningOptions=null}} with status JobStatus{state=RUNNING, error=null, executionErrors=null}
2022-04-13 08:50:54 destination > 2022-04-13 08:50:54 INFO i.a.i.d.b.u.AbstractBigQueryUploader(uploadData):96 - Uploading data from the tmp table _airbyte_tmp_mwr_jacob_test_events to the source table _airbyte_raw_jacob_test_events.
2022-04-13 08:50:54 destination > 2022-04-13 08:50:54 INFO i.a.i.d.b.u.AbstractBigQueryUploader(uploadDataToTableFromTmpTable):121 - Replication finished with no explicit errors. Copying data from tmp tables to permanent
2022-04-13 08:50:58 destination > 2022-04-13 08:50:58 INFO i.a.i.d.b.u.AbstractBigQueryUploader(copyTable):187 - successfully copied table: GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=klaviyo_ab, tableId=_airbyte_tmp_mwr_jacob_test_events}} to table: GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=klaviyo_ab, tableId=_airbyte_raw_jacob_test_events}}
2022-04-13 08:50:58 destination > 2022-04-13 08:50:58 INFO i.a.i.d.b.u.AbstractBigQueryUploader(uploadData):98 - Data is successfully loaded to the source table _airbyte_raw_jacob_test_events!
2022-04-13 08:50:58 INFO i.a.w.DefaultReplicationWorker(lambda$getDestinationOutputRunnable$6):348 - State in DefaultReplicationWorker from destination: io.airbyte.protocol.models.AirbyteMessage@66b19fff[type=STATE,log=<null>,spec=<null>,connectionStatus=<null>,catalog=<null>,record=<null>,state=io.airbyte.protocol.models.AirbyteStateMessage@68766d59[data={"events":{"timestamp":1649839818}},additionalProperties={}],additionalProperties={}]
2022-04-13 08:50:58 destination > 2022-04-13 08:50:58 INFO i.a.i.d.b.u.AbstractBigQueryUploader(uploadData):100 - Final state message is accepted.
2022-04-13 08:50:58 destination > 2022-04-13 08:50:58 INFO i.a.i.d.b.u.AbstractBigQueryUploader(dropTmpTable):112 - Removing tmp tables...
2022-04-13 08:50:58 destination > 2022-04-13 08:50:58 INFO i.a.i.d.b.u.AbstractBigQueryUploader(dropTmpTable):114 - Finishing destination process...completed
2022-04-13 08:50:58 destination > 2022-04-13 08:50:58 INFO i.a.i.d.b.u.AbstractBigQueryUploader(close):86 - Closed connector:AbstractBigQueryUploader{table=_airbyte_raw_jacob_test_events, tmpTable=_airbyte_tmp_mwr_jacob_test_events, syncMode=WRITE_APPEND, writer=class io.airbyte.integrations.destination.bigquery.writer.BigQueryTableWriter, recordFormatter=class io.airbyte.integrations.destination.bigquery.formatter.DefaultBigQueryRecordFormatter}
2022-04-13 08:50:58 destination > 2022-04-13 08:50:58 INFO i.a.i.b.IntegrationRunner(runInternal):153 - Completed integration: io.airbyte.integrations.destination.bigquery.BigQueryDestination
2022-04-13 08:50:59 INFO i.a.w.DefaultReplicationWorker(run):164 - Source and destination threads complete.
2022-04-13 08:50:59 INFO i.a.w.DefaultReplicationWorker(run):227 - sync summary: io.airbyte.config.ReplicationAttemptSummary@2539d02b[status=completed,recordsSynced=218159,bytesSynced=425961720,startTime=1649838898702,endTime=1649839859059,totalStats=io.airbyte.config.SyncStats@6c3d99c2[recordsEmitted=218159,bytesEmitted=425961720,stateMessagesEmitted=1,recordsCommitted=218159],streamStats=[io.airbyte.config.StreamSyncStats@14ff63da[streamName=jacob_test_events,stats=io.airbyte.config.SyncStats@4dcb16fb[recordsEmitted=218159,bytesEmitted=425961720,stateMessagesEmitted=<null>,recordsCommitted=218159]]]]
2022-04-13 08:50:59 INFO i.a.w.DefaultReplicationWorker(run):247 - Source output at least one state message
2022-04-13 08:50:59 INFO i.a.w.DefaultReplicationWorker(run):253 - State capture: Updated state to: Optional[io.airbyte.config.State@26182fe1[state={"events":{"timestamp":1649839818}}]]
2022-04-13 08:50:59 INFO i.a.w.t.TemporalAttemptExecution(get):131 - Stopping cancellation check scheduling...
2022-04-13 08:50:59 INFO i.a.w.t.s.ReplicationActivityImpl(lambda$replicate$1):147 - sync summary: io.airbyte.config.StandardSyncOutput@3473a66c[standardSyncSummary=io.airbyte.config.StandardSyncSummary@4866d14b[status=completed,recordsSynced=218159,bytesSynced=425961720,startTime=1649838898702,endTime=1649839859059,totalStats=io.airbyte.config.SyncStats@6c3d99c2[recordsEmitted=218159,bytesEmitted=425961720,stateMessagesEmitted=1,recordsCommitted=218159],streamStats=[io.airbyte.config.StreamSyncStats@14ff63da[streamName=jacob_test_events,stats=io.airbyte.config.SyncStats@4dcb16fb[recordsEmitted=218159,bytesEmitted=425961720,stateMessagesEmitted=<null>,recordsCommitted=218159]]]],state=io.airbyte.config.State@26182fe1[state={"events":{"timestamp":1649839818}}],outputCatalog=io.airbyte.protocol.models.ConfiguredAirbyteCatalog@4c2aa390[streams=[io.airbyte.protocol.models.ConfiguredAirbyteStream@19fa7555[stream=io.airbyte.protocol.models.AirbyteStream@3093f0c0[name=jacob_test_events,jsonSchema={"type":"object","required":["object","id","uuid","event_name","timestamp","datetime","statistic_id","event_properties","person"],"properties":{"id":{"type":"string"},"uuid":{"type":"string"},"object":{"type":"string"},"person":{"type":"object"},"datetime":{"type":"string"},"timestamp":{"type":"integer"},"event_name":{"type":"string"},"statistic_id":{"type":"string"},"event_properties":{"type":"object"}}},supportedSyncModes=[full_refresh, incremental],sourceDefinedCursor=true,defaultCursorField=[timestamp],sourceDefinedPrimaryKey=[[id]],namespace=<null>,additionalProperties={}],syncMode=incremental,cursorField=[timestamp],destinationSyncMode=append_dedup,primaryKey=[[id]],additionalProperties={}]],additionalProperties={}],failures=[]]
2022-04-13 08:50:59 INFO i.a.w.t.TemporalUtils(withBackgroundHeartbeat):235 - Stopping temporal heartbeating...
2022-04-13 08:50:59 INFO i.a.c.p.ConfigRepository(updateConnectionState):559 - Updating connection 264890fa-96f2-4a33-be20-e056cacb9d58 state: io.airbyte.config.State@1db4080e[state={"events":{"timestamp":1649839818}}]
2022-04-13 08:50:59 INFO i.a.w.t.TemporalAttemptExecution(get):105 - Docker volume job log path: /tmp/workspace/1504/0/logs.log
2022-04-13 08:50:59 INFO i.a.w.t.TemporalAttemptExecution(get):110 - Executing worker wrapper. Airbyte version: 0.35.40-alpha
2022-04-13 08:50:59 INFO i.a.w.DefaultNormalizationWorker(run):46 - Running normalization.
2022-04-13 08:50:59 INFO i.a.w.n.DefaultNormalizationRunner(runProcess):122 - Running with normalization version: airbyte/normalization:0.1.68
2022-04-13 08:50:59 INFO i.a.c.i.LineGobbler(voidCall):82 - Checking if airbyte/normalization:0.1.68 exists...
2022-04-13 08:50:59 INFO i.a.w.p.DockerProcessFactory(create):104 - Creating docker job ID: 1504
2022-04-13 08:50:59 INFO i.a.c.i.LineGobbler(voidCall):82 - airbyte/normalization:0.1.68 was found locally.
2022-04-13 08:50:59 INFO i.a.w.p.DockerProcessFactory(create):158 - Preparing command: docker run --rm --init -i -w /data/1504/0/normalize --log-driver none --network host -v airbyte_workspace:/data -v /tmp/airbyte_local:/local airbyte/normalization:0.1.68 run --integration-type bigquery --config destination_config.json --catalog destination_catalog.json
2022-04-13 08:50:59 normalization > Running: transform-config --config destination_config.json --integration-type bigquery --out /data/1504/0/normalize
2022-04-13 08:50:59 normalization > Namespace(config='destination_config.json', integration_type=<DestinationType.bigquery: 'bigquery'>, out='/data/1504/0/normalize')
2022-04-13 08:50:59 normalization > transform_bigquery
2022-04-13 08:50:59 normalization > Running: transform-catalog --integration-type bigquery --profile-config-dir /data/1504/0/normalize --catalog destination_catalog.json --out /data/1504/0/normalize/models/generated/ --json-column _airbyte_data
2022-04-13 08:51:00 normalization > Processing destination_catalog.json...
2022-04-13 08:51:00 normalization >   Generating airbyte_ctes/klaviyo_ab/jacob_test_events_ab1.sql from jacob_test_events
2022-04-13 08:51:00 normalization >   Generating airbyte_ctes/klaviyo_ab/jacob_test_events_ab2.sql from jacob_test_events
2022-04-13 08:51:00 normalization >   Generating airbyte_views/klaviyo_ab/jacob_test_events_stg.sql from jacob_test_events
2022-04-13 08:51:00 normalization >   Generating airbyte_incremental/scd/klaviyo_ab/jacob_test_events_scd.sql from jacob_test_events
2022-04-13 08:51:00 normalization >   Generating airbyte_incremental/klaviyo_ab/jacob_test_events.sql from jacob_test_events
2022-04-13 08:51:00 normalization > detected no config file for ssh, assuming ssh is off.
2022-04-13 08:51:02 normalization > Running with dbt=0.21.1
2022-04-13 08:51:02 normalization > Unable to do partial parsing because ../build/partial_parse.msgpack not found
2022-04-13 08:51:04 normalization > [WARNING]: Configuration paths exist in your dbt_project.yml file which do not apply to any resources.
2022-04-13 08:51:04 normalization > There are 1 unused configuration paths:
2022-04-13 08:51:04 normalization > - models.airbyte_utils.generated.airbyte_tables
2022-04-13 08:51:04 normalization >
2022-04-13 08:51:04 normalization > Found 5 models, 0 tests, 0 snapshots, 0 analyses, 516 macros, 0 operations, 0 seed files, 1 source, 0 exposures
2022-04-13 08:51:04 normalization >
2022-04-13 08:51:06 normalization > 08:51:06 | Concurrency: 8 threads (target='prod')
2022-04-13 08:51:06 normalization > 08:51:06 |
2022-04-13 08:51:06 normalization > 08:51:06 | 1 of 3 START view model _airbyte_klaviyo_ab.jacob_test_events_stg............................................ [RUN]
2022-04-13 08:51:06 normalization > 08:51:06 | 1 of 3 OK created view model _airbyte_klaviyo_ab.jacob_test_events_stg....................................... [OK in 0.73s]
2022-04-13 08:51:06 normalization > 08:51:06 | 2 of 3 START incremental model klaviyo_ab.jacob_test_events_scd.............................................. [RUN]
2022-04-13 08:53:00 normalization > 08:53:00 | 2 of 3 OK created incremental model klaviyo_ab.jacob_test_events_scd......................................... [MERGE (218.2k rows, 2.0 GB processed) in 113.36s]
2022-04-13 08:53:00 normalization > 08:53:00 | 3 of 3 START incremental model klaviyo_ab.jacob_test_events.................................................. [RUN]
2022-04-13 08:53:37 normalization > 08:53:37 | 3 of 3 OK created incremental model klaviyo_ab.jacob_test_events............................................. [MERGE (218.2k rows, 1.9 GB processed) in 36.86s]
2022-04-13 08:53:37 normalization > 08:53:37 |
2022-04-13 08:53:37 normalization > 08:53:37 | Finished running 1 view model, 2 incremental models in 152.35s.
2022-04-13 08:53:37 normalization >
2022-04-13 08:53:37 normalization > Completed successfully
2022-04-13 08:53:37 normalization >
2022-04-13 08:53:37 normalization > Done. PASS=3 WARN=0 ERROR=0 SKIP=0 TOTAL=3
2022-04-13 08:53:37 INFO i.a.w.DefaultNormalizationWorker(run):69 - Normalization executed in 2 minutes 38 seconds.
2022-04-13 08:53:37 INFO i.a.w.t.TemporalAttemptExecution(get):131 - Stopping cancellation check scheduling...
2022-04-13 08:53:37 INFO i.a.w.t.TemporalUtils(withBackgroundHeartbeat):235 - Stopping temporal heartbeating...
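
The difference between the two runs comes down to whether any `DefaultNormalizationWorker` lines appear after replication finishes. A minimal sketch of checking a saved sync log for that, assuming the log text has been saved or pasted as a string; the function name and the shortened sample logs are made up for illustration, and only the class name `DefaultNormalizationWorker` is taken from the logs above:

```python
# Class name that appears in the older run's log when normalization starts and ends.
NORMALIZATION_MARKER = "DefaultNormalizationWorker"


def normalization_lines(log_text: str) -> list[str]:
    """Return every log line that mentions the normalization worker."""
    return [line for line in log_text.splitlines() if NORMALIZATION_MARKER in line]


# Shortened excerpts of the two runs (not the full logs).
new_run = """\
2022-05-11 01:31:16 INFO i.a.w.DefaultReplicationWorker(run):164 - Source and destination threads complete.
2022-05-11 01:31:16 INFO i.a.w.t.TemporalUtils(withBackgroundHeartbeat):235 - Stopping temporal heartbeating...
"""

old_run = """\
2022-04-13 08:50:59 INFO i.a.w.DefaultReplicationWorker(run):164 - Source and destination threads complete.
2022-04-13 08:50:59 INFO i.a.w.DefaultNormalizationWorker(run):46 - Running normalization.
2022-04-13 08:53:37 INFO i.a.w.DefaultNormalizationWorker(run):69 - Normalization executed in 2 minutes 38 seconds.
"""

for name, log in (("new run", new_run), ("old run", old_run)):
    hits = normalization_lines(log)
    print(f"{name}: normalization ran = {bool(hits)}")
```

If a run with status=completed has no such lines, replication worked and the raw table was updated, but the step that builds the final table was never started.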

I found a solution to this. It was as easy as switching the toggle from raw JSON to normalized :grimacing:
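
For anyone landing here later, a rough sketch of why the final table goes stale when the connection is set to raw JSON. The raw table only holds `_airbyte_ab_id`, `_airbyte_emitted_at` and a JSON string in `_airbyte_data` (see the schema in the load job above), and it is the normalization step that unpacks that JSON and, for `append_dedup` with primary key `id` and cursor `timestamp`, keeps one row per key. This is not Airbyte's actual implementation (the older log shows it runs as dbt models); the `normalize` function and the sample rows, including the event names, are made up for illustration:

```python
import json


def normalize(raw_rows, primary_key="id", cursor_field="timestamp"):
    """Unpack _airbyte_data and keep the newest record per primary key."""
    latest = {}
    for row in raw_rows:
        record = json.loads(row["_airbyte_data"])  # raw JSON string -> dict of columns
        key = record[primary_key]
        current = latest.get(key)
        # Keep the record with the highest cursor value for each key.
        if current is None or record[cursor_field] >= current[cursor_field]:
            latest[key] = record
    return sorted(latest.values(), key=lambda r: r[primary_key])


# Made-up raw rows shaped like the _airbyte_raw_* table.
raw_rows = [
    {"_airbyte_ab_id": "a1", "_airbyte_emitted_at": "2022-05-11T01:25:00Z",
     "_airbyte_data": json.dumps({"id": "evt_1", "event_name": "Opened Email", "timestamp": 1652232000})},
    {"_airbyte_ab_id": "a2", "_airbyte_emitted_at": "2022-05-11T01:26:00Z",
     "_airbyte_data": json.dumps({"id": "evt_2", "event_name": "Clicked Email", "timestamp": 1652232100})},
    {"_airbyte_ab_id": "a3", "_airbyte_emitted_at": "2022-05-11T01:30:00Z",
     "_airbyte_data": json.dumps({"id": "evt_1", "event_name": "Opened Email", "timestamp": 1652232600})},
]

final_rows = normalize(raw_rows)
for row in final_rows:
    print(row)
```

With the raw JSON option, only the equivalent of `raw_rows` keeps growing; nothing like `normalize` runs, so the deduplicated table stops changing.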

Awesome to hear that, Bedi! Let me know if you need further assistance.
