How to configure Airbyte for higher sync concurrency?

  • Is this your first time deploying Airbyte?: No
  • OS Version / Instance: Linux
  • Memory / Disk: 20 m5.2xlarge machines
  • Deployment: Kubernetes
  • Airbyte Version: 0.38.1-alpha
  • Source name/version: airbyte/source-e2e-test / 2.1.0
  • Destination name/version: airbyte/destination-e2e-test / 0.2.2
  • Step: We use a script to create a large number of connections, where both the source and destination connectors are e2e-test. Each connection syncs 1M logs.
  • Description:

We’re having trouble getting Airbyte up and running. We’re only seeing single-digit concurrency no matter what we try, and we’re told this should not be the case. Here’s the info that might be most relevant. Can you please assist?

Things we’ve tried

  • deployed airbyte via helm chart and kustomize - no change

  • updating MAX_SYNC_WORKERS, SUBMITTER_NUM_THREADS, and the number of replicas of airbyte-workers

  • (lots of different combinations of increasing one, some or all of these)

  • replacing built-in postgres container with an overprovisioned amazon RDS

  • replacing builtin temporal with full cluster via temporal helm chart

  • replacing builtin minio with full cluster via minio helm chart

  • using s3 for log/state storage

How we’re measuring

  • checking both the number of actively running syncs and the number of syncs completed per hour, as emitted by the airbyte-metrics-reporter to Datadog

related thread:

What values did you use for MAX_SYNC_WORKERS and SUBMITTER_NUM_THREADS? Changing those should bring the concurrency level to what you expect. Is your cluster 20 x m5.2xlarge machines, or do you only have 20 GB?

Hi, we followed the recommendation for our variables to have 100 parallel syncs

  • Number of Workers: 3
  • SUBMITTER_NUM_THREADS = 100
  • MAX_SYNC_WORKERS = 100

Our cluster is 20 x m5.2xlarge machines

Sorry @yli, my earlier suggestion was wrong. It should be:

  • Number of Workers: 3
  • SUBMITTER_NUM_THREADS = 200
  • MAX_SYNC_WORKERS = 100
    You must double the threads compared to workers, because each connection will consume a new thread.

Hi Marcos,

We have those environment variables set but it doesn’t seem to be working. Is there any other reason we might be running into this problem?

Here is a screenshot of our metrics in Datadog. The first graph shows num_running_jobs and num_pending_jobs. You can see that with ~100 syncs we are running no more than 4 jobs at a time. This number does not change no matter what we set the variables to.

Did you stop and start Airbyte after modifying the .env file? It looks as if the default values are being applied rather than the ones you shared with me.

Yes, I believe they should be applied because I can see the new worker replicas in the kubernetes dashboard. But I can double check.

One thing is, even if we manage to bring the number of jobs up, they do not seem to process. As shown here we managed to bring the number up to ~100 by creating more hourly syncs, but the pending jobs don’t seem to come down. Do you know what might be the reason for this?

@marcosmarxm
We saw no more than 4 jobs at a time because the jobs finish quickly, so the collected metric doesn’t reflect the actual concurrency. Later, when we scheduled more long-running jobs, we were able to reach a concurrency of 100.

Just to give some background: there are 10 m5.2xlarge machines in total in the cluster. All the connections are “e2e <-> e2e”, and we can configure the number of logs sent from the source to the destination. A small job syncs 5K logs and completes in 13 seconds; a large job syncs 1M logs and completes in 1 minute. All jobs are scheduled to run hourly.

Before 8 PM, there were about 120 large connections and 300 small connections. That worked okay: about 100 jobs failed, but the failed jobs completed in the next hour. Around 8 PM, I added another 170 small jobs, and the number of succeeded jobs suddenly dropped to 0.


(number of total connections - time series)

(number of jobs per each status - time series)

(nodes utilization graph)

Given 10 m5.2xlarge machines, can you please advise how I could configure the environment variables (worker replicas, max_sync_workers) so that the cluster is able to complete 800 small jobs and 200 large jobs hourly?

Sorry @yli, I can’t give you specific values because the concept of small and big jobs is not clear to me. You can try experimenting with the variables in the env file to find the best result for your use case. By the way, can you upgrade to the latest version, 0.39.0? It uses the new Temporal-based scheduler and should scale jobs better too.

Hi @yli, Airbyte engineer here. Can we start by upgrading to 0.39 or above? 0.39 removes our old scheduler and gives all open source users the Temporal-based scheduler we use internally. This should give much better concurrency than our old scheduler right off the bat and remove the need for configuration via the env vars.

Are those metrics coming from our metrics collector app?

Curious, what is your RDS provisioned at today?

Hi, thank you for the reply! We upgraded to 0.39 and it made a huge difference: we are now able to run as many jobs concurrently as we have syncs. We are using the metrics coming from the metrics collector app and sending them to our internal Datadog. Our RDS is m5.xlarge, Postgres 14.1.

I have a couple questions

  1. Environment variables

This should give much better concurrency than our old scheduler right off the bat and remove the need for configuration via the env vars.

Do you mean we don’t need to change these numbers anymore: MAX_SYNC_WORKERS, SUBMITTER_NUM_THREADS, TEMPORAL_WORKER_PORTS and the number of replicas of airbyte-workers? What is your recommendation for these values? For reference I am testing ~2000 hourly E2E connections right now, ~1600 have 5000-10000 records, ~400 have 1M records.

  2. Also, we are hitting this error after the syncs run for about half a day:
Publishing to S3 (bucket=airbyte-dev-logs; key=job-logging/workspace/3866/2/logs.log/20220524042019_airbyte-worker-55cfcf6765-m7rrt_499dfdbc2931438395f385097921d8ac):
java.lang.RuntimeException: Cannot publish to S3: Storage backend has reached its minimum free disk threshold. Please delete a few objects to proceed. (Service: Amazon S3; Status Code: 507; Error Code: XMinioStorageFull; Request ID: 16F1EFA2769FE51B; S3 Extended Request ID: c78d39ee-1d33-4111-b3ee-79f47357e30b; Proxy: null)

I assume moving to custom minio location will solve this issue?

  3. I am using the stable deployment overlay (airbyte/kube/overlays/stable at master · airbytehq/airbyte · GitHub) and we noticed that the CPU utilization on one node is always very high. Do you have an example of how to balance it more evenly? Perhaps using these configurations: Assigning Pods to Nodes | Kubernetes?

  1. SUBMITTER_NUM_THREADS is deprecated now with new scheduler. @davin can you help with the calculation of parallel syncs? Should be something like: MAX_SYNC_WORKERS * (number of airbyte-worker replicas) correct?
  2. About MinIO: yes, moving to a custom MinIO or a cloud storage bucket like S3 or GCS will give you the scalability you need.
  3. What containers are on this node? Are you trying to have one airbyte-worker on each node?

Another issue is that we get hanging jobs after the syncs run for a few hours. These jobs never resolve. It seems like we are hitting a resource limit, but I’m not sure which one.

I checked some of the logs and it is a timeout

2022-05-24 23:54:50 INFO i.a.w.t.TemporalAttemptExecution(get):110 - Cloud storage job log path: /workspace/4615/0/logs.log
2022-05-24 23:54:50 INFO i.a.w.t.TemporalAttemptExecution(get):113 - Executing worker wrapper. Airbyte version: 0.36.6-alpha
2022-05-24 23:54:50 INFO i.a.w.p.KubeProcessFactory(create):100 - Attempting to start pod = source-e2e-test-check-4615-0-dupoz for airbyte/source-e2e-test:2.1.0
2022-05-25 00:00:00 INFO i.a.w.t.TemporalAttemptExecution(lambda$getCancellationChecker$3):194 - Running sync worker cancellation...
2022-05-25 00:00:00 INFO i.a.w.t.TemporalAttemptExecution(lambda$getCancellationChecker$3):198 - Interrupting worker thread...
2022-05-25 00:00:00 INFO i.a.w.t.TemporalAttemptExecution(lambda$getCancellationChecker$3):201 - Cancelling completable future...
2022-05-25 00:00:00 WARN i.a.w.t.CancellationHandler$TemporalCancellationHandler(checkAndHandleCancellation):53 - Job either timed out or was cancelled.
2022-05-25 00:00:00 INFO i.a.w.t.TemporalAttemptExecution(get):134 - Stopping cancellation check scheduling...
2022-05-25 00:00:00 INFO i.a.w.t.TemporalAttemptExecution(lambda$getWorkerThread$2):161 - Completing future exceptionally...
io.airbyte.workers.exception.WorkerException: Error while getting checking connection.
	at io.airbyte.workers.general.DefaultCheckConnectionWorker.run(DefaultCheckConnectionWorker.java:86) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	at io.airbyte.workers.general.DefaultCheckConnectionWorker.run(DefaultCheckConnectionWorker.java:29) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:158) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	at java.lang.Thread.run(Thread.java:833) [?:?]
Caused by: io.airbyte.workers.exception.WorkerException
	at io.airbyte.workers.process.KubeProcessFactory.create(KubeProcessFactory.java:138) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	at io.airbyte.workers.process.AirbyteIntegrationLauncher.check(AirbyteIntegrationLauncher.java:80) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	at io.airbyte.workers.general.DefaultCheckConnectionWorker.run(DefaultCheckConnectionWorker.java:55) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	... 3 more
Caused by: java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:1679) ~[?:?]
	at java.util.concurrent.LinkedBlockingDeque.pollFirst(LinkedBlockingDeque.java:515) ~[?:?]
	at java.util.concurrent.LinkedBlockingDeque.poll(LinkedBlockingDeque.java:677) ~[?:?]
	at io.airbyte.workers.process.KubePortManagerSingleton.take(KubePortManagerSingleton.java:67) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	at io.airbyte.workers.process.KubeProcessFactory.create(KubeProcessFactory.java:102) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	at io.airbyte.workers.process.AirbyteIntegrationLauncher.check(AirbyteIntegrationLauncher.java:80) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	at io.airbyte.workers.general.DefaultCheckConnectionWorker.run(DefaultCheckConnectionWorker.java:55) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	... 3 more
2022-05-25 00:00:00 WARN i.t.i.a.POJOActivityTaskHandler(activityFailureToResult):307 - Activity failure. ActivityId=f90c698b-d3b6-33d0-ae49-f21f0e06487e, activityType=Run, attempt=1
java.util.concurrent.CancellationException: null
	at java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2478) ~[?:?]
	at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getCancellationChecker$3(TemporalAttemptExecution.java:204) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	at io.airbyte.workers.temporal.CancellationHandler$TemporalCancellationHandler.checkAndHandleCancellation(CancellationHandler.java:52) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getCancellationChecker$4(TemporalAttemptExecution.java:207) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
	at java.lang.Thread.run(Thread.java:833) [?:?]

Any idea what could be happening?

@davinchia @marcosmarxm

The Temporal scheduler makes a difference. The good part is that more jobs run concurrently and there are barely any pending jobs. The downside is that running jobs stay in the running state for a long time, and auto-scaling starts to turn off nodes.

I have only one workspace, so the following graph covers all connections in the whole airbyte namespace. A script creates connections minute by minute; each connection is a source-e2e-test <> destination-e2e-test. Among the connections, 95% are small, generating 5K logs each, and 5% are large, generating 1M logs each.

The following metric comes from metrics_reporter.overall_job_runtime_in_last_hour_by_terminal_state_secs filtered by status of succeeded. According to the source code, only jobs completed within 1 hour are included. There are no failed jobs.

The data looks good while the number of connections is below 4K. Since 20:30, when the number of connections passed 4K, jobs have been getting stuck in the running state.


Compared to the old scheduler (graphs attached in previous comments), almost no jobs are placed in the pending queue.

And if you look at the EKS cluster, while the number of running jobs stays at 5K, the number of nodes shrinks

and the CPU utilization drops

You may wonder what the job is busy with. I downloaded one job’s log and include parts of it here. The log output is the same line, Setting kube job mdc, for hours. The sync is technically done, but the job is stuck in the running state and doesn’t close.

2022-05-25 02:57:09 INFO i.a.w.p.KubeProcessFactory(create):100 - Attempting to start pod = destination-e2e-test-check-3337-0-dphwz for airbyte/destination-e2e-test:0.2.2
2022-05-25 02:57:09 INFO i.a.w.p.KubeProcessFactory(create):103 - destination-e2e-test-check-3337-0-dphwz stdoutLocalPort = 9237
2022-05-25 02:57:09 INFO i.a.w.p.KubeProcessFactory(create):106 - destination-e2e-test-check-3337-0-dphwz stderrLocalPort = 9238
2022-05-25 02:57:09 INFO i.a.w.p.KubePodProcess(lambda$setupStdOutAndStdErrListeners$9):583 - Creating stdout socket server...
2022-05-25 02:57:09 INFO i.a.w.p.KubePodProcess(lambda$setupStdOutAndStdErrListeners$10):601 - Creating stderr socket server...
2022-05-25 02:57:09 INFO i.a.w.p.KubePodProcess(<init>):515 - Creating pod destination-e2e-test-check-3337-0-dphwz
...
2022-05-25 02:57:54 INFO i.a.w.p.KubePodProcess(<init>):573 - Using null stdin output stream...
2022-05-25 02:57:54 INFO i.a.w.g.DefaultReplicationWorker(run):166 - Waiting for source and destination threads to complete.
2022-05-25 02:57:54 INFO i.a.w.g.DefaultReplicationWorker(lambda$getReplicationRunnable$6):295 - Replication thread started.
2022-05-25 02:57:54 INFO i.a.w.g.DefaultReplicationWorker(lambda$getDestinationOutputRunnable$7):385 - Destination output thread started.
2022-05-25 02:57:55 INFO i.a.w.g.DefaultReplicationWorker(lambda$getReplicationRunnable$6):334 - Records read: 1000 (20 KB)
2022-05-25 02:57:57 INFO i.a.w.g.DefaultReplicationWorker(lambda$getReplicationRunnable$6):334 - Records read: 2000 (41 KB)
...
2022-05-25 02:57:55 destination > 2022-05-25 02:57:55 INFO i.a.i.d.e.l.FirstNLogger(log):27 - [2022-05-25T02:57:54.306Z] data_stream #0997: {"column1":"gnmcc"}
2022-05-25 02:57:55 destination > 2022-05-25 02:57:55 INFO i.a.i.d.e.l.FirstNLogger(log):27 - [2022-05-25T02:57:54.306Z] data_stream #0998: {"column1":"yc"}
2022-05-25 02:57:55 destination > 2022-05-25 02:57:55 INFO i.a.i.d.e.l.FirstNLogger(log):27 - [2022-05-25T02:57:54.306Z] data_stream #0999: {"column1":"yllq"}
2022-05-25 02:57:55 destination > 2022-05-25 02:57:55 INFO i.a.i.d.e.l.FirstNLogger(log):27 - [2022-05-25T02:57:54.306Z] data_stream #1000: {"column1":"m"}
2022-05-25 02:58:49 INFO i.a.w.g.DefaultReplicationWorker(lambda$getReplicationRunnable$6):334 - Records read: 46000 (944 KB)
2022-05-25 02:58:50 DEBUG i.a.c.h.LogClientSingleton(setJobMdc):137 - Setting kube job mdc
2022-05-25 02:58:50 INFO i.a.w.g.DefaultReplicationWorker(lambda$getReplicationRunnable$6):334 - Records read: 47000 (964 KB)
2022-05-25 02:58:51 INFO i.a.w.g.DefaultReplicationWorker(lambda$getReplicationRunnable$6):334 - Records read: 48000 (985 KB)
...
2022-05-25 03:54:30 INFO i.a.w.g.DefaultReplicationWorker(lambda$getReplicationRunnable$6):334 - Records read: 999000 (20 MB)
2022-05-25 03:54:33 INFO i.a.w.g.DefaultReplicationWorker(lambda$getReplicationRunnable$6):334 - Records read: 1000000 (20 MB)
2022-05-25 03:54:33 DEBUG i.f.k.c.i.c.Reflector(stopWatcher):64 - Stopping watcher for resource class io.fabric8.kubernetes.api.model.Pod v2464230 in namespace airbyte
2022-05-25 03:54:33 DEBUG i.f.k.c.d.i.AbstractWatchManager(close):230 - Force closing the watch io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@7f4f64aa
2022-05-25 03:54:33 DEBUG i.f.k.c.i.c.Reflector$ReflectorWatcher(onClose):181 - Watch gracefully closed
2022-05-25 03:54:33 DEBUG i.f.k.c.d.i.WatchConnectionManager(closeWebSocket):60 - Closing websocket io.fabric8.kubernetes.client.okhttp.OkHttpWebSocketImpl@434f7eea
2022-05-25 03:54:33 DEBUG i.f.k.c.d.i.WatchConnectionManager(closeWebSocket):63 - Websocket already closed io.fabric8.kubernetes.client.okhttp.OkHttpWebSocketImpl@434f7eea
2022-05-25 03:54:33 DEBUG i.f.k.c.d.i.WatchConnectionManager(closeWebSocket):60 - Closing websocket io.fabric8.kubernetes.client.okhttp.OkHttpWebSocketImpl@6d40e083
2022-05-25 03:54:33 INFO i.a.w.p.KubePodProcess(close):713 - (pod: airbyte / source-e2e-test-read-3337-0-ucewu) - Closed all resources for pod
2022-05-25 03:54:33 INFO i.a.w.g.DefaultReplicationWorker(lambda$getReplicationRunnable$6):345 - Total records read: 1000000 (20 MB)
2022-05-25 03:54:33 INFO i.a.w.g.DefaultReplicationWorker(run):171 - One of source or destination thread complete. Waiting on the other.
2022-05-25 03:54:40 DEBUG i.a.c.h.LogClientSingleton(setJobMdc):137 - Setting kube job mdc
2022-05-25 03:54:34 destination > 2022-05-25 03:54:34 INFO i.a.i.b.IntegrationRunner(runInternal):153 - Completed integration: io.airbyte.integrations.destination.e2e_test.TestingDestinations
2022-05-25 03:54:34 destination > 2022-05-25 03:54:34 INFO i.a.i.d.e.TestingDestinations(main):73 - completed destination: class io.airbyte.integrations.destination.e2e_test.TestingDestinations
2022-05-25 03:54:49 DEBUG i.a.c.h.LogClientSingleton(setJobMdc):137 - Setting kube job mdc
2022-05-25 03:55:00 DEBUG i.a.c.h.LogClientSingleton(setJobMdc):137 - Setting kube job mdc
...
2022-05-25 05:41:39 DEBUG i.a.c.h.LogClientSingleton(setJobMdc):137 - Setting kube job mdc

Here are a few of the environment variables for Airbyte:

  worker_replica_count     = 12
  max_sync_workers         = 400
  max_spec_workers         = 100
  max_check_workers        = 100
  max_discover_workers     = 100

And we set resources.limit and resources.request on scheduler, worker and jobs.

variable "cpu_request" {
  default = "500m"
}
variable "cpu_limit" {
  default = "1000m"
}
variable "memory_request" {
  default = "2Gi"
}
variable "memory_limit" {
  default = "4Gi"
}

temporal_worker_ports is the range [9001, 10000], 1000 ports in total.

For the EKS cluster, here’s the info.

      node_group_name = "m5-2xl"
      instance_types  = ["m5.2xlarge"]
      ami_type        = "BOTTLEROCKET_x86_64"
      platform        = "bottlerocket"
      min_size        = "10"
      desired_size    = "10"
      max_size        = "30"

Can you help me? How can I adjust the settings so that jobs complete gracefully? Since job_submitter_threads is deprecated, is there an environment variable that caps the maximum number of jobs running at once? In our case, a small job completes in 30 seconds and a large job in 90 seconds, so 800 seems a reasonable number. However, the new scheduler pushes it to 5K once the number of connections reaches 4K.

@marcosmarxm

  1. SUBMITTER_NUM_THREADS is deprecated now with new scheduler. @davin can you help with the calculation of parallel syncs? Should be something like: MAX_SYNC_WORKERS * (number of airbyte-worker replicas) correct?

That’s correct if the deployment is using the new Container Orchestrator (separate from the new Temporal Scheduler), as any ports used are sandboxed per job.

If the deployment is not using the Container Orchestrator, ports are not sandboxed and the number of ports available per worker node (set via TEMPORAL_WORKER_PORTS env var) also affects the total number of running jobs. The formula would then be max parallel syncs = number of worker pods * min(MAX_SYNC_WORKERS, TEMPORAL_WORKER_PORTS/4). We divide by 4 since each sync uses 4 ports on the worker pod.
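The two cases above can be written as a small helper. This is only a sketch for checking the arithmetic: the function name and its arguments are hypothetical, and it assumes the port figure is the count of ports available on each worker pod.

```python
from typing import Optional


def max_parallel_syncs(
    worker_pods: int,
    max_sync_workers: int,
    ports_per_worker: Optional[int] = None,
    ports_per_sync: int = 4,
) -> int:
    """Estimate the upper bound on concurrently running syncs.

    ports_per_worker=None models the Container Orchestrator case, where
    ports are sandboxed per job and do not limit concurrency.
    """
    if ports_per_worker is None:
        return worker_pods * max_sync_workers
    # Without the orchestrator, each sync holds 4 ports on the worker pod.
    port_bound = ports_per_worker // ports_per_sync
    return worker_pods * min(max_sync_workers, port_bound)
```

For the configuration posted earlier in this thread (12 worker pods, max_sync_workers = 400, 1000 ports), `max_parallel_syncs(12, 400, 1000)` gives 3000 without the orchestrator, and `max_parallel_syncs(12, 400)` gives 4800 with it.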

@stellachung
From:

Caused by: java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:1679) ~[?:?]
	at java.util.concurrent.LinkedBlockingDeque.pollFirst(LinkedBlockingDeque.java:515) ~[?:?]
	at java.util.concurrent.LinkedBlockingDeque.poll(LinkedBlockingDeque.java:677) ~[?:?]
	at io.airbyte.workers.process.KubePortManagerSingleton.take(KubePortManagerSingleton.java:67) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]

My guess is this is timing out while waiting for a port to be allocated since all the available ports are currently in use. I would either increase the number of available ports via the TEMPORAL_WORKER_PORTS env var, or turn on the Container Orchestrator (this is a lot more infrastructure and not yet well documented). I would recommend not using the Orchestrator for now and increasing the ports.
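To illustrate the failure mode being guessed at here, the sketch below models a fixed pool of ports where a caller blocks until a port is free or a timeout expires. It is not Airbyte's implementation (the stack trace shows a Java LinkedBlockingDeque inside KubePortManagerSingleton); the PortPool class and its method names are hypothetical.

```python
import queue


class PortPool:
    """Hypothetical fixed-size pool of worker ports."""

    def __init__(self, first_port: int, last_port: int) -> None:
        self._ports = queue.Queue()
        for port in range(first_port, last_port + 1):
            self._ports.put(port)

    def take(self, timeout_secs: float) -> int:
        """Block until a port is free, or fail once the timeout expires."""
        try:
            return self._ports.get(timeout=timeout_secs)
        except queue.Empty:
            raise TimeoutError("no free port in pool") from None

    def offer(self, port: int) -> None:
        """Return a port to the pool when a job finishes."""
        self._ports.put(port)
```

Once every port is held by a running job, each new take() waits. If jobs hold their ports for a long time, the waiters pile up and eventually fail, which would match a job being cancelled while still waiting in KubePortManagerSingleton.take.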

@yli
Do you see a log line in the stuck jobs from LOGGER.info("Source and destination threads complete.");? This is from here and shows the job is completely done. Otherwise it seems like something was automatically terminated while only one part of the sync job was complete.

Several thoughts from me:

  • I would make sure there are enough worker ports available. Although each worker pod is configured for 400 workers, there are only enough ports available for 1000/4 = 250 sync jobs. I wrote out the formula in my reply to @stellachung.
  • How does CPU/RAM look? We usually run 10 sync workers per worker pod with 1 CPU and 3GB ram. We have not tested running so many workers on a single worker pod. It is a real possibility that each worker is being cpu-rate-limited and this is pushing the program into a weird state e.g. timeouts, connection dropping etc. I would recommend spreading the load over more worker pods here and adjusting resources accordingly.
  • Taking a step back, what is the scaling goal here? Is there a concrete number we are trying to get to? If there is a concrete goal, we can work backwards and configure things accordingly. Or is this more a case of pushing the system to its limits?

By the way, we’ve only tested Airbyte Kubernetes up to 1000 concurrent jobs, so it’s very cool to see the system being pushed to its limits and performing decently!

Sorry if there was some notification noise. I replied individually and then realised I should combine my answer for easier reading.

TLDR, the goal is to fulfill 6K e2e jobs hourly and 5% of them are large loads.

The long story is the following if you want to know more context.
One of the features at Heap is to help customers aggregate data from different SaaS apps (HubSpot, Salesforce, Marketo…). We leveraged the Faros CDK to develop the Heap destination connector in TypeScript.

The existing solution is able to fulfill 2000 jobs hourly. It’s an ETL job because each job transforms the data before it is loaded. Since customers ask for more data sources, we need to lower the development cost of onboarding a new data source, and that is how we found Airbyte. Since extracting real data takes much longer than it does with source-e2e-test, we set a goal of 6K jobs to support the existing data sources.

To calculate it backward, I need on average 1000 syncs completed every minute. I will use 1K ports and use the following combination today

  worker_replica_count     = 5
  max_sync_workers         = 250
  max_spec_workers         = 50
  max_check_workers        = 50
  max_discover_workers     = 50

According to your 1-million-worth formula: parallel syncs = number of worker pods * min( MAX_SYNC_WORKERS , TEMPORAL_WORKER_PORTS /4)

so, it would be 5 * min( 250, 1000 / 4) = 1250

Let me know your thoughts.

@yli Thanks for the context, that is helpful. Do you mean 100 syncs completed per minute? That is, 6000 per hour / 60 mins = 100 jobs per min. Am I misunderstanding?

I realised I misspoke earlier about the Kube ports. It looks like from 0.39.0-alpha onwards we turned the container orchestrator on by default for Kube, so you should be on the container orchestrator now. The formula simplifies to parallel syncs = number of worker pods * MAX_SYNC_WORKERS, and ports should no longer be a concern. Sorry about that. This explains why your Airbyte instance was able to run close to 5K jobs (12 pods * 400 workers each) even though there weren’t enough available ports. This was a recent change and I will update the documentation in the next few days.

More thoughts here:

  • There is some overhead in setting up a job due to the Kube API latency. In my experience, we see about 1 - 2 mins overhead in creating all the pods involved in the job. This means that the true job execution time = time to sync data + 1 - 2 min of overhead. Keeping this in mind, what happens if we try with 15 workers each with 250 max_sync_workers?
  • Is job load spiky or constant? This will affect resourcing.
  • Sounds like the jobs are being put into running and not being completed? There should be a job_succeeded_by_release_stage metric emitted from the worker, can you set PUBLISH_METRICS = true and see if anything is being moved into succeeded? This should give you a whole bunch of DD metrics as shown in the screen shot that will help us know what is going on.
  • How does the database resource usage look during this time?
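As a back-of-the-envelope check on these numbers, Little's law says that in steady state the average number of jobs in flight equals the arrival rate times the time each job spends in the system. The sketch below applies it to the 6,000 jobs per hour target. The function name is hypothetical, the 30 s and 90 s sync times and the 5% share of large jobs are the figures quoted earlier in this thread, and a constant job load is assumed.

```python
def required_concurrency(
    jobs_per_hour: float, sync_secs: float, overhead_secs: float
) -> float:
    """Average number of jobs in flight in steady state (Little's law)."""
    arrival_rate_per_sec = jobs_per_hour / 3600.0
    return arrival_rate_per_sec * (sync_secs + overhead_secs)


# Assumed job mix from this thread: 95% small (30 s), 5% large (90 s).
MEAN_SYNC_SECS = 0.95 * 30 + 0.05 * 90

# Pod-creation overhead of 1 to 2 minutes, as mentioned above.
LOW_ESTIMATE = required_concurrency(6000, MEAN_SYNC_SECS, 60)
HIGH_ESTIMATE = required_concurrency(6000, MEAN_SYNC_SECS, 120)
```

Under these assumptions the estimate comes to roughly 155 to 255 jobs in flight on average. A spiky load would push the peak higher, so this is a lower bound for sizing worker pods and max_sync_workers, not a recommendation.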


Through a job’s destination log, I learned that the data is loaded; however, the job stayed at Setting kube job mdc for hours, and the number of actively running jobs started to accumulate. Jobs are created at a constant pace.
The screenshot above shows that no jobs succeeded between 22:00 and 22:30. I am going to remove worker_ports and try with 15 workers and 250 max_sync_workers.

Since I deprovisioned the database and the cluster last night, I don’t have the database metrics. But I remember that counting jobs grouped by status took less than a second. Another unexplained observation is the node count and the CPU utilization on each node: the whole cluster cools down even though thousands of jobs are stuck in the running state.