The airbyte-worker


This service creates a connection with Temporal and receives activities (tasks) from Temporal to execute, such as running a check operation, spinning up Docker containers to run a sync, or starting dbt normalization.

During startup, the application creates a WorkerFactory using the maximum number of parallel workers defined in the application.yaml file. By default, all worker types are enabled, but a worker can also be dedicated exclusively to a specific task type. The available worker types are:

  • Check, Connection, Discover, Spec, Sync, Notification
```java
public enum TemporalJobType {
  GET_SPEC,
  CHECK_CONNECTION,
  DISCOVER_SCHEMA,
  SYNC,
  RESET_CONNECTION,
  CONNECTION_UPDATER,
  REPLICATE,
  NOTIFY
}
```
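The enable/dedicate behavior described above can be sketched as a small selection function. This is a minimal sketch, assuming each task queue is named after its TemporalJobType constant and that a single maximum applies to every enabled queue; WorkerQueueSelection, queuesToPoll, and the dedicatedTo parameter are hypothetical names, not Airbyte's actual configuration code.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch (not Airbyte's actual config code): decide which task
// queues this worker process polls and how many tasks each may run in parallel.
final class WorkerQueueSelection {

  // Mirrors the TemporalJobType constants listed above.
  enum JobType {
    GET_SPEC, CHECK_CONNECTION, DISCOVER_SCHEMA, SYNC,
    RESET_CONNECTION, CONNECTION_UPDATER, REPLICATE, NOTIFY
  }

  private WorkerQueueSelection() {}

  /**
   * Returns task-queue name -> max parallel tasks for every enabled job type.
   * An empty dedicatedTo set means "all types enabled" (the default);
   * a non-empty set dedicates this process to just those types.
   */
  static Map<String, Integer> queuesToPoll(Set<JobType> dedicatedTo, int maxParallel) {
    if (maxParallel < 1) {
      throw new IllegalArgumentException("maxParallel must be at least 1");
    }
    Set<JobType> enabled = dedicatedTo.isEmpty()
        ? EnumSet.allOf(JobType.class)
        : EnumSet.copyOf(dedicatedTo);
    Map<String, Integer> queues = new TreeMap<>();
    for (JobType type : enabled) {
      // Assumption: each task queue is named after its enum constant.
      queues.put(type.name(), maxParallel);
    }
    return queues;
  }
}
```

With an empty set and a limit of 5, all eight queues are polled with a limit of 5 each; passing only SYNC yields a single SYNC queue, which models a worker dedicated to syncs.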

Note: In Temporal, a WorkerFactory is responsible for creating and managing workers that execute workflow and activity tasks. It is a component that helps register workflow and activity implementations, configure worker options, and start the workers to process tasks from the Temporal service.
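To make the note concrete without depending on the Temporal SDK, here is a toy, in-memory model of the same pattern: one handler registered per task queue, a bounded thread pool per queue as the "worker option", and dispatch of tasks to the matching worker. ToyWorkerFactory, register, submit, and shutdown are hypothetical names; in the real Temporal Java SDK the corresponding pieces are WorkerFactory, Worker, registerWorkflowImplementationTypes, and registerActivitiesImplementations.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Toy model of the WorkerFactory pattern (NOT the Temporal SDK):
// one handler and one bounded thread pool per task queue.
final class ToyWorkerFactory {

  private static final class ToyWorker {
    final Function<String, String> handler;
    final ExecutorService pool;

    ToyWorker(Function<String, String> handler, int maxConcurrent) {
      this.handler = handler;
      // The pool size caps how many tasks from this queue run in parallel.
      this.pool = Executors.newFixedThreadPool(maxConcurrent);
    }
  }

  private final Map<String, ToyWorker> workers = new LinkedHashMap<>();

  /** Registers the implementation that handles tasks from one task queue. */
  void register(String taskQueue, Function<String, String> handler, int maxConcurrent) {
    if (workers.containsKey(taskQueue)) {
      throw new IllegalStateException("A worker is already registered for " + taskQueue);
    }
    workers.put(taskQueue, new ToyWorker(handler, maxConcurrent));
  }

  /** Dispatches a task to the worker that polls the given queue. */
  Future<String> submit(String taskQueue, String input) {
    ToyWorker worker = workers.get(taskQueue);
    if (worker == null) {
      throw new IllegalArgumentException("No worker polls task queue " + taskQueue);
    }
    return worker.pool.submit(() -> worker.handler.apply(input));
  }

  /** Stops every worker and waits briefly for in-flight tasks to finish. */
  void shutdown() throws InterruptedException {
    for (ToyWorker worker : workers.values()) {
      worker.pool.shutdown();
      worker.pool.awaitTermination(5, TimeUnit.SECONDS);
    }
  }
}
```

A task sent to a queue that no worker polls is rejected here; in real Temporal it would instead wait on the server until some worker polls that queue.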

When running on Docker, the ReplicationWorker is hosted inside the airbyte-worker itself, through a component called InMemoryOrchestrator. When running on Kubernetes, a separate service called airbyte-container-orchestrator handles this operation.
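The Docker/Kubernetes split is essentially a strategy chosen per deployment. The sketch below is a hypothetical model of that choice only: ReplicationPlacement, Orchestrator, InProcess, and SeparateService are illustrative names, not Airbyte classes, and the host method just describes where the replication would run.

```java
// Hypothetical model (not Airbyte code): where replication runs per deployment.
final class ReplicationPlacement {

  enum Deployment { DOCKER, KUBERNETES }

  interface Orchestrator {
    /** Describes which process hosts the replication for a connection. */
    String host(String connectionId);
  }

  /** Docker: replication runs inside the airbyte-worker process (in-memory mode). */
  static final class InProcess implements Orchestrator {
    @Override
    public String host(String connectionId) {
      return "airbyte-worker (in-process) for " + connectionId;
    }
  }

  /** Kubernetes: replication is handed to the separate orchestrator service. */
  static final class SeparateService implements Orchestrator {
    @Override
    public String host(String connectionId) {
      return "airbyte-container-orchestrator for " + connectionId;
    }
  }

  private ReplicationPlacement() {}

  static Orchestrator forDeployment(Deployment deployment) {
    return switch (deployment) {
      case DOCKER -> new InProcess();
      case KUBERNETES -> new SeparateService();
    };
  }
}
```

Keeping the choice behind one interface means the calling activity code stays the same regardless of where the replication actually executes.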

The airbyte-worker is probably the most code-heavy component in the codebase. The image below, from the "How we scale workflow orchestration with Temporal" article, helps show how the worker connects to Temporal.

The airbyte-worker contains all the Activity implementations (which are Temporal tasks), while airbyte-commons-worker contains all the Runners and “Workers” that perform the actual work.
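The division of labor between the two modules can be illustrated with a thin activity that delegates to a worker. This is a hypothetical, simplified model: ActivityWorkerSplit, Worker, CheckConnectionActivity, and checkConnection are illustrative names and signatures, not Airbyte's real interfaces.

```java
// Hypothetical model of the module split described above (not Airbyte's real classes).
final class ActivityWorkerSplit {

  /** "Worker" side (airbyte-commons-worker in this model): performs the actual work. */
  interface Worker<I, O> {
    O run(I input) throws Exception;
  }

  /** Activity side (airbyte-worker in this model): the entry point Temporal would call. */
  static final class CheckConnectionActivity {
    private final Worker<String, Boolean> worker;

    CheckConnectionActivity(Worker<String, Boolean> worker) {
      this.worker = worker;
    }

    boolean checkConnection(String connectorConfig) {
      try {
        // The activity only adapts inputs and outputs; the worker does the job.
        return worker.run(connectorConfig);
      } catch (Exception e) {
        // Rethrow unchecked so the failure surfaces to the caller
        // (in Temporal, an activity that throws fails that attempt).
        throw new IllegalStateException("Connection check failed", e);
      }
    }
  }

  private ActivityWorkerSplit() {}
}
```

Keeping the work behind a plain interface lets it be tested without Temporal, while the activity stays a thin adapter.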

If you’re interested in replication (the sync between source and destination), you can find the logic in the DefaultReplicationWorker class: airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java
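At its core, replication moves records from a source to a destination. The sketch below is a simplified producer/consumer model under that assumption: a background thread reads from the source into a bounded buffer while the calling thread writes to the destination. SimpleReplication and replicate are hypothetical names; the real DefaultReplicationWorker handles considerably more (for example, cancellation and richer error handling) and is not reproduced here.

```java
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Simplified producer/consumer model of a replication (not DefaultReplicationWorker itself).
final class SimpleReplication {

  private SimpleReplication() {}

  /**
   * Copies every (non-null) record from source to destination through a bounded
   * buffer and returns how many records were written. A source failure is
   * rethrown after the records read before it have been delivered.
   */
  static <T> long replicate(Iterator<T> source, Consumer<T> destination, int bufferSize)
      throws InterruptedException {
    BlockingQueue<Optional<T>> buffer = new ArrayBlockingQueue<>(bufferSize);
    AtomicReference<RuntimeException> readFailure = new AtomicReference<>();

    // Source side: read records on a background thread.
    Thread reader = new Thread(() -> {
      try {
        try {
          while (source.hasNext()) {
            buffer.put(Optional.of(source.next())); // blocks while the buffer is full
          }
        } catch (RuntimeException e) {
          readFailure.set(e); // remember the failure, but still end the stream below
        }
        buffer.put(Optional.empty()); // end-of-stream marker
      } catch (InterruptedException e) {
        // The destination side stopped early and interrupted us: stop reading.
      }
    }, "source-reader");
    reader.setDaemon(true);
    reader.start();

    // Destination side: write records on the calling thread until end-of-stream.
    long written = 0;
    try {
      for (Optional<T> item = buffer.take(); item.isPresent(); item = buffer.take()) {
        destination.accept(item.get());
        written++;
      }
    } finally {
      reader.interrupt(); // harmless if the reader is done; unblocks it if we failed
    }

    RuntimeException failure = readFailure.get();
    if (failure != null) {
      throw failure;
    }
    return written;
  }
}
```

The bounded buffer provides backpressure: a fast source blocks once the buffer is full instead of reading ahead without limit while the destination catches up.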

Example (Manual Sync)

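```java
// Caller side: signal the connection's workflow to run a manual sync
// (repairing the workflow first if necessary, as the method name says).
connectionManagerUtils.signalWorkflowAndRepairIfNecessary(client, connectionId, workflow -> workflow::submitManualSync);
```

```java
// Workflow side: the submitManualSync signal handler.
@Override
public void submitManualSync() {
  // Ignore the request if a sync is already running for this connection.
  if (workflowState.isRunning()) {
    log.info("Can't schedule a manual workflow if a sync is running for connection {}", connectionId);
    return;
  }

  // Skip waiting for the next scheduled run so a sync starts right away.
  workflowState.setSkipScheduling(true);
}
```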

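Once Temporal delivers the signal to the connection's workflow, which runs on the CONNECTION_UPDATER task queue, the workflow checks for any type of update.

In this case, the handler sets skipScheduling to true, so the workflow skips waiting for the next scheduled run and triggers a sync again right away.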
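The manual-sync flow can be modeled as a small state machine, assuming the workflow waits until either the next scheduled run is due or a flag such as skipScheduling is set (in a real Temporal workflow, this kind of wait is typically written with Workflow.await(Duration, Supplier<Boolean>)). ManualSyncModel and its methods are hypothetical; unlike the original handler, submitManualSync here returns a boolean instead of logging, so the outcome is easy to check.

```java
import java.time.Instant;

// Toy model of the manual-sync signal (NOT Airbyte's workflow implementation).
final class ManualSyncModel {
  private boolean running;
  private boolean skipScheduling;

  /** Signal handler: same guard as the submitManualSync snippet above. */
  synchronized boolean submitManualSync() {
    if (running) {
      return false; // a sync is already in progress, so the request is ignored
    }
    skipScheduling = true;
    return true;
  }

  /** The condition the workflow waits on before starting the next sync. */
  synchronized boolean shouldStartSync(Instant now, Instant nextScheduledRun) {
    return skipScheduling || !now.isBefore(nextScheduledRun);
  }

  /** Starting a sync consumes the manual-sync request. */
  synchronized void startSync() {
    running = true;
    skipScheduling = false;
  }

  synchronized void finishSync() {
    running = false;
  }
}
```

For example, an hour before the scheduled run, shouldStartSync is false until submitManualSync is called; a second request while the sync is running is rejected.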

Additional Resources