@davinchia @marcosmarxm
The temporal scheduler makes a difference. The good part is that more jobs run concurrently and there are barely any pending jobs. The downside is that running jobs stay in the running state for a long time, and the auto-scaler starts to turn off nodes.
I have only one workspace, so the following graph covers all connections in the whole airbyte namespace. A script creates connections minute by minute; each connection is a source-e2e-test <> destination-e2e-test pair. Among the connections, 95% are small (emitting 5K records) and 5% are large (emitting 1M records).
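For context, here is roughly what the load script does. The endpoint path follows the standard Airbyte config API, but the host and the payload helper are placeholders, not the exact script I ran:

import random
import time
import requests

AIRBYTE_API = "http://localhost:8001/api/v1"  # placeholder host

def create_connection_payload(large: bool) -> dict:
    """Placeholder: build the connections/create body for a small (5K records)
    or large (1M records) source-e2e-test <> destination-e2e-test connection."""
    raise NotImplementedError

for _ in range(5000):
    large = random.random() < 0.05          # 5% large, 95% small
    body = create_connection_payload(large)
    resp = requests.post(f"{AIRBYTE_API}/connections/create", json=body)
    resp.raise_for_status()
    time.sleep(60)                          # one new connection per minute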
The following metric comes from metrics_reporter.overall_job_runtime_in_last_hour_by_terminal_state_secs, filtered by the succeeded status. According to the source code, only jobs completed within the last hour are included; there are no failed jobs.
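For reference, this is roughly how I cross-check that metric directly against the jobs table. The column names and connection string are my assumptions from reading the table, not the reporter's exact query:

import psycopg2

APPROX_METRIC_SQL = """
SELECT status,
       EXTRACT(EPOCH FROM (updated_at - created_at)) AS runtime_secs
FROM jobs
WHERE updated_at > NOW() - INTERVAL '1 hour'
  AND status = 'succeeded';
"""

# Placeholder DSN for the Airbyte config database.
with psycopg2.connect("dbname=airbyte user=airbyte host=localhost") as conn:
    with conn.cursor() as cur:
        cur.execute(APPROX_METRIC_SQL)
        runtimes = [row[1] for row in cur.fetchall()]

print(f"{len(runtimes)} succeeded jobs in the last hour, "
      f"max runtime {max(runtimes, default=0):.0f}s")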
The data looks good while the connection count stays below 4K. Once there are more than 4K connections (from around 20:30), jobs start getting stuck in the running state.
Compared to the old scheduler (graphs attached in previous comments), almost no jobs are placed in the pending queue.
And if you look at the EKS cluster, while the number of running jobs stays at 5K, the node group shrinks
and the CPU utilization drops
You may wonder what the job is busy with. I downloaded one job’s log and include some excerpts here. The output is the same line, Setting kube job mdc, repeated for hours. The sync is technically done, but the job stays stuck in the running state and never closes.
2022-05-25 02:57:09 INFO i.a.w.p.KubeProcessFactory(create):100 - Attempting to start pod = destination-e2e-test-check-3337-0-dphwz for airbyte/destination-e2e-test:0.2.2
2022-05-25 02:57:09 INFO i.a.w.p.KubeProcessFactory(create):103 - destination-e2e-test-check-3337-0-dphwz stdoutLocalPort = 9237
2022-05-25 02:57:09 INFO i.a.w.p.KubeProcessFactory(create):106 - destination-e2e-test-check-3337-0-dphwz stderrLocalPort = 9238
2022-05-25 02:57:09 INFO i.a.w.p.KubePodProcess(lambda$setupStdOutAndStdErrListeners$9):583 - Creating stdout socket server...
2022-05-25 02:57:09 INFO i.a.w.p.KubePodProcess(lambda$setupStdOutAndStdErrListeners$10):601 - Creating stderr socket server...
2022-05-25 02:57:09 INFO i.a.w.p.KubePodProcess(<init>):515 - Creating pod destination-e2e-test-check-3337-0-dphwz
...
2022-05-25 02:57:54 INFO i.a.w.p.KubePodProcess(<init>):573 - Using null stdin output stream...
2022-05-25 02:57:54 INFO i.a.w.g.DefaultReplicationWorker(run):166 - Waiting for source and destination threads to complete.
2022-05-25 02:57:54 INFO i.a.w.g.DefaultReplicationWorker(lambda$getReplicationRunnable$6):295 - Replication thread started.
2022-05-25 02:57:54 INFO i.a.w.g.DefaultReplicationWorker(lambda$getDestinationOutputRunnable$7):385 - Destination output thread started.
2022-05-25 02:57:55 INFO i.a.w.g.DefaultReplicationWorker(lambda$getReplicationRunnable$6):334 - Records read: 1000 (20 KB)
2022-05-25 02:57:57 INFO i.a.w.g.DefaultReplicationWorker(lambda$getReplicationRunnable$6):334 - Records read: 2000 (41 KB)
...
2022-05-25 02:57:55 destination > 2022-05-25 02:57:55 INFO i.a.i.d.e.l.FirstNLogger(log):27 - [2022-05-25T02:57:54.306Z] data_stream #0997: {"column1":"gnmcc"}
2022-05-25 02:57:55 destination > 2022-05-25 02:57:55 INFO i.a.i.d.e.l.FirstNLogger(log):27 - [2022-05-25T02:57:54.306Z] data_stream #0998: {"column1":"yc"}
2022-05-25 02:57:55 destination > 2022-05-25 02:57:55 INFO i.a.i.d.e.l.FirstNLogger(log):27 - [2022-05-25T02:57:54.306Z] data_stream #0999: {"column1":"yllq"}
2022-05-25 02:57:55 destination > 2022-05-25 02:57:55 INFO i.a.i.d.e.l.FirstNLogger(log):27 - [2022-05-25T02:57:54.306Z] data_stream #1000: {"column1":"m"}
2022-05-25 02:58:49 INFO i.a.w.g.DefaultReplicationWorker(lambda$getReplicationRunnable$6):334 - Records read: 46000 (944 KB)
2022-05-25 02:58:50 DEBUG i.a.c.h.LogClientSingleton(setJobMdc):137 - Setting kube job mdc
2022-05-25 02:58:50 INFO i.a.w.g.DefaultReplicationWorker(lambda$getReplicationRunnable$6):334 - Records read: 47000 (964 KB)
2022-05-25 02:58:51 INFO i.a.w.g.DefaultReplicationWorker(lambda$getReplicationRunnable$6):334 - Records read: 48000 (985 KB)
...
2022-05-25 03:54:30 INFO i.a.w.g.DefaultReplicationWorker(lambda$getReplicationRunnable$6):334 - Records read: 999000 (20 MB)
2022-05-25 03:54:33 INFO i.a.w.g.DefaultReplicationWorker(lambda$getReplicationRunnable$6):334 - Records read: 1000000 (20 MB)
2022-05-25 03:54:33 DEBUG i.f.k.c.i.c.Reflector(stopWatcher):64 - Stopping watcher for resource class io.fabric8.kubernetes.api.model.Pod v2464230 in namespace airbyte
2022-05-25 03:54:33 DEBUG i.f.k.c.d.i.AbstractWatchManager(close):230 - Force closing the watch io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@7f4f64aa
2022-05-25 03:54:33 DEBUG i.f.k.c.i.c.Reflector$ReflectorWatcher(onClose):181 - Watch gracefully closed
2022-05-25 03:54:33 DEBUG i.f.k.c.d.i.WatchConnectionManager(closeWebSocket):60 - Closing websocket io.fabric8.kubernetes.client.okhttp.OkHttpWebSocketImpl@434f7eea
2022-05-25 03:54:33 DEBUG i.f.k.c.d.i.WatchConnectionManager(closeWebSocket):63 - Websocket already closed io.fabric8.kubernetes.client.okhttp.OkHttpWebSocketImpl@434f7eea
2022-05-25 03:54:33 DEBUG i.f.k.c.d.i.WatchConnectionManager(closeWebSocket):60 - Closing websocket io.fabric8.kubernetes.client.okhttp.OkHttpWebSocketImpl@6d40e083
2022-05-25 03:54:33 INFO i.a.w.p.KubePodProcess(close):713 - (pod: airbyte / source-e2e-test-read-3337-0-ucewu) - Closed all resources for pod
2022-05-25 03:54:33 INFO i.a.w.g.DefaultReplicationWorker(lambda$getReplicationRunnable$6):345 - Total records read: 1000000 (20 MB)
2022-05-25 03:54:33 INFO i.a.w.g.DefaultReplicationWorker(run):171 - One of source or destination thread complete. Waiting on the other.
2022-05-25 03:54:40 DEBUG i.a.c.h.LogClientSingleton(setJobMdc):137 - Setting kube job mdc
2022-05-25 03:54:34 destination > 2022-05-25 03:54:34 INFO i.a.i.b.IntegrationRunner(runInternal):153 - Completed integration: io.airbyte.integrations.destination.e2e_test.TestingDestinations
2022-05-25 03:54:34 destination > 2022-05-25 03:54:34 INFO i.a.i.d.e.TestingDestinations(main):73 - completed destination: class io.airbyte.integrations.destination.e2e_test.TestingDestinations
2022-05-25 03:54:49 DEBUG i.a.c.h.LogClientSingleton(setJobMdc):137 - Setting kube job mdc
2022-05-25 03:55:00 DEBUG i.a.c.h.LogClientSingleton(setJobMdc):137 - Setting kube job mdc
...
2022-05-25 05:41:39 DEBUG i.a.c.h.LogClientSingleton(setJobMdc):137 - Setting kube job mdc
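To confirm that the pods really are finished while the job stays in running, I compare pod phases in the namespace against the running-job count. A minimal sketch with the Python Kubernetes client (the namespace is ours, everything else is generic):

from collections import Counter
from kubernetes import client, config

# Count pods per phase in the airbyte namespace to compare against the number
# of jobs the API/UI reports as "running".
config.load_kube_config()  # or config.load_incluster_config() inside the cluster
pods = client.CoreV1Api().list_namespaced_pod(namespace="airbyte")
phases = Counter(pod.status.phase for pod in pods.items)
print(phases)  # e.g. Counter({'Succeeded': ..., 'Running': ...})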
Here are a few of the environment variables for Airbyte:
worker_replica_count = 12
max_sync_workers = 400
max_spec_workers = 100
max_check_workers = 100
max_discover_workers = 100
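As a sanity check on where the ~5K running jobs could come from, here is the back-of-the-envelope math, assuming max_sync_workers applies per worker replica (my reading, not confirmed against the worker code):

# Implied concurrency ceiling if max_sync_workers is a per-replica limit
# (assumption on my side, not confirmed in the worker code).
worker_replica_count = 12
max_sync_workers = 400

implied_concurrent_syncs = worker_replica_count * max_sync_workers
print(implied_concurrent_syncs)  # 4800 -- roughly the ~5K running jobs observed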
And we set resources.requests and resources.limits on the scheduler, the worker, and the jobs:
variable "cpu_request" {
default = "500m"
}
variable "cpu_limit" {
default = "1000m"
}
variable "memory_request" {
default = "2Gi"
}
variable "memory_limit" {
default = "4Gi"
}
The temporal_worker_ports range is [9001, 10000], 1000 ports in total.
For the EKS cluster, here is the node group configuration:
node_group_name = "m5-2xl"
instance_types = ["m5.2xlarge"]
ami_type = "BOTTLEROCKET_x86_64"
platform = "bottlerocket"
min_size = "10"
desired_size = "10"
max_size = "30"
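For comparison, the rough pod capacity of the node group, assuming the 500m / 2Gi requests above apply to every job pod and ignoring daemonsets, system pods, and the fact that one sync launches more than one pod:

# Back-of-the-envelope schedulable pod capacity for the node group.
# m5.2xlarge: 8 vCPU, 32 GiB memory per node.
node_cpu, node_mem_gib = 8.0, 32.0
cpu_request, mem_request_gib = 0.5, 2.0   # 500m CPU / 2Gi memory per pod (our defaults)
max_nodes = 30

pods_per_node = min(node_cpu / cpu_request, node_mem_gib / mem_request_gib)
print(int(pods_per_node) * max_nodes)     # 16 pods/node * 30 nodes = 480 pods at full scale-out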
Can you help me? How can I adjust the settings so that jobs complete gracefully? Since job_submitter_threads is deprecated, is there an environment variable to cap the maximum number of jobs running at once? In our case, a small job completes in about 30 seconds and a large job in about 90 seconds, so roughly 800 concurrent jobs seems like a reasonable cap. However, the new scheduler pushes the running count to 5K once the number of connections reaches 4K.