Running sync worker cancellation

  • Is this your first time deploying Airbyte?: Yes
  • OS Version / Instance: Ubuntu 20.04
  • Memory / Disk: 8 GB / 300 GB
  • Deployment: Docker
  • Airbyte Version: 0.40.2
  • Source name/version: airbyte/source-oracle:0.3.21
  • Destination name/version: destination-postgres:0.3.26
  • Step: The issue is happening during sync

The sync process stops at random. I have retried it several times. Sometimes it runs for 5 hours, sometimes only 20 minutes, before the same error occurs; see the attached logs.

2022-11-11 13:39:40 INFO i.a.w.g.DefaultReplicationWorker(lambda$getReplicationRunnable$6):339 - Records read: 1671000 (1 GB)
2022-11-11 13:40:43 INFO i.a.w.t.TemporalAttemptExecution(lambda$getCancellationChecker$3):191 - Running sync worker cancellation...
2022-11-11 13:40:43 INFO i.a.w.g.DefaultReplicationWorker(cancel):459 - Cancelling replication worker...
2022-11-11 13:40:53 INFO i.a.w.g.DefaultReplicationWorker(cancel):467 - Cancelling destination...
2022-11-11 13:40:53 INFO i.a.w.i.DefaultAirbyteDestination(cancel):125 - Attempting to cancel destination process...
2022-11-11 13:40:53 INFO i.a.w.i.DefaultAirbyteDestination(cancel):130 - Destination process exists, cancelling...
2022-11-11 13:40:53 INFO i.a.w.g.DefaultReplicationWorker(run):178 - One of source or destination thread complete. Waiting on the other.
2022-11-11 13:40:53 INFO i.a.w.i.DefaultAirbyteDestination(cancel):132 - Cancelled destination process!
2022-11-11 13:40:53 INFO i.a.w.g.DefaultReplicationWorker(cancel):474 - Cancelling source...
2022-11-11 13:40:53 INFO i.a.w.i.DefaultAirbyteSource(cancel):142 - Attempting to cancel source process...
2022-11-11 13:40:53 INFO i.a.w.i.DefaultAirbyteSource(cancel):147 - Source process exists, cancelling...
2022-11-11 13:40:53 INFO i.a.w.g.DefaultReplicationWorker(run):180 - Source and destination threads complete.
2022-11-11 13:40:53 INFO i.a.w.i.DefaultAirbyteSource(cancel):149 - Cancelled source process!
2022-11-11 13:40:53 INFO i.a.w.t.TemporalAttemptExecution(lambda$getCancellationChecker$3):195 - Interrupting worker thread...
2022-11-11 13:40:53 INFO i.a.w.t.TemporalAttemptExecution(lambda$getCancellationChecker$3):198 - Cancelling completable future...
2022-11-11 13:40:53 WARN i.a.w.t.CancellationHandler$TemporalCancellationHandler(checkAndHandleCancellation):53 - Job either timed out or was cancelled.
2022-11-11 13:40:53 WARN i.a.w.t.CancellationHandler$TemporalCancellationHandler(checkAndHandleCancellation):53 - Job either timed out or was cancelled.
2022-11-11 13:40:53 INFO i.a.w.t.TemporalAttemptExecution(get):131 - Stopping cancellation check scheduling...
2022-11-11 13:40:53 ERROR i.a.w.g.DefaultReplicationWorker(run):184 - Sync worker failed.
io.airbyte.workers.exception.WorkerException: Source process exit with code 143. This warning is normal if the job was cancelled.
	at io.airbyte.workers.internal.DefaultAirbyteSource.close(DefaultAirbyteSource.java:136) ~[io.airbyte-airbyte-workers-0.40.2.jar:?]
	at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:182) ~[io.airbyte-airbyte-workers-0.40.2.jar:?]
	at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:65) ~[io.airbyte-airbyte-workers-0.40.2.jar:?]
	at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:155) ~[io.airbyte-airbyte-workers-0.40.2.jar:?]
	at java.lang.Thread.run(Thread.java:1589) [?:?]
	Suppressed: java.io.IOException: Stream closed
		at java.lang.ProcessBuilder$NullOutputStream.write(ProcessBuilder.java:445) ~[?:?]
		at java.io.OutputStream.write(OutputStream.java:164) ~[?:?]
		at java.io.BufferedOutputStream.implWrite(BufferedOutputStream.java:216) ~[?:?]
		at java.io.BufferedOutputStream.write(BufferedOutputStream.java:205) ~[?:?]
		at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:313) ~[?:?]
		at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:385) ~[?:?]
		at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:361) ~[?:?]
		at sun.nio.cs.StreamEncoder.lockedWrite(StreamEncoder.java:162) ~[?:?]
		at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:143) ~[?:?]
		at java.io.OutputStreamWriter.write(OutputStreamWriter.java:220) ~[?:?]
		at java.io.BufferedWriter.implFlushBuffer(BufferedWriter.java:178) ~[?:?]
		at java.io.BufferedWriter.flushBuffer(BufferedWriter.java:163) ~[?:?]
		at java.io.BufferedWriter.implFlush(BufferedWriter.java:371) ~[?:?]
		at java.io.BufferedWriter.flush(BufferedWriter.java:359) ~[?:?]
		at io.airbyte.workers.internal.DefaultAirbyteDestination.notifyEndOfInput(DefaultAirbyteDestination.java:98) ~[io.airbyte-airbyte-workers-0.40.2.jar:?]
		at io.airbyte.workers.internal.DefaultAirbyteDestination.close(DefaultAirbyteDestination.java:111) ~[io.airbyte-airbyte-workers-0.40.2.jar:?]
		at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:141) ~[io.airbyte-airbyte-workers-0.40.2.jar:?]
		at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:65) ~[io.airbyte-airbyte-workers-0.40.2.jar:?]
		at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:155) ~[io.airbyte-airbyte-workers-0.40.2.jar:?]
		at java.lang.Thread.run(Thread.java:1589) [?:?]
2022-11-11 13:40:53 INFO i.a.w.t.TemporalUtils(withBackgroundHeartbeat):312 - Stopping temporal heartbeating...
2022-11-11 13:40:53 WARN i.t.i.a.POJOActivityTaskHandler(activityFailureToResult):307 - Activity failure. ActivityId=e86ed087-f909-354d-a96a-ec93f7ad8af3, activityType=Replicate, attempt=1
java.lang.RuntimeException: java.util.concurrent.CancellationException
	at io.airbyte.workers.temporal.TemporalUtils.withBackgroundHeartbeat(TemporalUtils.java:310) ~[io.airbyte-airbyte-workers-0.40.2.jar:?]
	at io.airbyte.workers.temporal.sync.ReplicationActivityImpl.replicate(ReplicationActivityImpl.java:119) ~[io.airbyte-airbyte-workers-0.40.2.jar:?]
	at jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104) ~[?:?]
	at java.lang.reflect.Method.invoke(Method.java:578) ~[?:?]
	at io.temporal.internal.activity.POJOActivityTaskHandler$POJOActivityInboundCallsInterceptor.execute(POJOActivityTaskHandler.java:214) ~[temporal-sdk-1.8.1.jar:?]
	at io.temporal.internal.activity.POJOActivityTaskHandler$POJOActivityImplementation.execute(POJOActivityTaskHandler.java:180) ~[temporal-sdk-1.8.1.jar:?]
	at io.temporal.internal.activity.POJOActivityTaskHandler.handle(POJOActivityTaskHandler.java:120) ~[temporal-sdk-1.8.1.jar:?]
	at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:204) ~[temporal-sdk-1.8.1.jar:?]
	at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:164) ~[temporal-sdk-1.8.1.jar:?]
	at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:93) ~[temporal-sdk-1.8.1.jar:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) [?:?]
	at java.lang.Thread.run(Thread.java:1589) [?:?]
Caused by: java.util.concurrent.CancellationException
	at java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2510) ~[?:?]
	at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getCancellationChecker$3(TemporalAttemptExecution.java:201) ~[io.airbyte-airbyte-workers-0.40.2.jar:?]
	at io.airbyte.workers.temporal.CancellationHandler$TemporalCancellationHandler.checkAndHandleCancellation(CancellationHandler.java:52) ~[io.airbyte-airbyte-workers-0.40.2.jar:?]
	at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getCancellationChecker$4(TemporalAttemptExecution.java:204) ~[io.airbyte-airbyte-workers-0.40.2.jar:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:577) ~[?:?]
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:358) ~[?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[?:?]
	... 3 more
2022-11-11 13:40:53 INFO i.a.w.g.DefaultReplicationWorker(run):249 - sync summary: io.airbyte.config.ReplicationAttemptSummary@5f330500[status=cancelled,recordsSynced=1671117,bytesSynced=1992649494,startTime=1668172713248,endTime=1668174053716,totalStats=io.airbyte.config.SyncStats@755a4818[recordsEmitted=1671117,bytesEmitted=1992649494,sourceStateMessagesEmitted=0,destinationStateMessagesEmitted=0,recordsCommitted=0,meanSecondsBeforeSourceStateMessageEmitted=0,maxSecondsBeforeSourceStateMessageEmitted=0,maxSecondsBetweenStateMessageEmittedandCommitted=0,meanSecondsBetweenStateMessageEmittedandCommitted=0,additionalProperties={}],streamStats=[io.airbyte.config.StreamSyncStats@27308360[streamName=A_ABRFALL_LEISTUNG,stats=io.airbyte.config.SyncStats@6e3798cf[recordsEmitted=1671117,bytesEmitted=1992649494,sourceStateMessagesEmitted=<null>,destinationStateMessagesEmitted=<null>,recordsCommitted=<null>,meanSecondsBeforeSourceStateMessageEmitted=<null>,maxSecondsBeforeSourceStateMessageEmitted=<null>,maxSecondsBetweenStateMessageEmittedandCommitted=<null>,meanSecondsBetweenStateMessageEmittedandCommitted=<null>,additionalProperties={}]]]]
2022-11-11 13:40:53 INFO i.a.w.g.DefaultReplicationWorker(run):278 - Source did not output any state messages
2022-11-11 13:40:53 WARN i.a.w.g.DefaultReplicationWorker(run):289 - State capture: No state retained.
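
Two numbers in the log above can be decoded directly, which may help when comparing attempts. The sketch below assumes the source container follows the common shell convention that an exit code above 128 means "terminated by signal (code - 128)", and that `startTime` and `endTime` in the sync summary are epoch milliseconds. The helper name `describe_exit_code` is made up for illustration and is not part of Airbyte.

```python
import signal

def describe_exit_code(code: int) -> str:
    """Translate a shell-style exit code into a signal name where possible.

    Assumption: codes above 128 follow the 128 + signal-number convention.
    """
    if code > 128:
        try:
            return signal.Signals(code - 128).name
        except ValueError:
            # Not a signal number known on this platform.
            return f"unknown signal {code - 128}"
    return f"exit status {code}"

# Values copied from the log above.
EXIT_CODE = 143
START_MS = 1668172713248
END_MS = 1668174053716

# Wall-clock length of the attempt, in minutes.
duration_min = (END_MS - START_MS) / 1000 / 60

print(describe_exit_code(EXIT_CODE))
print(f"attempt ran for {duration_min:.1f} minutes")
```

Under those assumptions, code 143 corresponds to SIGTERM, meaning the source process was asked to stop rather than crashing on its own. That matches the "Running sync worker cancellation" line, but it does not say what triggered the cancellation.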

Hello there! You are receiving this message because none of your fellow community members has stepped in to respond to your topic post. (If you are a community member and you are reading this response, feel free to jump in if you have the answer!) As a result, the Community Assistance Team has been made aware of this topic and will be investigating and responding as quickly as possible.
Some important considerations that will help you get your issue solved faster:

  • It is best to use our topic creation template; if you haven’t yet, we recommend posting a followup with the requested information. With that information the team can search for similar issues with connectors and the platform, and troubleshoot your specific question or problem more quickly.
  • Make sure to upload the complete log file; a common investigation roadblock is that sometimes the error for the issue happens well before the problem is surfaced to the user, and so having the tail of the log is less useful than having the whole log to scan through.
  • Be as descriptive and specific as possible; when investigating it is extremely valuable to know what steps were taken to encounter the issue, what version of connector / platform / Java / Python / docker / k8s was used, etc. The more context supplied, the quicker the investigation can start on your topic and the faster we can drive towards an answer.
  • We in the Community Assistance Team are glad you’ve made yourself part of our community, and we’ll do our best to answer your questions and resolve the problems as quickly as possible. Expect to hear from a specific team member as soon as possible.

Thank you for your time and attention.
Best,
The Community Assistance Team

Hi @mdebic, thanks for your patience on this. It looks like the sync job might be timing out unexpectedly. The first thing to try is tweaking the timeout variables; start with these:
https://airbytehq.github.io/operator-guides/configuring-airbyte#jobs

If that doesn’t help we’ll have to dig deeper. These aren’t the full logs, right? Would you be able to attach the full logs?
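
Before changing any of the timeout variables mentioned above, it can help to see which ones are already set. This is a minimal sketch, assuming a Docker deployment that reads its settings from a `.env` file in the current directory. The function name `timeout_settings` is hypothetical, and the script only matches keys containing `TIMEOUT` rather than assuming any particular variable name.

```python
from pathlib import Path

def timeout_settings(env_text: str) -> dict[str, str]:
    """Return KEY=VALUE pairs whose key mentions TIMEOUT.

    Blank lines, comment lines, and lines without '=' are skipped.
    """
    found = {}
    for raw in env_text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        if "TIMEOUT" in key.upper():
            found[key.strip()] = value.strip()
    return found

if __name__ == "__main__":
    env_path = Path(".env")  # assumed location of the deployment's env file
    if env_path.exists():
        for key, value in timeout_settings(env_path.read_text()).items():
            print(f"{key}={value}")
    else:
        print(".env not found")
```

Any variable that appears in the linked guide but not in this output is presumably running with its default value.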