The airbyte-server service

airbyte-server

How does AuthRoleConstants work? These are the roles that exist today. In OSS there is only one user, who is the owner, so today roles only apply in Airbyte Cloud.

OWNER(500, AuthRoleConstants.OWNER),
ADMIN(400, AuthRoleConstants.ADMIN),
EDITOR(300, AuthRoleConstants.EDITOR),
READER(200, AuthRoleConstants.READER),
AUTHENTICATED_USER(100, AuthRoleConstants.AUTHENTICATED_USER), // ONLY USE WITH INSTANCE RESOURCE!
NONE(0, AuthRoleConstants.NONE);
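
The numbers next to each role can be read as an authority level. The following is a minimal sketch of how such a level could be compared, assuming that a higher number includes the permissions of every lower one. RoleSketch and satisfies are hypothetical names used for illustration only; they are not the actual Airbyte enum or API.

```java
// Hypothetical sketch: the constants and numbers mirror the list above,
// but the helper method is an assumption, not Airbyte's actual API.
enum RoleSketch {
  OWNER(500),
  ADMIN(400),
  EDITOR(300),
  READER(200),
  AUTHENTICATED_USER(100),
  NONE(0);

  private final int authority;

  RoleSketch(final int authority) {
    this.authority = authority;
  }

  int getAuthority() {
    return authority;
  }

  // Assumption: a role satisfies a requirement when its authority is at least as high.
  boolean satisfies(final RoleSketch required) {
    return this.authority >= required.authority;
  }
}

class RoleSketchDemo {
  public static void main(final String[] args) {
    // An ADMIN may call an endpoint that requires EDITOR.
    System.out.println(RoleSketch.ADMIN.satisfies(RoleSketch.EDITOR)); // prints "true"
    // A READER may not.
    System.out.println(RoleSketch.READER.satisfies(RoleSketch.EDITOR)); // prints "false"
  }
}
```

Under this reading, an endpoint secured with EDITOR would accept EDITOR, ADMIN and OWNER callers and reject READER.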

The airbyte-server is responsible for receiving API requests and executing the corresponding actions.

The airbyte-server module keeps only the API layer; the handlers live in airbyte-commons-server.

An example API workflow

When you click the Sync now button, the WebApp sends an API request to the /sync endpoint with the following payload:

{"connectionId":"8b8748bf-dca7-4eb5-8177-1da77df934a7"}

The Airbyte Server receives the API request from the WebApp UI, transforms the JSON object into a ConnectionIdRequestBody object, and calls the specific function that executes the operation.
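
To reproduce the request outside the UI, here is a minimal sketch that builds the same POST with the JDK HTTP client. The base URL (http://localhost:8000/api) is an assumption about a local deployment, and SyncRequestSketch is a hypothetical name; only the /v1/connections/sync path and the payload shape come from this page.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.UUID;

class SyncRequestSketch {
  // Assumption: base URL of a local deployment; adjust to your environment.
  static final String BASE_URL = "http://localhost:8000/api";

  // Builds the JSON body shown above for a given connection id.
  static String payload(final UUID connectionId) {
    return "{\"connectionId\":\"" + connectionId + "\"}";
  }

  // Builds (but does not send) the POST request for a manual sync.
  static HttpRequest buildSyncRequest(final UUID connectionId) {
    return HttpRequest.newBuilder()
        .uri(URI.create(BASE_URL + "/v1/connections/sync"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(payload(connectionId)))
        .build();
  }

  public static void main(final String[] args) {
    final UUID connectionId = UUID.fromString("8b8748bf-dca7-4eb5-8177-1da77df934a7");
    final HttpRequest request = buildSyncRequest(connectionId);
    System.out.println(request.method() + " " + request.uri());
    System.out.println(payload(connectionId));
  }
}
```

Sending the request would be a matter of passing it to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()); authentication, if your deployment requires it, is not covered by this sketch.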

Some relevant links:

[1] Airbyte Server syncConnection function

[2] The /v1/connections/sync in OpenAPI spec

[3] The ConnectionIdRequestBody definition in the OpenAPI spec

  @Override
  @Post(uri = "/sync")
  @Secured({EDITOR})
  @SecuredWorkspace
  @ExecuteOn(AirbyteTaskExecutors.SCHEDULER)
  public JobInfoRead syncConnection(@Body final ConnectionIdRequestBody connectionIdRequestBody) {
    return ApiHelper.execute(() -> schedulerHandler.syncConnection(connectionIdRequestBody));
  }

/v1/connections/sync:
    post:
      tags:
        - connection
      summary: Trigger a manual sync of the connection
      operationId: syncConnection
      requestBody:
        content:
          application/json:
            schema:
              $ref: "#/components/schemas/ConnectionIdRequestBody"
        required: true
      responses:
        "200":
          description: Successful operation
          content:
            application/json:
              schema:
                $ref: "#/components/schemas/JobInfoRead"
        "404":
          $ref: "#/components/responses/NotFoundResponse"
        "422":
          $ref: "#/components/responses/InvalidInputResponse"

ConnectionIdRequestBody:
      type: object
      required:
        - connectionId
      properties:
        connectionId:
          $ref: "#/components/schemas/ConnectionId"

For this particular example, the server calls the syncConnection function of the scheduler handler:

  public JobInfoRead syncConnection(final ConnectionIdRequestBody connectionIdRequestBody)
      throws IOException, JsonValidationException, ConfigNotFoundException {
    return submitManualSyncToWorker(connectionIdRequestBody.getConnectionId());
  }

  private JobInfoRead submitManualSyncToWorker(final UUID connectionId)
      throws IOException, IllegalStateException, JsonValidationException, ConfigNotFoundException {
    // get standard sync to validate connection id before submitting sync to temporal
    final var sync = configRepository.getStandardSync(connectionId);
    if (!sync.getStatus().equals(StandardSync.Status.ACTIVE)) {
      throw new IllegalStateException("Can only sync an active connection");
    }
    final ManualOperationResult manualSyncResult = eventRunner.startNewManualSync(connectionId);

    return readJobFromResult(manualSyncResult);
  }

Inside the TemporalEventRunner:

  @Override
  public ManualOperationResult startNewManualSync(final UUID connectionId) {
    return temporalClient.startNewManualSync(connectionId);
  }

Then, in the TemporalClient:

  public ManualOperationResult startNewManualSync(final UUID connectionId) {
    log.info("Manual sync request");

    if (connectionManagerUtils.isWorkflowStateRunning(client, connectionId)) {
      // TODO Bmoric: Error is running
      return new ManualOperationResult(
          Optional.of("A sync is already running for: " + connectionId),
          Optional.empty(), Optional.of(ErrorCode.WORKFLOW_RUNNING));
    }

    try {
      connectionManagerUtils.signalWorkflowAndRepairIfNecessary(client, connectionId, workflow -> workflow::submitManualSync);
    } catch (final DeletedWorkflowException e) {
      log.error("Can't sync a deleted connection.", e);
      return new ManualOperationResult(
          Optional.of(e.getMessage()),
          Optional.empty(), Optional.of(ErrorCode.WORKFLOW_DELETED));
    }

    do {
      try {
        Thread.sleep(DELAY_BETWEEN_QUERY_MS);
      } catch (final InterruptedException e) {
        return new ManualOperationResult(
            Optional.of("Didn't managed to start a sync for: " + connectionId),
            Optional.empty(), Optional.of(ErrorCode.UNKNOWN));
      }
    } while (!connectionManagerUtils.isWorkflowStateRunning(client, connectionId));

    log.info("end of manual schedule");

    final long jobId = connectionManagerUtils.getCurrentJobId(client, connectionId);

    return new ManualOperationResult(
        Optional.empty(),
        Optional.of(jobId), Optional.empty());
  }
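
The wait loop above polls until the workflow reports that it is running, with no upper bound on how long it waits. The following is a minimal sketch of the same poll-and-sleep pattern in isolation, with a deadline added purely as an illustration. It differs from the code above in two labelled ways: it checks the condition before the first sleep, and it gives up after a timeout. PollSketch and waitUntil are hypothetical names, not part of Airbyte.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

class PollSketch {
  // Polls `condition` every `delay` until it returns true or `timeout` elapses.
  // Returns true if the condition became true, false on timeout or interruption.
  static boolean waitUntil(final BooleanSupplier condition, final Duration delay, final Duration timeout) {
    final long deadline = System.nanoTime() + timeout.toNanos();
    while (!condition.getAsBoolean()) {
      if (System.nanoTime() - deadline >= 0) {
        return false; // gave up: the condition never became true in time
      }
      try {
        Thread.sleep(delay.toMillis());
      } catch (final InterruptedException e) {
        // Restore the interrupt flag so callers can still observe it.
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return true;
  }

  public static void main(final String[] args) {
    final long start = System.currentTimeMillis();
    // Stand-in for "is the workflow running yet?": becomes true after roughly 50 ms.
    final boolean started = waitUntil(
        () -> System.currentTimeMillis() - start >= 50,
        Duration.ofMillis(10),
        Duration.ofSeconds(2));
    System.out.println("started = " + started);
  }
}
```

A caller could map a false return value to a failed result in the same way the InterruptedException branch does above.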

On success, the Temporal client returns only the jobId; the readJobFromResult function, called from submitManualSyncToWorker in the scheduler handler, formats the actual API response:

  private JobInfoRead readJobFromResult(final ManualOperationResult manualOperationResult) throws IOException, IllegalStateException {
    if (manualOperationResult.getFailingReason().isPresent()) {
      if (VALUE_CONFLICT_EXCEPTION_ERROR_CODE_SET.contains(manualOperationResult.getErrorCode().get())) {
        throw new ValueConflictKnownException(manualOperationResult.getFailingReason().get());
      } else {
        throw new IllegalStateException(manualOperationResult.getFailingReason().get());
      }
    }

    final Job job = jobPersistence.getJob(manualOperationResult.getJobId().get());

    return jobConverter.getJobInfoRead(job);
  }
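
The result object carries three optional fields, and the handler branches on which ones are present. Here is a minimal, self-contained sketch of that contract, assuming a result is either a failure (reason plus error code) or a success (job id). The Result class and jobIdOrThrow helper are hypothetical stand-ins for illustration, not the Airbyte classes.

```java
import java.util.Optional;

class ManualResultSketch {
  // Hypothetical stand-in for ManualOperationResult with the same three optional fields.
  static final class Result {
    final Optional<String> failingReason;
    final Optional<Long> jobId;
    final Optional<String> errorCode;

    private Result(final Optional<String> failingReason, final Optional<Long> jobId, final Optional<String> errorCode) {
      this.failingReason = failingReason;
      this.jobId = jobId;
      this.errorCode = errorCode;
    }

    static Result success(final long jobId) {
      return new Result(Optional.empty(), Optional.of(jobId), Optional.empty());
    }

    static Result failure(final String reason, final String errorCode) {
      return new Result(Optional.of(reason), Optional.empty(), Optional.of(errorCode));
    }
  }

  // Returns the job id, or throws with the failing reason when the operation failed.
  static long jobIdOrThrow(final Result result) {
    if (result.failingReason.isPresent()) {
      throw new IllegalStateException(result.failingReason.get());
    }
    // Guard against a malformed result instead of calling get() blindly.
    return result.jobId.orElseThrow(
        () -> new IllegalStateException("Result has neither a failing reason nor a job id"));
  }

  public static void main(final String[] args) {
    System.out.println(jobIdOrThrow(Result.success(1245L))); // prints "1245"
    try {
      jobIdOrThrow(Result.failure("A sync is already running", "WORKFLOW_RUNNING"));
    } catch (final IllegalStateException e) {
      System.out.println(e.getMessage()); // prints "A sync is already running"
    }
  }
}
```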

Finally, the response returned to the WebApp for its API request is:

{
    "job": {
        "id": 1245,
        "configType": "sync",
        "configId": "8b8748bf-dca7-4eb5-8177-1da77df934a7",
        "enabledStreams": [
            {
                "name": "us_census_stream"
            }
        ],
        "createdAt": 1685543955,
        "updatedAt": 1685543955,
        "status": "running"
    },
    "attempts": [
        {
            "attempt": {
                "id": 0,
                "status": "running",
                "createdAt": 1685543955,
                "updatedAt": 1685543955
            },
            "logs": {
                "logLines": []
            }
        }
    ]
}
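
The createdAt and updatedAt values in this response look like Unix timestamps. A minimal sketch for turning one into a readable instant, assuming the values are epoch seconds in UTC (JobTimestampSketch is a hypothetical helper, not part of Airbyte):

```java
import java.time.Instant;

class JobTimestampSketch {
  // Assumption: the API reports timestamps as seconds since the Unix epoch.
  static Instant toInstant(final long epochSeconds) {
    return Instant.ofEpochSecond(epochSeconds);
  }

  public static void main(final String[] args) {
    // Value taken from the "createdAt" field in the response above.
    System.out.println(toInstant(1685543955L)); // prints "2023-05-31T14:39:15Z"
  }
}
```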