How to configure a higher number of concurrent jobs?

@davinchia Hi, thanks for your help. I ran another experiment, and the running jobs got stuck at the check step and started to accumulate. This started to happen when the number of connections reached 4K. Do you know the reason?

2022-05-26 11:16:30 DEBUG i.a.c.h.LogClientSingleton(setJobMdc):137 - Setting kube job mdc
2022-05-26 11:16:30 INFO i.a.w.t.TemporalAttemptExecution(get):110 - Cloud storage job log path: /workspace/20609/0/logs.log
2022-05-26 11:16:30 INFO i.a.w.t.TemporalAttemptExecution(get):113 - Executing worker wrapper. Airbyte version: 0.39.0-alpha
2022-05-26 11:16:30 DEBUG i.a.c.h.LogClientSingleton(setJobMdc):137 - Setting kube job mdc
2022-05-26 11:16:30 INFO i.a.w.p.KubeProcessFactory(create):100 - Attempting to start pod = source-e2e-test-check-20609-0-fdoad for airbyte/source-e2e-test:2.1.0
2022-05-26 11:16:40 DEBUG i.a.c.h.LogClientSingleton(setJobMdc):137 - Setting kube job mdc
2022-05-26 11:16:50 DEBUG i.a.c.h.LogClientSingleton(setJobMdc):137 - Setting kube job mdc
2022-05-26 11:17:00 DEBUG i.a.c.h.LogClientSingleton(setJobMdc):137 - Setting kube job mdc
...
2022-05-26 11:21:10 DEBUG i.a.c.h.LogClientSingleton(setJobMdc):137 - Setting kube job mdc
2022-05-26 11:21:20 DEBUG i.a.c.h.LogClientSingleton(setJobMdc):137 - Setting kube job mdc
2022-05-26 11:21:30 DEBUG i.a.c.h.LogClientSingleton(setJobMdc):137 - Setting kube job mdc
2022-05-26 11:21:40 DEBUG i.a.c.h.LogClientSingleton(setJobMdc):137 - Setting kube job mdc
2022-05-26 11:21:40 INFO i.a.w.t.TemporalAttemptExecution(lambda$getCancellationChecker$3):194 - Running sync worker cancellation...
2022-05-26 11:21:40 INFO i.a.w.t.TemporalAttemptExecution(lambda$getCancellationChecker$3):198 - Interrupting worker thread...
2022-05-26 11:21:40 INFO i.a.w.t.TemporalAttemptExecution(lambda$getCancellationChecker$3):201 - Cancelling completable future...
2022-05-26 11:21:40 WARN i.a.w.t.CancellationHandler$TemporalCancellationHandler(checkAndHandleCancellation):53 - Job either timed out or was cancelled.
2022-05-26 11:21:40 INFO i.a.w.t.TemporalAttemptExecution(lambda$getWorkerThread$2):161 - Completing future exceptionally...
io.airbyte.workers.exception.WorkerException: Error while getting checking connection.
	at io.airbyte.workers.general.DefaultCheckConnectionWorker.run(DefaultCheckConnectionWorker.java:86) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	at io.airbyte.workers.general.DefaultCheckConnectionWorker.run(DefaultCheckConnectionWorker.java:29) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:158) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	at java.lang.Thread.run(Thread.java:833) [?:?]
Caused by: io.airbyte.workers.exception.WorkerException
	at io.airbyte.workers.process.KubeProcessFactory.create(KubeProcessFactory.java:138) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	at io.airbyte.workers.process.AirbyteIntegrationLauncher.check(AirbyteIntegrationLauncher.java:80) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	at io.airbyte.workers.general.DefaultCheckConnectionWorker.run(DefaultCheckConnectionWorker.java:55) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]

And here are the variables I use:

  worker_replica_count     = 15
  max_sync_workers         = 250
  max_spec_workers         = 50
  max_check_workers        = 50
  max_discover_workers     = 50
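
For reference, a minimal sketch of how these could be wired into the airbyte-worker environment, assuming they map onto the MAX_*_WORKERS env vars the worker reads (the locals block is illustrative, not the exact module code):

# Illustrative wiring only: MAX_*_WORKERS size the worker's activity executors;
# the pass-through from the module variables is an assumption.
locals {
  worker_env = {
    MAX_SYNC_WORKERS     = var.max_sync_workers
    MAX_CHECK_WORKERS    = var.max_check_workers
    MAX_SPEC_WORKERS     = var.max_spec_workers
    MAX_DISCOVER_WORKERS = var.max_discover_workers
  }
}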

And the following is the resource config for workers, scheduler and jobs.

variable "cpu_request" {
  default = "500m"
}
variable "cpu_limit" {
  default = "1000m"
}
variable "memory_request" {
  default = "2Gi"
}
variable "memory_limit" {
  default = "4Gi"
}
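
A sketch of how these might be applied to the connector job pods, assuming they are passed through as the JOB_MAIN_CONTAINER_* env vars that size launched pods; the worker and scheduler deployments would take the same values as ordinary Kubernetes requests/limits:

# Illustrative only: assumed pass-through of the variables above to the env vars
# Airbyte uses to size the connector pods it launches.
locals {
  job_pod_env = {
    JOB_MAIN_CONTAINER_CPU_REQUEST    = var.cpu_request
    JOB_MAIN_CONTAINER_CPU_LIMIT      = var.cpu_limit
    JOB_MAIN_CONTAINER_MEMORY_REQUEST = var.memory_request
    JOB_MAIN_CONTAINER_MEMORY_LIMIT   = var.memory_limit
  }
}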

More metrics:

Do you think it would get better if I add more check workers?

@davinchia can you help confirm whether the feature flag CONTAINER_ORCHESTRATOR_ENABLED is GA'd, or whether it applies to KubePortManagerSingleton? I saw that OrchestratorConstants.PORTS (9877, 9878, 9879, 9880) is assigned to KubePortManagerSingleton initially, and these ports are later recycled.
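
For context, my understanding is that the pool KubePortManagerSingleton hands out local ports from is configured via the TEMPORAL_WORKER_PORTS env var on the worker (a comma-separated port list) in the Kube deploys. A sketch of widening that pool, in the same Terraform style as above (the locals name and the 9001-9400 range are assumptions):

# Assumption: a wider port pool for KubePortManagerSingleton, passed to the
# worker as TEMPORAL_WORKER_PORTS. Each launched pod borrows local ports from
# this pool (see stdoutLocalPort/stderrLocalPort in the log below).
locals {
  temporal_worker_ports = join(",", [for p in range(9001, 9401) : tostring(p)])
}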

And one of the jobs is stuck initializing a KubePodProcess because its ports are already in use, and the port is not among the 4 above. Regardless, even if the WorkerException is thrown, Temporal should have retried the (check) work:

validation, just use a NonValidationKeyword
2022-05-26 20:59:15 e[36mDEBUGe[m i.a.w.WorkerUtils(gentleClose):36 - Gently closing process source-e2e-test-check-2713-0-iodmz
2022-05-26 20:59:16 e[36mDEBUGe[m i.f.k.c.i.c.Reflector(stopWatcher):64 - Stopping watcher for resource class io.fabric8.kubernetes.api.model.Pod v3264980 in namespace airbyte
2022-05-26 20:59:16 e[36mDEBUGe[m i.f.k.c.d.i.AbstractWatchManager(close):230 - Force closing the watch io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@7e35cf1e
2022-05-26 20:59:16 e[36mDEBUGe[m i.f.k.c.i.c.Reflector$ReflectorWatcher(onClose):181 - Watch gracefully closed
2022-05-26 20:59:16 e[36mDEBUGe[m i.f.k.c.d.i.WatchConnectionManager(closeWebSocket):60 - Closing websocket io.fabric8.kubernetes.client.okhttp.OkHttpWebSocketImpl@69b2331
2022-05-26 20:59:16 e[36mDEBUGe[m i.f.k.c.d.i.WatchConnectionManager(closeWebSocket):60 - Closing websocket io.fabric8.kubernetes.client.okhttp.OkHttpWebSocketImpl@69b2331
2022-05-26 20:59:16 e[36mDEBUGe[m i.f.k.c.d.i.WatchConnectionManager(closeWebSocket):63 - Websocket already closed io.fabric8.kubernetes.client.okhttp.OkHttpWebSocketImpl@69b2331
2022-05-26 20:59:16 e[32mINFOe[m i.a.w.p.KubePodProcess(close):713 - (pod: airbyte / source-e2e-test-check-2713-0-iodmz) - Closed all resources for pod
2022-05-26 20:59:16 e[36mDEBUGe[m i.a.w.g.DefaultCheckConnectionWorker(run):78 - Check connection job subprocess finished with exit code 0
2022-05-26 20:59:16 e[36mDEBUGe[m i.a.w.g.DefaultCheckConnectionWorker(run):79 - Check connection job received output: io.airbyte.config.StandardCheckConnectionOutput@7b026a[status=succeeded,message=Source config: ContinuousFeedConfig{maxMessages=5000, seed=0, messageIntervalMs=Optional.empty, mockCatalog=io.airbyte.protocol.models.AirbyteCatalog@4f8caaf3[streams=[io.airbyte.protocol.models.AirbyteStream@2b50150[name=data_stream,jsonSchema={"type":"object","properties":{"column1":{"type":"string"}}},supportedSyncModes=[],sourceDefinedCursor=<null>,defaultCursorField=[],sourceDefinedPrimaryKey=[],namespace=<null>,additionalProperties={}]],additionalProperties={}]}]
2022-05-26 20:59:16 e[32mINFOe[m i.a.w.t.TemporalAttemptExecution(get):134 - Stopping cancellation check scheduling...
2022-05-26 20:59:17 e[36mDEBUGe[m i.a.c.h.LogClientSingleton(setJobMdc):137 - Setting kube job mdc
2022-05-26 20:59:17 e[32mINFOe[m i.a.w.p.KubeProcessFactory(create):100 - Attempting to start pod = destination-e2e-test-check-2713-0-uascb for airbyte/destination-e2e-test:0.2.2
2022-05-26 20:59:22 e[32mINFOe[m i.a.w.p.KubeProcessFactory(create):103 - destination-e2e-test-check-2713-0-uascb stdoutLocalPort = 9030
2022-05-26 20:59:26 e[32mINFOe[m i.a.w.p.KubeProcessFactory(create):106 - destination-e2e-test-check-2713-0-uascb stderrLocalPort = 9018
2022-05-26 20:59:26 e[32mINFOe[m i.a.w.t.TemporalAttemptExecution(lambda$getWorkerThread$2):161 - Completing future exceptionally...
io.airbyte.workers.exception.WorkerException: Error while getting checking connection.
	at io.airbyte.workers.general.DefaultCheckConnectionWorker.run(DefaultCheckConnectionWorker.java:86) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	at io.airbyte.workers.general.DefaultCheckConnectionWorker.run(DefaultCheckConnectionWorker.java:29) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:158) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	at java.lang.Thread.run(Thread.java:833) [?:?]
Caused by: io.airbyte.workers.exception.WorkerException: Address already in use
	at io.airbyte.workers.process.KubeProcessFactory.create(KubeProcessFactory.java:138) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	at io.airbyte.workers.process.AirbyteIntegrationLauncher.check(AirbyteIntegrationLauncher.java:80) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	at io.airbyte.workers.general.DefaultCheckConnectionWorker.run(DefaultCheckConnectionWorker.java:55) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	... 3 more
Caused by: java.net.BindException: Address already in use
	at sun.nio.ch.Net.bind0(Native Method) ~[?:?]
	at sun.nio.ch.Net.bind(Net.java:555) ~[?:?]
	at sun.nio.ch.Net.bind(Net.java:544) ~[?:?]
	at sun.nio.ch.NioSocketImpl.bind(NioSocketImpl.java:643) ~[?:?]
	at java.net.ServerSocket.bind(ServerSocket.java:388) ~[?:?]
	at java.net.ServerSocket.<init>(ServerSocket.java:274) ~[?:?]
	at java.net.ServerSocket.<init>(ServerSocket.java:167) ~[?:?]
	at io.airbyte.workers.process.KubePodProcess.<init>(KubePodProcess.java:376) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	at io.airbyte.workers.process.KubeProcessFactory.create(KubeProcessFactory.java:134) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	at io.airbyte.workers.process.AirbyteIntegrationLauncher.check(AirbyteIntegrationLauncher.java:80) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	at io.airbyte.workers.general.DefaultCheckConnectionWorker.run(DefaultCheckConnectionWorker.java:55) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	... 3 more
2022-05-26 20:59:26 e[32mINFOe[m i.a.w.t.TemporalAttemptExecution(get):134 - Stopping cancellation check scheduling...
2022-05-26 20:59:26 e[33mWARNe[m i.t.i.a.POJOActivityTaskHandler(activityFailureToResult):307 - Activity failure. ActivityId=66b08428-0dad-3147-9603-7c5b3619225c, activityType=Run, attempt=1
java.util.concurrent.ExecutionException: io.airbyte.workers.exception.WorkerException: Error while getting checking connection.
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) ~[?:?]
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) ~[?:?]
	at io.airbyte.workers.temporal.TemporalAttemptExecution.get(TemporalAttemptExecution.java:132) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	at io.airbyte.workers.temporal.check.connection.CheckConnectionActivityImpl.run(CheckConnectionActivityImpl.java:76) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	at jdk.internal.reflect.GeneratedMethodAccessor549.invoke(Unknown Source) ~[?:?]
	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
	at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
	at io.temporal.internal.activity.POJOActivityTaskHandler$POJOActivityInboundCallsInterceptor.execute(POJOActivityTaskHandler.java:214) ~[temporal-sdk-1.8.1.jar:?]
	at io.temporal.internal.activity.POJOActivityTaskHandler$POJOActivityImplementation.execute(POJOActivityTaskHandler.java:180) ~[temporal-sdk-1.8.1.jar:?]
	at io.temporal.internal.activity.POJOActivityTaskHandler.handle(POJOActivityTaskHandler.java:120) ~[temporal-sdk-1.8.1.jar:?]
	at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:204) ~[temporal-sdk-1.8.1.jar:?]
	at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:164) ~[temporal-sdk-1.8.1.jar:?]
	at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:93) ~[temporal-sdk-1.8.1.jar:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
	at java.lang.Thread.run(Thread.java:833) [?:?]
Caused by: io.airbyte.workers.exception.WorkerException: Error while getting checking connection.
	at io.airbyte.workers.general.DefaultCheckConnectionWorker.run(DefaultCheckConnectionWorker.java:86) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	at io.airbyte.workers.general.DefaultCheckConnectionWorker.run(DefaultCheckConnectionWorker.java:29) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:158) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	... 1 more
Caused by: io.airbyte.workers.exception.WorkerException: Address already in use
	at io.airbyte.workers.process.KubeProcessFactory.create(KubeProcessFactory.java:138) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	at io.airbyte.workers.process.AirbyteIntegrationLauncher.check(AirbyteIntegrationLauncher.java:80) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	at io.airbyte.workers.general.DefaultCheckConnectionWorker.run(DefaultCheckConnectionWorker.java:55) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	at io.airbyte.workers.general.DefaultCheckConnectionWorker.run(DefaultCheckConnectionWorker.java:29) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:158) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	... 1 more
Caused by: java.net.BindException: Address already in use
	at sun.nio.ch.Net.bind0(Native Method) ~[?:?]
	at sun.nio.ch.Net.bind(Net.java:555) ~[?:?]
	at sun.nio.ch.Net.bind(Net.java:544) ~[?:?]
	at sun.nio.ch.NioSocketImpl.bind(NioSocketImpl.java:643) ~[?:?]
	at java.net.ServerSocket.bind(ServerSocket.java:388) ~[?:?]
	at java.net.ServerSocket.<init>(ServerSocket.java:274) ~[?:?]
	at java.net.ServerSocket.<init>(ServerSocket.java:167) ~[?:?]
	at io.airbyte.workers.process.KubePodProcess.<init>(KubePodProcess.java:376) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	at io.airbyte.workers.process.KubeProcessFactory.create(KubeProcessFactory.java:134) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	at io.airbyte.workers.process.AirbyteIntegrationLauncher.check(AirbyteIntegrationLauncher.java:80) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	at io.airbyte.workers.general.DefaultCheckConnectionWorker.run(DefaultCheckConnectionWorker.java:55) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	at io.airbyte.workers.general.DefaultCheckConnectionWorker.run(DefaultCheckConnectionWorker.java:29) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]
	at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:158) ~[io.airbyte-airbyte-workers-0.39.0-alpha.jar:?]

@yli
Your error message does make it seem like the jobs are running out of ports. How are you deploying Airbyte to the Kube cluster?

I can confirm the CONTAINER_ORCHESTRATOR env var has been turned on in our Kustomize deploys since 0.39.0-alpha. So if you are using those manifests, the orchestrator should be enabled.
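
If you are on a custom deploy rather than the stock overlays, a minimal sketch of making sure the flag reaches the worker env (the wiring is illustrative; the env var name is the one referenced earlier in the thread):

# Illustrative only: force the orchestrator feature flag on the airbyte-worker env.
locals {
  orchestrator_env = {
    CONTAINER_ORCHESTRATOR_ENABLED = "true"
  }
}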

I can’t think of any reason off the top of my head why the rate of job completion would slow down the way your metrics show.

The scaling dimensions are:

  1. Temporal - what is Temporal’s resource usage? This can manifest as database load on the Temporal tables.
  2. The number of available ports. This does not matter if the Orchestrator is used.
  3. The number of pods the Kube cluster can support - having 4K running jobs means there are 8K running pods. Unlikely, but maybe we are running into Kube overhead here?
  4. The number of available workers. One thing to note is that max_check_workers should match max_sync_workers, since a sync job performs a check connection before syncing as a sanity check that the sync is able to proceed (see the sketch after this list).
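
On point 4, the asymmetry in the config shared above is large: 15 replicas × 50 check workers gives 750 concurrent check slots, while 15 × 250 sync workers gives 3,750 sync slots, so checks saturate first. A sketch of the change, reusing the variable names from that config:

  worker_replica_count     = 15
  max_sync_workers         = 250
  max_check_workers        = 250   # match max_sync_workers: every sync also runs a check
  max_spec_workers         = 50
  max_discover_workers     = 50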

The main things we ran into during our scale test:

  • DB size and connection limits.
  • Kube node pool limitations.
  • The number of available workers.

I’m curious if trying with more workers changes anything. Maybe try with double the workers? I would monitor the Temporal pod and the database resource usage closely during this time.
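
Concretely, the "double the workers" experiment with your variable names would look something like this (numbers illustrative):

  worker_replica_count     = 30   # doubled from 15; watch the Temporal pod and DB usage while ramping up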

@davinchia it’s been a while since our last conversation. CONTAINER_ORCHESTRATOR works, and I saw in the code that the Temporal scheduler is turned on by default. In our cluster, the maximum number of connections I could achieve is 3,600. Jobs start to fail after 3.6K, and I am able to reproduce it in Airbyte Cloud.

https://cloud.airbyte.io/workspaces/e20f99b5-a18c-41d8-adc8-2526fe08af7f/connections/e32725de-eb48-45f9-ab58-6aa058d82143/status

I got the exact same log message. And the job hangs at running; it doesn’t fail, and consequently a retry is not kicked off.

	at io.airbyte.workers.process.KubeProcessFactory.create(KubeProcessFactory.java:102) ~[io.airbyte-airbyte-workers-0.39.16-alpha.jar:?]
	at io.airbyte.workers.process.AirbyteIntegrationLauncher.check(AirbyteIntegrationLauncher.java:80) ~[io.airbyte-airbyte-workers-0.39.16-alpha.jar:?]
	at io.airbyte.workers.general.DefaultCheckConnectionWorker.run(DefaultCheckConnectionWorker.java:55) ~[io.airbyte-airbyte-workers-0.39.16-alpha.jar:?]
	... 3 more
2022-06-13 19:28:18 INFO i.a.w.t.TemporalAttemptExecution(get):134 - Stopping cancellation check scheduling...
2022-06-13 19:28:18 WARN i.t.i.a.POJOActivityTaskHandler(activityFailureToResult):307 - Activity failure. ActivityId=e0439e52-3d6f-3949-afe6-d3017ad93973, activityType=Run, attempt=1
java.util.concurrent.CancellationException: null
	at java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2478) ~[?:?]
	at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getCancellationChecker$3(TemporalAttemptExecution.java:204) ~[io.airbyte-airbyte-workers-0.39.16-alpha.jar:?]
	at io.airbyte.workers.temporal.CancellationHandler$TemporalCancellationHandler.checkAndHandleCancellation(CancellationHandler.java:52) ~[io.airbyte-airbyte-workers-0.39.16-alpha.jar:?]
	at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getCancellationChecker$4(TemporalAttemptExecution.java:207) ~[io.airbyte-airbyte-workers-0.39.16-alpha.jar:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
	at java.lang.Thread.run(Thread.java:833) [?:?]
2022-06-13 19:28:18 INFO i.a.s.p.DefaultJobPersistence(enqueueJob):139 - enqueuing pending job for scope: baa5c76a-0bf5-4ac8-9326-ae8c729c9529

Since the cap is 3.6K, I changed the size to 3K and let it run for days. All of the connections are e2e <-> e2e connections. I really appreciate your help. Now I am starting to check whether the bottleneck on my side is Temporal.

We are using temporalio/auto-setup:1.7.0, and per the documentation it is not recommended for production. The replica set is 1/1. I wonder if it would be better to deploy a full Temporal suite into a new namespace. At the very least, the temporal-admin-tools pod would be useful for debugging.
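
In the meantime, a minimal sketch of running just an admin-tools pod next to the existing auto-setup deployment, so tctl is available for inspecting stuck workflows (the image tag and the airbyte-temporal-svc service name are assumptions about this cluster):

# Assumption-heavy sketch: a standalone temporalio/admin-tools pod pointed at the
# existing Temporal frontend so tctl can be run via kubectl exec.
resource "kubernetes_deployment" "temporal_admin_tools" {
  metadata {
    name      = "temporal-admin-tools"
    namespace = "airbyte"
  }
  spec {
    replicas = 1
    selector {
      match_labels = { app = "temporal-admin-tools" }
    }
    template {
      metadata {
        labels = { app = "temporal-admin-tools" }
      }
      spec {
        container {
          name    = "admin-tools"
          image   = "temporalio/admin-tools:1.7.0"   # assumed tag matching the server version
          command = ["sleep", "infinity"]            # keep the pod idle for interactive use
          env {
            name  = "TEMPORAL_CLI_ADDRESS"           # frontend address used by tctl
            value = "airbyte-temporal-svc:7233"      # service name/port are assumptions
          }
        }
      }
    }
  }
}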

And after running for a few days, I got the following output from that single (and only) Temporal pod.

{"level":"info","ts":"2022-06-13T12:00:00.266Z","msg":"Taskqueue scavenger starting","service":"worker","logging-call-at":"scavenger.go:117"}
{"level":"info","ts":"2022-06-13T12:00:00.266Z","msg":"Taskqueue scavenger started","service":"worker","logging-call-at":"scavenger.go:122"}
{"level":"info","ts":"2022-06-13T12:00:00.303Z","msg":"scavenger.deleteHandler processed.","service":"worker","wf-namespace-id":"4b3cdd99-2db4-43cb-ae8a-4dcda771b92e","wf-task-queue-name":"/_sys/CONNECTION_UPDATER/2","wf-task-queue-type":"Workflow","number-processed":1,"number-deleted":0,"logging-call-at":"handler.go:141"}
{"level":"info","ts":"2022-06-13T12:00:00.303Z","msg":"scavenger.deleteHandler processed.","service":"worker","wf-namespace-id":"4b3cdd99-2db4-43cb-ae8a-4dcda771b92e","wf-task-queue-name":"/_sys/CONNECTION_UPDATER/1","wf-task-queue-type":"Workflow","number-processed":1,"number-deleted":0,"logging-call-at":"handler.go:141"}
{"level":"info","ts":"2022-06-13T12:00:00.303Z","msg":"scavenger.deleteHandler processed.","service":"worker","wf-namespace-id":"4b3cdd99-2db4-43cb-ae8a-4dcda771b92e","wf-task-queue-name":"/_sys/CONNECTION_UPDATER/3","wf-task-queue-type":"Workflow","number-processed":1,"number-deleted":0,"logging-call-at":"handler.go:141"}
{"level":"info","ts":"2022-06-13T12:00:00.303Z","msg":"scavenger.deleteHandler processed.","service":"worker","wf-namespace-id":"4b3cdd99-2db4-43cb-ae8a-4dcda771b92e","wf-task-queue-name":"/_sys/CONNECTION_UPDATER/3","wf-task-queue-type":"Activity","number-processed":1,"number-deleted":0,"logging-call-at":"handler.go:141"}
{"level":"info","ts":"2022-06-13T12:00:00.303Z","msg":"scavenger.deleteHandler processed.","service":"worker","wf-namespace-id":"4b3cdd99-2db4-43cb-ae8a-4dcda771b92e","wf-task-queue-name":"/_sys/CONNECTION_UPDATER/2","wf-task-queue-type":"Activity","number-processed":1,"number-deleted":0,"logging-call-at":"handler.go:141"}
{"level":"info","ts":"2022-06-13T12:00:00.303Z","msg":"scavenger.deleteHandler processed.","service":"worker","wf-namespace-id":"4b3cdd99-2db4-43cb-ae8a-4dcda771b92e","wf-task-queue-name":"/_sys/CONNECTION_UPDATER/1","wf-task-queue-type":"Activity","number-processed":1,"number-deleted":0,"logging-call-at":"handler.go:141"}
{"level":"info","ts":"2022-06-13T12:00:00.303Z","msg":"scavenger.deleteHandler processed.","service":"worker","wf-namespace-id":"4b3cdd99-2db4-43cb-ae8a-4dcda771b92e","wf-task-queue-name":"/_sys/SYNC/1","wf-task-queue-type":"Activity","number-processed":1,"number-deleted":0,"logging-call-at":"handler.go:141"}
{"level":"info","ts":"2022-06-13T12:00:00.305Z","msg":"scavenger.deleteHandler processed.","service":"worker","wf-namespace-id":"4b3cdd99-2db4-43cb-ae8a-4dcda771b92e","wf-task-queue-name":"/_sys/SYNC/3","wf-task-queue-type":"Activity","number-processed":1,"number-deleted":0,"logging-call-at":"handler.go:141"}
{"level":"info","ts":"2022-06-13T12:00:00.305Z","msg":"scavenger.deleteHandler processed.","service":"worker","wf-namespace-id":"4b3cdd99-2db4-43cb-ae8a-4dcda771b92e","wf-task-queue-name":"/_sys/SYNC/1","wf-task-queue-type":"Workflow","number-processed":1,"number-deleted":0,"logging-call-at":"handler.go:141"}
{"level":"info","ts":"2022-06-13T12:00:00.305Z","msg":"scavenger.deleteHandler processed.","service":"worker","wf-namespace-id":"4b3cdd99-2db4-43cb-ae8a-4dcda771b92e","wf-task-queue-name":"/_sys/SYNC/2","wf-task-queue-type":"Workflow","number-processed":1,"number-deleted":0,"logging-call-at":"handler.go:141"}
{"level":"info","ts":"2022-06-13T12:00:00.306Z","msg":"scavenger.deleteHandler processed.","service":"worker","wf-namespace-id":"4b3cdd99-2db4-43cb-ae8a-4dcda771b92e","wf-task-queue-name":"/_sys/SYNC/2","wf-task-queue-type":"Activity","number-processed":1,"number-deleted":0,"logging-call-at":"handler.go:141"}
{"level":"info","ts":"2022-06-13T12:00:00.306Z","msg":"scavenger.deleteHandler processed.","service":"worker","wf-namespace-id":"4b3cdd99-2db4-43cb-ae8a-4dcda771b92e","wf-task-queue-name":"CONNECTION_UPDATER","wf-task-queue-type":"Activity","number-processed":1,"number-deleted":0,"logging-call-at":"handler.go:141"}
{"level":"info","ts":"2022-06-13T12:00:00.307Z","msg":"scavenger.deleteHandler processed.","service":"worker","wf-namespace-id":"4b3cdd99-2db4-43cb-ae8a-4dcda771b92e","wf-task-queue-name":"CONNECTION_UPDATER","wf-task-queue-type":"Workflow","number-processed":1,"number-deleted":0,"logging-call-at":"handler.go:141"}
{"level":"info","ts":"2022-06-13T12:00:00.308Z","msg":"scavenger.deleteHandler processed.","service":"worker","wf-namespace-id":"4b3cdd99-2db4-43cb-ae8a-4dcda771b92e","wf-task-queue-name":"SYNC","wf-task-queue-type":"Activity","number-processed":1,"number-deleted":0,"logging-call-at":"handler.go:141"}
{"level":"info","ts":"2022-06-13T12:01:00.304Z","msg":"Taskqueue scavenger stopping","service":"worker","logging-call-at":"scavenger.go:131"}
{"level":"info","ts":"2022-06-13T12:01:00.304Z","msg":"Taskqueue scavenger stopped","service":"worker","logging-call-at":"scavenger.go:135"}

The taskqueue scavenger keeps starting and stopping. In my cluster there are 3K connections in total; about 1K jobs among them have been stuck in the running state for days, and no more jobs are created for the remaining 2K connections.

And I don’t know if this is only my issue, but if I recreate all airbyte-worker pods and the Temporal pod, the jobs stuck at running are not picked up again.

Just to share some findings; I appreciate your feedback and help, and we can continue the conversation. So far, we are good on our load tests in our cluster. The Airbyte team keeps improving performance, and I look forward to the updates in future versions.

@yli, any update here? Did you figure out what was happening?

Has anyone tried autoscaling the workers based on a number-of-jobs metric in k8s?
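
What I have in mind is roughly the following (a sketch only: it assumes a pending-jobs count is already exported through a metrics adapter, and the metric name, deployment name, and thresholds are all placeholders):

# Sketch: HPA on the airbyte-worker Deployment driven by an external
# "airbyte_pending_jobs" metric (hypothetical; must be exposed by a metrics adapter).
resource "kubernetes_horizontal_pod_autoscaler_v2" "airbyte_worker" {
  metadata {
    name      = "airbyte-worker"
    namespace = "airbyte"
  }
  spec {
    min_replicas = 5
    max_replicas = 30
    scale_target_ref {
      api_version = "apps/v1"
      kind        = "Deployment"
      name        = "airbyte-worker"
    }
    metric {
      type = "External"
      external {
        metric {
          name = "airbyte_pending_jobs"   # hypothetical metric name
        }
        target {
          type          = "AverageValue"
          average_value = "200"           # pending jobs per worker replica, illustrative
        }
      }
    }
  }
}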