Sync appears to be getting cancelled on it's own when handling large amounts of data

  • Is this your first time deploying Airbyte?: No
  • OS Version / Instance: Amazon Linux 2
  • Memory / Disk: 1Gi memory request allocated
  • Deployment: Kubernetes via plural.sh
  • Airbyte Version: 0.39.14
  • Source name/version: custom connector
  • Destination name/version: redshift with s3
  • Step: During sync
  • Description: Sync appears to be failing when crossing 900gb data synced. The logs show cancellation related messaged but the job isn’t cancelled and further sync attempts are triggered by airbyte.
    With configs that produce less data (i.e. 30gb) this sync goes through fine.
    The duration for this data volume exceeds 3+ days in sync time.
2022-10-17 10:20:38 e[32mINFOe[m i.a.w.g.DefaultReplicationWorker(lambda$getReplicationRunnable$6):344 - Records read: 47749000 (908 GB)
2022-10-17 10:20:42 e[32mINFOe[m i.a.w.g.DefaultReplicationWorker(lambda$getReplicationRunnable$6):344 - Records read: 47750000 (908 GB)
2022-10-17 10:20:45 e[32mINFOe[m i.a.w.g.DefaultReplicationWorker(lambda$getReplicationRunnable$6):344 - Records read: 47751000 (908 GB)
2022-10-17 10:20:54 e[32mINFOe[m i.a.w.g.DefaultReplicationWorker(lambda$getReplicationRunnable$6):344 - Records read: 47752000 (908 GB)
2022-10-17 10:20:58 e[32mINFOe[m i.a.w.g.DefaultReplicationWorker(lambda$getReplicationRunnable$6):344 - Records read: 47753000 (908 GB)
2022-10-17 10:21:02 e[32mINFOe[m i.a.w.g.DefaultReplicationWorker(lambda$getReplicationRunnable$6):344 - Records read: 47754000 (908 GB)
2022-10-17 10:21:07 e[32mINFOe[m i.a.w.g.DefaultReplicationWorker(lambda$getReplicationRunnable$6):344 - Records read: 47755000 (908 GB)
2022-10-17 10:21:12 e[32mINFOe[m i.a.w.g.DefaultReplicationWorker(lambda$getReplicationRunnable$6):344 - Records read: 47756000 (908 GB)
2022-10-17 10:21:16 e[32mINFOe[m i.a.w.t.TemporalAttemptExecution(lambda$getCancellationChecker$3):194 - Running sync worker cancellation...
2022-10-17 10:21:16 e[32mINFOe[m i.a.w.g.DefaultReplicationWorker(cancel):433 - Cancelling replication worker...
2022-10-17 10:21:19 e[32mINFOe[m i.a.w.g.DefaultReplicationWorker(lambda$getReplicationRunnable$6):344 - Records read: 47757000 (908 GB)
2022-10-17 10:21:24 e[32mINFOe[m i.a.w.g.DefaultReplicationWorker(lambda$getReplicationRunnable$6):344 - Records read: 47758000 (908 GB)
2022-10-17 10:21:26 e[32mINFOe[m i.a.w.g.DefaultReplicationWorker(cancel):441 - Cancelling destination...
2022-10-17 10:21:26 e[32mINFOe[m i.a.w.i.DefaultAirbyteDestination(cancel):125 - Attempting to cancel destination process...
2022-10-17 10:21:26 e[32mINFOe[m i.a.w.i.DefaultAirbyteDestination(cancel):130 - Destination process exists, cancelling...
2022-10-17 10:21:26 e[32mINFOe[m i.a.w.p.KubePodProcess(destroy):659 - (pod: airbyte / destination-redshift-write-14-0-xqppe) - Destroying Kube process.
2022-10-17 10:21:26 e[32mINFOe[m i.a.w.g.DefaultReplicationWorker(lambda$getReplicationRunnable$6):355 - Total records read: 47758577 (908 GB)
2022-10-17 10:21:26 e[32mINFOe[m i.a.w.g.DefaultReplicationWorker(run):174 - One of source or destination thread complete. Waiting on the other.
2022-10-17 10:21:26 e[32mINFOe[m i.a.w.p.KubePodProcess(close):714 - (pod: airbyte / destination-redshift-write-14-0-xqppe) - Closed all resources for pod
2022-10-17 10:21:26 e[32mINFOe[m i.a.w.p.KubePodProcess(destroy):665 - (pod: airbyte / destination-redshift-write-14-0-xqppe) - Destroyed Kube process.
2022-10-17 10:21:26 e[33mWARNe[m i.a.c.i.LineGobbler(voidCall):86 - airbyte-destination gobbler IOException: Socket closed. Typically happens when cancelling a job.
2022-10-17 10:21:26 e[32mINFOe[m i.a.w.i.DefaultAirbyteDestination(cancel):132 - Cancelled destination process!
2022-10-17 10:21:26 e[32mINFOe[m i.a.w.g.DefaultReplicationWorker(cancel):448 - Cancelling source...
2022-10-17 10:21:26 e[32mINFOe[m i.a.w.i.DefaultAirbyteSource(cancel):142 - Attempting to cancel source process...
2022-10-17 10:21:26 e[32mINFOe[m i.a.w.i.DefaultAirbyteSource(cancel):147 - Source process exists, cancelling...
2022-10-17 10:21:26 e[32mINFOe[m i.a.w.p.KubePodProcess(destroy):659 - (pod: airbyte / source-bayes-historical-read-14-0-pidmb) - Destroying Kube process.
2022-10-17 10:21:26 e[32mINFOe[m i.a.w.g.DefaultReplicationWorker(run):176 - Source and destination threads complete.
2022-10-17 10:21:26 e[32mINFOe[m i.a.w.p.KubePodProcess(close):714 - (pod: airbyte / source-bayes-historical-read-14-0-pidmb) - Closed all resources for pod
2022-10-17 10:21:26 e[32mINFOe[m i.a.w.p.KubePodProcess(destroy):665 - (pod: airbyte / source-bayes-historical-read-14-0-pidmb) - Destroyed Kube process.
2022-10-17 10:21:26 e[32mINFOe[m i.a.w.i.DefaultAirbyteSource(cancel):149 - Cancelled source process!
2022-10-17 10:21:26 e[33mWARNe[m i.a.c.i.LineGobbler(voidCall):86 - airbyte-source gobbler IOException: Socket closed. Typically happens when cancelling a job.
2022-10-17 10:21:26 e[32mINFOe[m i.a.w.t.TemporalAttemptExecution(lambda$getCancellationChecker$3):198 - Interrupting worker thread...
2022-10-17 10:21:26 e[1;31mERRORe[m i.a.w.WorkerUtils(gentleClose):45 - Exception while while waiting for process to finish
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method) ~[?:?]
	at java.lang.Process.waitFor(Process.java:468) ~[?:?]
	at io.airbyte.workers.process.KubePodProcess.waitFor(KubePodProcess.java:648) ~[io.airbyte-airbyte-workers-0.39.14-alpha.jar:?]
	at io.airbyte.workers.WorkerUtils.gentleClose(WorkerUtils.java:42) ~[io.airbyte-airbyte-workers-0.39.14-alpha.jar:?]
	at io.airbyte.workers.internal.DefaultAirbyteSource.close(DefaultAirbyteSource.java:128) ~[io.airbyte-airbyte-workers-0.39.14-alpha.jar:?]
	at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:178) ~[io.airbyte-airbyte-workers-0.39.14-alpha.jar:?]
	at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:65) ~[io.airbyte-airbyte-workers-0.39.14-alpha.jar:?]
	at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:158) ~[io.airbyte-airbyte-workers-0.39.14-alpha.jar:?]
	at java.lang.Thread.run(Thread.java:833) [?:?]
2022-10-17 10:21:26 e[1;31mERRORe[m i.a.w.g.DefaultReplicationWorker(run):180 - Sync worker failed.
io.airbyte.workers.exception.WorkerException: Source process exit with code 143. This warning is normal if the job was cancelled.
	at io.airbyte.workers.internal.DefaultAirbyteSource.close(DefaultAirbyteSource.java:136) ~[io.airbyte-airbyte-workers-0.39.14-alpha.jar:?]
	at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:178) ~[io.airbyte-airbyte-workers-0.39.14-alpha.jar:?]
	at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:65) ~[io.airbyte-airbyte-workers-0.39.14-alpha.jar:?]
	at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:158) ~[io.airbyte-airbyte-workers-0.39.14-alpha.jar:?]
	at java.lang.Thread.run(Thread.java:833) [?:?]
	Suppressed: io.airbyte.workers.exception.WorkerException: Destination process exit with code 143. This warning is normal if the job was cancelled.
		at io.airbyte.workers.internal.DefaultAirbyteDestination.close(DefaultAirbyteDestination.java:119) ~[io.airbyte-airbyte-workers-0.39.14-alpha.jar:?]
		at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:137) ~[io.airbyte-airbyte-workers-0.39.14-alpha.jar:?]
		at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:65) ~[io.airbyte-airbyte-workers-0.39.14-alpha.jar:?]
		at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:158) ~[io.airbyte-airbyte-workers-0.39.14-alpha.jar:?]
		at java.lang.Thread.run(Thread.java:833) [?:?]
2022-10-17 10:21:26 e[32mINFOe[m i.a.w.g.DefaultReplicationWorker(run):239 - sync summary: io.airbyte.config.ReplicationAttemptSummary@1ed9a40c[status=cancelled,recordsSynced=47758576,bytesSynced=975780792449,startTime=1665742856734,endTime=1666002086895,totalStats=io.airbyte.config.SyncStats@412cf820[recordsEmitted=47758576,bytesEmitted=975780792449,stateMessagesEmitted=1,recordsCommitted=0],streamStats=[io.airbyte.config.StreamSyncStats@500907f1[streamName=bayes_hist_tournaments,stats=io.airbyte.config.SyncStats@4638d984[recordsEmitted=8,bytesEmitted=1079,stateMessagesEmitted=<null>,recordsCommitted=<null>]], io.airbyte.config.StreamSyncStats@1592ec8b[streamName=bayes_hist_teams,stats=io.airbyte.config.SyncStats@3359fbe1[recordsEmitted=584,bytesEmitted=29152,stateMessagesEmitted=<null>,recordsCommitted=<null>]], io.airbyte.config.StreamSyncStats@2b7744f4[streamName=bayes_hist_tournaments_matches,stats=io.airbyte.config.SyncStats@2b034ce7[recordsEmitted=8628,bytesEmitted=7394606,stateMessagesEmitted=<null>,recordsCommitted=<null>]], io.airbyte.config.StreamSyncStats@29771bb8[streamName=bayes_hist_leagues,stats=io.airbyte.config.SyncStats@550d6b84[recordsEmitted=52,bytesEmitted=8207,stateMessagesEmitted=<null>,recordsCommitted=<null>]], io.airbyte.config.StreamSyncStats@44ccc535[streamName=bayes_hist_unique_game_events,stats=io.airbyte.config.SyncStats@7519dd3f[recordsEmitted=47749304,bytesEmitted=975773359405,stateMessagesEmitted=<null>,recordsCommitted=<null>]]]]
2022-10-17 10:21:26 e[32mINFOe[m i.a.w.g.DefaultReplicationWorker(run):266 - Source output at least one state message
2022-10-17 10:21:26 e[33mWARNe[m i.a.w.g.DefaultReplicationWorker(run):276 - State capture: No new state, falling back on input state: io.airbyte.config.State@32051a63[state={}]
2022-10-17 10:21:26 e[32mINFOe[m i.a.w.t.TemporalAttemptExecution(get):134 - Stopping cancellation check scheduling...
2022-10-17 10:21:26 e[32mINFOe[m i.a.w.t.s.ReplicationActivityImpl(lambda$replicate$3):157 - sync summary: io.airbyte.config.StandardSyncOutput@7e1b617b[standardSyncSummary=io.airbyte.config.StandardSyncSummary@70e7cff0[status=cancelled,recordsSynced=47758576,bytesSynced=975780792449,startTime=1665742856734,endTime=1666002086895,totalStats=io.airbyte.config.SyncStats@412cf820[recordsEmitted=47758576,bytesEmitted=975780792449,stateMessagesEmitted=1,recordsCommitted=0],streamStats=[io.airbyte.config.StreamSyncStats@500907f1[streamName=bayes_hist_tournaments,stats=io.airbyte.config.SyncStats@4638d984[recordsEmitted=8,bytesEmitted=1079,stateMessagesEmitted=<null>,recordsCommitted=<null>]], io.airbyte.config.StreamSyncStats@1592ec8b[streamName=bayes_hist_teams,stats=io.airbyte.config.SyncStats@3359fbe1[recordsEmitted=584,bytesEmitted=29152,stateMessagesEmitted=<null>,recordsCommitted=<null>]], io.airbyte.config.StreamSyncStats@2b7744f4[streamName=bayes_hist_tournaments_matches,stats=io.airbyte.config.SyncStats@2b034ce7[recordsEmitted=8628,bytesEmitted=7394606,stateMessagesEmitted=<null>,recordsCommitted=<null>]], io.airbyte.config.StreamSyncStats@29771bb8[streamName=bayes_hist_leagues,stats=io.airbyte.config.SyncStats@550d6b84[recordsEmitted=52,bytesEmitted=8207,stateMessagesEmitted=<null>,recordsCommitted=<null>]], io.airbyte.config.StreamSyncStats@44ccc535[streamName=bayes_hist_unique_game_events,stats=io.airbyte.config.SyncStats@7519dd3f[recordsEmitted=47749304,bytesEmitted=975773359405,stateMessagesEmitted=<null>,recordsCommitted=<null>]]]],normalizationSummary=<null>,state=io.airbyte.config.State@32051a63[state={}],outputCatalog=io.airbyte.protocol.models.ConfiguredAirbyteCatalog@6111e291[streams=[io.airbyte.protocol.models.ConfiguredAirbyteStream@72b779c6[stream=io.airbyte.protocol.models.AirbyteStream@5702ce52[name=bayes_hist_leagues,jsonSchema={"type":"object","$schema":"http://json-schema.org/draft-04/schema#","properties":{"id":{"type":["string","null"]},"name":{"type":["string","null"]},"slug":{"type":["string","null"]},"logoUrl":{"type":["string","null"]}}},supportedSyncModes=[full_refresh],sourceDefinedCursor=<null>,defaultCursorField=[],sourceDefinedPrimaryKey=[[id]],namespace=<null>,additionalProperties={}],syncMode=full_refresh,cursorField=[],destinationSyncMode=append,primaryKey=[[id]],additionalProperties={}], io.airbyte.protocol.models.ConfiguredAirbyteStream@1954130d[stream=io.airbyte.protocol.models.AirbyteStream@773f6fdc[name=bayes_hist_teams,jsonSchema={"type":"object","$schema":"http://json-schema.org/draft-04/schema#","properties":{"id":{"type":["string","null"]},"name":{"type":["string","null"]}}},supportedSyncModes=[full_refresh],sourceDefinedCursor=<null>,defaultCursorField=[],sourceDefinedPrimaryKey=[[id]],namespace=<null>,additionalProperties={}],syncMode=full_refresh,cursorField=[],destinationSyncMode=append,primaryKey=[[id]],additionalProperties={}], io.airbyte.protocol.models.ConfiguredAirbyteStream@3e0f41b4[stream=io.airbyte.protocol.models.AirbyteStream@7f3e7879[name=bayes_hist_tournaments,jsonSchema={"type":"object","$schema":"http://json-schema.org/draft-04/schema#","properties":{"id":{"type":["string","null"]},"name":{"type":["string","null"]},"slug":{"type":["string","null"]},"endDate":{"type":["string","null"]},"startDate":{"type":["string","null"]}}},supportedSyncModes=[full_refresh],sourceDefinedCursor=<null>,defaultCursorField=[],sourceDefinedPrimaryKey=[[id]],namespace=<null>,additionalProperties={}],syncMode=full_refresh,cursorField=[],destinationSyncMode=append,primaryKey=[[id]],additionalProperties={}], io.airbyte.protocol.models.ConfiguredAirbyteStream@46cbd0e8[stream=io.airbyte.protocol.models.AirbyteStream@4a860662[name=bayes_hist_tournaments_matches,jsonSchema={"type":"object","$schema":"http://json-schema.org/draft-04/schema#","properties":{"match":{"type":"object","properties":{"id":{"type":["string","null"]},"games":{"type":["array","null"],"items":[{"type":["object","null"],"properties":{"id":{"type":["string","null"]},"endTime":{"type":["string","null"]},"parsedAt":{"type":["string","null"]},"startTime":{"type":["string","null"]},"gameNumber":{"type":["integer","null"]},"winnerTeamId":{"type":["string","null"]},"numberOfMessages":{"type":["integer","null"]},"downloadAvailable":{"type":["boolean","null"]}}},{"type":["object","null"],"properties":{"id":{"type":["string","null"]},"endTime":{"type":["string","null"]},"parsedAt":{"type":["string","null"]},"startTime":{"type":["string","null"]},"gameNumber":{"type":["integer","null"]},"winnerTeamId":{"type":["string","null"]},"numberOfMessages":{"type":["integer","null"]},"downloadAvailable":{"type":["boolean","null"]}}}]},"teams":{"type":["array","null"],"items":[{"type":"object","properties":{"id":{"type":["string","null"]},"name":{"type":["string","null"]}}}]},"bestOf":{"type":["integer","null"]},"startTime":{"type":["string","null"]}}},"league":{"type":"object","properties":{"id":{"type":["string","null"]},"name":{"type":["string","null"]},"slug":{"type":["string","null"]},"logoUrl":{"type":["string","null"]}}},"tournament":{"type":["object","null"],"properties":{"id":{"type":["string","null"]},"name":{"type":["string","null"]},"slug":{"type":["string","null"]},"endDate":{"type":["string","null"]},"startDate":{"type":["string","null"]}}}}},supportedSyncModes=[full_refresh, incremental],sourceDefinedCursor=true,defaultCursorField=[startTime],sourceDefinedPrimaryKey=[[id]],namespace=<null>,additionalProperties={}],syncMode=incremental,cursorField=[startTime],destinationSyncMode=append_dedup,primaryKey=[[id]],additionalProperties={}], io.airbyte.protocol.models.ConfiguredAirbyteStream@e40fc67[stream=io.airbyte.protocol.models.AirbyteStream@2b1b066[name=bayes_hist_unique_game_events,jsonSchema={"type":"object","$schema":"http://json-schema.org/draft-04/schema#","properties":{"path":{"type":["string","null"]},"seqIdx":{"type":["integer","null"]},"payload":{"type":["object","null"]},"version":{"type":["string","null"]},"timeSent":{"type":["string","null"]},"references":{"type":["string","null"]},"providerSlug":{"type":["string","null"]}}},supportedSyncModes=[full_refresh],sourceDefinedCursor=<null>,defaultCursorField=[],sourceDefinedPrimaryKey=[],namespace=<null>,additionalProperties={}],syncMode=full_refresh,cursorField=[],destinationSyncMode=append,primaryKey=[],additionalProperties={}]],additionalProperties={}],failures=[]]
2022-10-17 10:21:26 e[32mINFOe[m i.a.w.t.TemporalUtils(withBackgroundHeartbeat):236 - Stopping temporal heartbeating...
2022-10-17 10:21:26 e[32mINFOe[m i.a.w.t.TemporalAttemptExecution(lambda$getCancellationChecker$3):201 - Cancelling completable future...
2022-10-17 10:21:26 e[33mWARNe[m i.a.w.t.CancellationHandler$TemporalCancellationHandler(checkAndHandleCancellation):53 - Job either timed out or was cancelled.

Hello there! You are receiving this message because none of your fellow community members has stepped in to respond to your topic post. (If you are a community member and you are reading this response, feel free to jump in if you have the answer!) As a result, the Community Assistance Team has been made aware of this topic and will be investigating and responding as quickly as possible.
Some important considerations that will help your to get your issue solved faster:

  • It is best to use our topic creation template; if you haven’t yet, we recommend posting a followup with the requested information. With that information the team will be able to more quickly search for similar issues with connectors and the platform and troubleshoot more quickly your specific question or problem.
  • Make sure to upload the complete log file; a common investigation roadblock is that sometimes the error for the issue happens well before the problem is surfaced to the user, and so having the tail of the log is less useful than having the whole log to scan through.
  • Be as descriptive and specific as possible; when investigating it is extremely valuable to know what steps were taken to encounter the issue, what version of connector / platform / Java / Python / docker / k8s was used, etc. The more context supplied, the quicker the investigation can start on your topic and the faster we can drive towards an answer.
  • We in the Community Assistance Team are glad you’ve made yourself part of our community, and we’ll do our best to answer your questions and resolve the problems as quickly as possible. Expect to hear from a specific team member as soon as possible.

Thank you for your time and attention.
Best,
The Community Assistance Team

Hello, thanks for reporting this. Could you please try a newer version of airbyte as there have been many platform improvements, we are currently at v0.40.15.

Also, could you please attach the complete sync log? It looks like the destination job was cancelled but I can’t see why because we are missing part of the information.