Mailchimp connector fails to complete initial sync (shows failure/cancelation during email_activity stream)

  • Is this your first time deploying Airbyte?: No
  • OS Version / Instance: Ubuntu 20.04 (LTS) x64
  • Memory / Disk: 8GB / 160GB
  • Deployment: Docker
  • Airbyte Version: 0.40.0-alpha
  • Source name/version: source-mailchimp 0.2.14
  • Destination name/version: destination-bigquery 1.1.14
  • Step: During initial sync
  • Description: Initial syncs fail reliably for Mailchimp connectors, at least for accounts with any significant amount of history in the email_activity stream. The logs show a cancellation during that stream, but it happens at varying points in the sync. (Another very small Mailchimp account, < 11K records, completed successfully.)

Full logs attached, but relevant errors here:

2022-08-18 22:35:07 INFO i.a.w.g.DefaultReplicationWorker(lambda$getReplicationRunnable$6):329 - Records read: 49000 (56 MB)
2022-08-18 22:39:48 INFO i.a.w.g.DefaultReplicationWorker(lambda$getReplicationRunnable$6):329 - Records read: 50000 (57 MB)
2022-08-18 22:46:41 INFO i.a.w.t.TemporalAttemptExecution(lambda$getCancellationChecker$3):191 - Running sync worker cancellation...
2022-08-18 22:46:41 INFO i.a.w.g.DefaultReplicationWorker(cancel):449 - Cancelling replication worker...
2022-08-18 22:46:51 INFO i.a.w.g.DefaultReplicationWorker(cancel):457 - Cancelling destination...
2022-08-18 22:46:51 INFO i.a.w.i.DefaultAirbyteDestination(cancel):125 - Attempting to cancel destination process...
2022-08-18 22:46:51 INFO i.a.w.i.DefaultAirbyteDestination(cancel):130 - Destination process exists, cancelling...
2022-08-18 22:46:51 INFO i.a.w.g.DefaultReplicationWorker(run):178 - One of source or destination thread complete. Waiting on the other.
2022-08-18 22:46:51 WARN i.a.c.i.LineGobbler(voidCall):88 - airbyte-destination gobbler IOException: Stream closed. Typically happens when cancelling a job.
2022-08-18 22:46:51 INFO i.a.w.i.DefaultAirbyteDestination(cancel):132 - Cancelled destination process!
2022-08-18 22:46:51 INFO i.a.w.g.DefaultReplicationWorker(cancel):464 - Cancelling source...
2022-08-18 22:46:51 INFO i.a.w.i.DefaultAirbyteSource(cancel):142 - Attempting to cancel source process...
2022-08-18 22:46:51 INFO i.a.w.i.DefaultAirbyteSource(cancel):147 - Source process exists, cancelling...
2022-08-18 22:46:51 INFO i.a.w.i.DefaultAirbyteSource(cancel):149 - Cancelled source process!
2022-08-18 22:46:51 INFO i.a.w.t.TemporalAttemptExecution(lambda$getCancellationChecker$3):195 - Interrupting worker thread...
2022-08-18 22:46:51 INFO i.a.w.t.TemporalAttemptExecution(lambda$getCancellationChecker$3):198 - Cancelling completable future...
2022-08-18 22:46:51 WARN i.a.w.t.CancellationHandler$TemporalCancellationHandler(checkAndHandleCancellation):53 - Job either timed out or was cancelled.
2022-08-18 22:46:51 WARN i.a.w.t.CancellationHandler$TemporalCancellationHandler(checkAndHandleCancellation):53 - Job either timed out or was cancelled.
2022-08-18 22:46:51 INFO i.a.w.t.TemporalAttemptExecution(get):131 - Stopping cancellation check scheduling...
2022-08-18 22:46:51 INFO i.a.w.t.TemporalUtils(withBackgroundHeartbeat):312 - Stopping temporal heartbeating...
2022-08-18 22:46:51 WARN i.t.i.a.POJOActivityTaskHandler(activityFailureToResult):307 - Activity failure. ActivityId=ad7b0d46-7c03-3f46-9f76-5403808c94d1, activityType=Replicate, attempt=1
java.lang.RuntimeException: java.util.concurrent.CancellationException
	at io.airbyte.workers.temporal.TemporalUtils.withBackgroundHeartbeat(TemporalUtils.java:310) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?]
	at io.airbyte.workers.temporal.sync.ReplicationActivityImpl.replicate(ReplicationActivityImpl.java:119) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?]
	at jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104) ~[?:?]
	at java.lang.reflect.Method.invoke(Method.java:578) ~[?:?]
	at io.temporal.internal.activity.POJOActivityTaskHandler$POJOActivityInboundCallsInterceptor.execute(POJOActivityTaskHandler.java:214) ~[temporal-sdk-1.8.1.jar:?]
	at io.temporal.internal.activity.POJOActivityTaskHandler$POJOActivityImplementation.execute(POJOActivityTaskHandler.java:180) ~[temporal-sdk-1.8.1.jar:?]
	at io.temporal.internal.activity.POJOActivityTaskHandler.handle(POJOActivityTaskHandler.java:120) ~[temporal-sdk-1.8.1.jar:?]
	at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:204) ~[temporal-sdk-1.8.1.jar:?]
	at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:164) ~[temporal-sdk-1.8.1.jar:?]
	at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:93) ~[temporal-sdk-1.8.1.jar:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) [?:?]
	at java.lang.Thread.run(Thread.java:1589) [?:?]
Caused by: java.util.concurrent.CancellationException
	at java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2510) ~[?:?]
	at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getCancellationChecker$3(TemporalAttemptExecution.java:201) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?]
	at io.airbyte.workers.temporal.CancellationHandler$TemporalCancellationHandler.checkAndHandleCancellation(CancellationHandler.java:52) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?]
	at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getCancellationChecker$4(TemporalAttemptExecution.java:204) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:577) ~[?:?]
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:358) ~[?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[?:?]
	... 3 more
2022-08-18 22:46:51 INFO i.a.w.g.DefaultReplicationWorker(run):180 - Source and destination threads complete.
2022-08-18 22:46:51 ERROR i.a.w.g.DefaultReplicationWorker(run):184 - Sync worker failed.
io.airbyte.workers.exception.WorkerException: Source process exit with code 143. This warning is normal if the job was cancelled.
	at io.airbyte.workers.internal.DefaultAirbyteSource.close(DefaultAirbyteSource.java:136) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?]
	at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:182) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?]
	at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:65) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?]
	at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:155) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?]
	at java.lang.Thread.run(Thread.java:1589) [?:?]
	Suppressed: java.io.IOException: Stream closed
		at java.lang.ProcessBuilder$NullOutputStream.write(ProcessBuilder.java:445) ~[?:?]
		at java.io.OutputStream.write(OutputStream.java:164) ~[?:?]
		at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:125) ~[?:?]
		at java.io.BufferedOutputStream.implFlush(BufferedOutputStream.java:251) ~[?:?]
		at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:245) ~[?:?]
		at sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:416) ~[?:?]
		at sun.nio.cs.StreamEncoder.lockedFlush(StreamEncoder.java:218) ~[?:?]
		at sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:205) ~[?:?]
		at java.io.OutputStreamWriter.flush(OutputStreamWriter.java:263) ~[?:?]
		at java.io.BufferedWriter.implFlush(BufferedWriter.java:372) ~[?:?]
		at java.io.BufferedWriter.flush(BufferedWriter.java:359) ~[?:?]
		at io.airbyte.workers.internal.DefaultAirbyteDestination.notifyEndOfInput(DefaultAirbyteDestination.java:98) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?]
		at io.airbyte.workers.internal.DefaultAirbyteDestination.close(DefaultAirbyteDestination.java:111) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?]
		at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:141) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?]
		at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:65) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?]
		at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:155) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?]
		at java.lang.Thread.run(Thread.java:1589) [?:?]
2022-08-18 22:46:51 INFO i.a.w.g.DefaultReplicationWorker(run):243 - sync summary: io.airbyte.config.ReplicationAttemptSummary@70152c8[status=cancelled,recordsSynced=50104,bytesSynced=60489415,startTime=1660851761098,endTime=1660862811303,totalStats=io.airbyte.config.SyncStats@400233ba[recordsEmitted=50104,bytesEmitted=60489415,stateMessagesEmitted=392,recordsCommitted=0],streamStats=[io.airbyte.config.StreamSyncStats@3107c787[streamName=mailchimp_ctf_campaigns,stats=io.airbyte.config.SyncStats@50793eaa[recordsEmitted=3423,bytesEmitted=14822308,stateMessagesEmitted=<null>,recordsCommitted=<null>]], io.airbyte.config.StreamSyncStats@5d63a17f[streamName=mailchimp_ctf_email_activity,stats=io.airbyte.config.SyncStats@cadd68a[recordsEmitted=46681,bytesEmitted=45667107,stateMessagesEmitted=<null>,recordsCommitted=<null>]]]]
2022-08-18 22:46:51 INFO i.a.w.g.DefaultReplicationWorker(run):270 - Source output at least one state message
2022-08-18 22:46:51 WARN i.a.w.g.DefaultReplicationWorker(run):283 - State capture: No state retained.
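
For anyone reading along, the sync summary line above can be turned into an elapsed time and a rough throughput figure. The following is only a minimal sketch: it assumes that startTime and endTime are epoch milliseconds (which lines up with the 22:46:51 log timestamp), and the helper names summary_fields and elapsed_seconds are made up for illustration and are not part of Airbyte.

```python
import re

# Summary line copied (abridged) from the log above.
SUMMARY = (
    "sync summary: io.airbyte.config.ReplicationAttemptSummary@70152c8["
    "status=cancelled,recordsSynced=50104,bytesSynced=60489415,"
    "startTime=1660851761098,endTime=1660862811303,"
    "totalStats=io.airbyte.config.SyncStats@400233ba[recordsEmitted=50104,"
    "bytesEmitted=60489415,stateMessagesEmitted=392,recordsCommitted=0]"
)

def summary_fields(line):
    """Return the first occurrence of each numeric key=value pair in the line."""
    fields = {}
    for key, value in re.findall(r"(\w+)=(\d+)", line):
        fields.setdefault(key, int(value))
    return fields

def elapsed_seconds(fields):
    # Assumption: startTime and endTime are epoch milliseconds.
    return (fields["endTime"] - fields["startTime"]) / 1000.0

fields = summary_fields(SUMMARY)
seconds = elapsed_seconds(fields)
hours, rem = divmod(int(seconds), 3600)
minutes, secs = divmod(rem, 60)

print(f"elapsed: {hours}h {minutes}m {secs}s")
print(f"records/s: {fields['recordsEmitted'] / seconds:.2f}")
print(f"records committed: {fields['recordsCommitted']}")
```

On this summary the attempt ran for a little over three hours, averaged under five records per second, and was cancelled with no records committed to the destination.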

logs-167.txt (168.9 KB)

I’m also seeing failures, though with different errors, for a HubSpot connector, so I can’t rule out that there’s some issue with larger accounts in general. But several other connectors are running fine, and system resource utilization remains low.

Any ideas on what I can do to resolve this issue or minimize issues when syncing accounts with large histories in general?

Hey @NAjustin, sorry to hear you’re running into these errors - I’m looking into this and hope to have some ideas for you soon.

@natalyjazzviolin Thanks! Looking forward to anything I can try . . . I’ve tried several accounts, and anything with any significant amount of history seems to behave the same.

Hi! Sorry for the delay, the team was at off-site meetings last week. Could you please try to replicate just the email_activity stream and share the sync logs? I think isolating that stream might help us see what’s going on here.
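
If it helps when comparing runs, the "Records read" progress lines in the worker log can be turned into a rough records-per-second figure, which makes it easier to see whether the source slows down before the cancellation. This is just a sketch under stated assumptions: it assumes each progress line starts with a "YYYY-MM-DD HH:MM:SS" timestamp as in the log excerpt in this thread, and read_rates is a hypothetical helper, not an Airbyte tool.

```python
import re
from datetime import datetime

# Matches worker progress lines such as:
# 2022-08-18 22:35:07 INFO i.a.w.g.DefaultReplicationWorker(...):329 - Records read: 49000 (56 MB)
PROGRESS = re.compile(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) .*Records read: (\d+)")

def read_rates(lines):
    """Yield (timestamp, records_read, records_per_second) between consecutive progress lines."""
    previous = None
    for line in lines:
        match = PROGRESS.match(line)
        if not match:
            continue  # skip everything that is not a progress line
        stamp = datetime.strptime(match.group(1), "%Y-%m-%d %H:%M:%S")
        count = int(match.group(2))
        if previous is not None:
            gap = (stamp - previous[0]).total_seconds()
            if gap > 0:
                yield stamp, count, (count - previous[1]) / gap
        previous = (stamp, count)

# Two progress lines taken from the log excerpt in this thread.
SAMPLE = [
    "2022-08-18 22:35:07 INFO i.a.w.g.DefaultReplicationWorker(lambda$getReplicationRunnable$6):329 - Records read: 49000 (56 MB)",
    "2022-08-18 22:39:48 INFO i.a.w.g.DefaultReplicationWorker(lambda$getReplicationRunnable$6):329 - Records read: 50000 (57 MB)",
]

rates = list(read_rates(SAMPLE))
for stamp, count, rate in rates:
    print(f"{stamp} records={count} rate={rate:.2f}/s")
```

To run it over a full log, pass an open file object for the downloaded log to read_rates in place of SAMPLE. For the two sample lines the rate comes out at roughly 3.6 records per second.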