airbyte-server
How does AuthRoleConstants work?
These are the roles that exist today. In OSS there is only one user, who is the owner. Today, roles apply only in Airbyte Cloud.
OWNER(500, AuthRoleConstants.OWNER),
ADMIN(400, AuthRoleConstants.ADMIN),
EDITOR(300, AuthRoleConstants.EDITOR),
READER(200, AuthRoleConstants.READER),
AUTHENTICATED_USER(100, AuthRoleConstants.AUTHENTICATED_USER), // ONLY USE WITH INSTANCE RESOURCE!
NONE(0, AuthRoleConstants.NONE);
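The numeric values suggest an ordering in which a higher number grants at least the permissions of every lower one. The following is a minimal sketch of that idea, not the actual Airbyte implementation: SketchRole, satisfies, and impliedRoles are hypothetical names, and the "authority at least as high" comparison is an assumption. Only the role names and numbers come from the listing above.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-in for the role enum above; only the names and numbers come from the source.
enum SketchRole {
  OWNER(500), ADMIN(400), EDITOR(300), READER(200), AUTHENTICATED_USER(100), NONE(0);

  private final int authority;

  SketchRole(final int authority) {
    this.authority = authority;
  }

  int getAuthority() {
    return authority;
  }

  // Assumption: a role satisfies a requirement when its authority is at least as high.
  boolean satisfies(final SketchRole required) {
    return authority >= required.authority;
  }

  // Every role with an equal or lower authority than this one, excluding NONE.
  Set<SketchRole> impliedRoles() {
    final Set<SketchRole> implied = EnumSet.noneOf(SketchRole.class);
    for (final SketchRole role : values()) {
      if (role != NONE && role.authority <= authority) {
        implied.add(role);
      }
    }
    return implied;
  }
}
```

Under this assumption, an ADMIN would pass a check that requires EDITOR, while a READER would not.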
The airbyte-server is responsible for receiving API requests and executing the corresponding actions.
The airbyte-server module keeps only the API layer; the handlers live in airbyte-common-server.
Example of an API workflow
When you click the Sync now button, the UI sends an API request to the /sync endpoint with the following payload:
{"connectionId":"8b8748bf-dca7-4eb5-8177-1da77df934a7"}
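To make the request concrete, here is a hedged sketch that builds the same POST request with the JDK's java.net.http API. The base URL http://localhost:8000/api is an assumption about a local deployment, and SyncRequestSketch and buildSyncRequest are hypothetical names; only the /v1/connections/sync path and the payload shape come from this document. The request is built but not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.UUID;

final class SyncRequestSketch {

  // Assumption: a local deployment reachable at this base URL; adjust for your environment.
  static final String ASSUMED_BASE_URL = "http://localhost:8000/api";

  // Builds (but does not send) the POST request for the sync endpoint.
  static HttpRequest buildSyncRequest(final UUID connectionId) {
    final String payload = "{\"connectionId\":\"" + connectionId + "\"}";
    return HttpRequest.newBuilder()
        .uri(URI.create(ASSUMED_BASE_URL + "/v1/connections/sync"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(payload))
        .build();
  }
}
```

Sending it would be a matter of passing the request to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()); authentication headers, if your deployment requires them, are not covered here.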
The Airbyte Server receives the API request from the WebApp UI, deserializes the JSON object into a ConnectionIdRequestBody object, and calls the specific function that executes the operation.
Some relevant links:
[1] Airbyte Server syncConnection function
[2] The /v1/connections/sync path in the OpenAPI spec
[3] The ConnectionIdRequestBody definition in the OpenAPI spec
@Override
@Post(uri = "/sync")
@Secured({EDITOR})
@SecuredWorkspace
@ExecuteOn(AirbyteTaskExecutors.SCHEDULER)
public JobInfoRead syncConnection(@Body final ConnectionIdRequestBody connectionIdRequestBody) {
  return ApiHelper.execute(() -> schedulerHandler.syncConnection(connectionIdRequestBody));
}
/v1/connections/sync:
  post:
    tags:
      - connection
    summary: Trigger a manual sync of the connection
    operationId: syncConnection
    requestBody:
      content:
        application/json:
          schema:
            $ref: "#/components/schemas/ConnectionIdRequestBody"
      required: true
    responses:
      "200":
        description: Successful operation
        content:
          application/json:
            schema:
              $ref: "#/components/schemas/JobInfoRead"
      "404":
        $ref: "#/components/responses/NotFoundResponse"
      "422":
        $ref: "#/components/responses/InvalidInputResponse"
ConnectionIdRequestBody:
  type: object
  required:
    - connectionId
  properties:
    connectionId:
      $ref: "#/components/schemas/ConnectionId"
For this particular example, the server calls the scheduler handler's syncConnection function, which delegates to submitManualSyncToWorker:
public JobInfoRead syncConnection(final ConnectionIdRequestBody connectionIdRequestBody)
    throws IOException, JsonValidationException, ConfigNotFoundException {
  return submitManualSyncToWorker(connectionIdRequestBody.getConnectionId());
}
private JobInfoRead submitManualSyncToWorker(final UUID connectionId)
    throws IOException, IllegalStateException, JsonValidationException, ConfigNotFoundException {
  // get standard sync to validate connection id before submitting sync to temporal
  final var sync = configRepository.getStandardSync(connectionId);
  if (!sync.getStatus().equals(StandardSync.Status.ACTIVE)) {
    throw new IllegalStateException("Can only sync an active connection");
  }
  final ManualOperationResult manualSyncResult = eventRunner.startNewManualSync(connectionId);
  return readJobFromResult(manualSyncResult);
}
Inside the TemporalEventRunner
@Override
public ManualOperationResult startNewManualSync(final UUID connectionId) {
  return temporalClient.startNewManualSync(connectionId);
}
TemporalClient
public ManualOperationResult startNewManualSync(final UUID connectionId) {
  log.info("Manual sync request");
  if (connectionManagerUtils.isWorkflowStateRunning(client, connectionId)) {
    // TODO Bmoric: Error is running
    return new ManualOperationResult(
        Optional.of("A sync is already running for: " + connectionId),
        Optional.empty(), Optional.of(ErrorCode.WORKFLOW_RUNNING));
  }
  try {
    connectionManagerUtils.signalWorkflowAndRepairIfNecessary(client, connectionId, workflow -> workflow::submitManualSync);
  } catch (final DeletedWorkflowException e) {
    log.error("Can't sync a deleted connection.", e);
    return new ManualOperationResult(
        Optional.of(e.getMessage()),
        Optional.empty(), Optional.of(ErrorCode.WORKFLOW_DELETED));
  }
  do {
    try {
      Thread.sleep(DELAY_BETWEEN_QUERY_MS);
    } catch (final InterruptedException e) {
      return new ManualOperationResult(
          Optional.of("Didn't managed to start a sync for: " + connectionId),
          Optional.empty(), Optional.of(ErrorCode.UNKNOWN));
    }
  } while (!connectionManagerUtils.isWorkflowStateRunning(client, connectionId));
  log.info("end of manual schedule");
  final long jobId = connectionManagerUtils.getCurrentJobId(client, connectionId);
  return new ManualOperationResult(
      Optional.empty(),
      Optional.of(jobId), Optional.empty());
}
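The do/while loop above polls the workflow state until it reports running, with no upper bound on the wait. As an illustration of the same polling technique, here is a hedged, self-contained sketch that adds a timeout. PollingSketch and waitUntil are hypothetical names, not part of Airbyte, and unlike the original loop this variant checks the condition before the first sleep and restores the thread's interrupt flag.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

final class PollingSketch {

  // Polls the condition until it holds or the timeout elapses; returns whether it held.
  static boolean waitUntil(final BooleanSupplier condition, final Duration delay, final Duration timeout) {
    final long deadline = System.nanoTime() + timeout.toNanos();
    while (!condition.getAsBoolean()) {
      if (System.nanoTime() - deadline >= 0) {
        return false; // timed out before the condition became true
      }
      try {
        Thread.sleep(delay.toMillis());
      } catch (final InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for callers
        return false;
      }
    }
    return true;
  }
}
```

In the manual sync case, the condition would be the isWorkflowStateRunning check and the delay would be DELAY_BETWEEN_QUERY_MS; whether a timeout is desirable there is a design decision this document does not address.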
Temporal returns only the jobId. The readJobFromResult function, called from submitManualSyncToWorker in the scheduler handler, then builds the actual API response:
private JobInfoRead readJobFromResult(final ManualOperationResult manualOperationResult) throws IOException, IllegalStateException {
  if (manualOperationResult.getFailingReason().isPresent()) {
    if (VALUE_CONFLICT_EXCEPTION_ERROR_CODE_SET.contains(manualOperationResult.getErrorCode().get())) {
      throw new ValueConflictKnownException(manualOperationResult.getFailingReason().get());
    } else {
      throw new IllegalStateException(manualOperationResult.getFailingReason().get());
    }
  }
  final Job job = jobPersistence.getJob(manualOperationResult.getJobId().get());
  return jobConverter.getJobInfoRead(job);
}
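The branching in readJobFromResult can be summarized with a small, self-contained sketch. ResultMappingSketch and outcome are hypothetical names, and the contents of ASSUMED_CONFLICT_CODES are an assumption: the excerpt does not show which codes VALUE_CONFLICT_EXCEPTION_ERROR_CODE_SET contains. The sketch returns a label for each branch instead of throwing, so the three outcomes are easy to compare.

```java
import java.util.Optional;
import java.util.Set;

final class ResultMappingSketch {

  // Error codes that appear in the TemporalClient excerpt.
  enum ErrorCode { UNKNOWN, WORKFLOW_DELETED, WORKFLOW_RUNNING }

  // Assumption: these two codes are treated as conflicts; the source does not list the set.
  static final Set<ErrorCode> ASSUMED_CONFLICT_CODES =
      Set.of(ErrorCode.WORKFLOW_DELETED, ErrorCode.WORKFLOW_RUNNING);

  // Returns a label for the branch taken for the given result fields.
  static String outcome(final Optional<String> failingReason,
                        final Optional<Long> jobId,
                        final Optional<ErrorCode> errorCode) {
    if (failingReason.isPresent()) {
      final boolean conflict = errorCode.map(ASSUMED_CONFLICT_CODES::contains).orElse(false);
      return conflict ? "ValueConflictKnownException" : "IllegalStateException";
    }
    // No failure: the job id is looked up and converted into the API response.
    return "job " + jobId.orElseThrow();
  }
}
```

With these assumptions, an "already running" result maps to a conflict, an interrupted wait (UNKNOWN) maps to an IllegalStateException, and a successful result carries only the job id.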
Finally, the response returned to the WebApp for this API request is:
{
  "job": {
    "id": 1245,
    "configType": "sync",
    "configId": "8b8748bf-dca7-4eb5-8177-1da77df934a7",
    "enabledStreams": [
      {
        "name": "us_census_stream"
      }
    ],
    "createdAt": 1685543955,
    "updatedAt": 1685543955,
    "status": "running"
  },
  "attempts": [
    {
      "attempt": {
        "id": 0,
        "status": "running",
        "createdAt": 1685543955,
        "updatedAt": 1685543955
      },
      "logs": {
        "logLines": []
      }
    }
  ]
}
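The createdAt and updatedAt fields in the response are plain numbers. Assuming they are seconds since the Unix epoch (the 10-digit values suggest this, but the document does not state it), they can be turned into readable timestamps as in this small sketch. TimestampSketch and toInstant are hypothetical names.

```java
import java.time.Instant;

final class TimestampSketch {

  // Assumption: the API reports timestamps as seconds since the Unix epoch.
  static Instant toInstant(final long epochSeconds) {
    return Instant.ofEpochSecond(epochSeconds);
  }

  public static void main(final String[] args) {
    // createdAt from the example response above
    System.out.println(toInstant(1685543955L)); // prints 2023-05-31T14:39:15Z
  }
}
```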