2022-05-02 20:09:24 INFO i.a.w.w.WorkerRun(call):49 - Executing worker wrapper. Airbyte version: 0.35.59-alpha
2022-05-02 20:09:24 INFO i.a.w.t.TemporalAttemptExecution(get):105 - Docker volume job log path: /tmp/workspace/11/0/logs.log
2022-05-02 20:09:24 INFO i.a.w.t.TemporalAttemptExecution(get):110 - Executing worker wrapper. Airbyte version: 0.35.59-alpha
2022-05-02 20:09:24 INFO i.a.w.DefaultReplicationWorker(run):104 - start sync worker. job id: 11 attempt id: 0
2022-05-02 20:09:24 INFO i.a.w.DefaultReplicationWorker(run):116 - configured sync modes: {null.test-topic=full_refresh - append}
2022-05-02 20:09:24 INFO i.a.w.p.a.DefaultAirbyteDestination(start):69 - Running destination...
2022-05-02 20:09:24 INFO i.a.c.i.LineGobbler(voidCall):82 - Checking if ericjavila946/airbyte-databricks:dev exists...
2022-05-02 20:09:25 INFO i.a.c.i.LineGobbler(voidCall):82 - ericjavila946/airbyte-databricks:dev was found locally.
2022-05-02 20:09:25 INFO i.a.w.p.DockerProcessFactory(create):106 - Creating docker job ID: 11
2022-05-02 20:09:25 INFO i.a.w.p.DockerProcessFactory(create):158 - Preparing command: docker run --rm --init -i -w /data/11/0 --log-driver none --network host -v airbyte_workspace:/data -v /tmp/airbyte_local:/local -e WORKER_JOB_ATTEMPT=0 -e WORKER_CONNECTOR_IMAGE=ericjavila946/airbyte-databricks:dev -e AIRBYTE_ROLE= -e WORKER_ENVIRONMENT=DOCKER -e AIRBYTE_VERSION=0.35.59-alpha -e WORKER_JOB_ID=11 ericjavila946/airbyte-databricks:dev write --config destination_config.json --catalog destination_catalog.json
2022-05-02 20:09:25 INFO i.a.c.i.LineGobbler(voidCall):82 - Checking if airbyte/source-kafka:0.1.4 exists...
2022-05-02 20:09:25 INFO i.a.c.i.LineGobbler(voidCall):82 - airbyte/source-kafka:0.1.4 was found locally.
2022-05-02 20:09:25 INFO i.a.w.p.DockerProcessFactory(create):106 - Creating docker job ID: 11
2022-05-02 20:09:25 INFO i.a.w.p.DockerProcessFactory(create):158 - Preparing command: docker run --rm --init -i -w /data/11/0 --log-driver none --network host -v airbyte_workspace:/data -v /tmp/airbyte_local:/local -e WORKER_JOB_ATTEMPT=0 -e WORKER_CONNECTOR_IMAGE=airbyte/source-kafka:0.1.4 -e AIRBYTE_ROLE= -e WORKER_ENVIRONMENT=DOCKER -e AIRBYTE_VERSION=0.35.59-alpha -e WORKER_JOB_ID=11 airbyte/source-kafka:0.1.4 read --config source_config.json --catalog source_catalog.json
2022-05-02 20:09:25 INFO i.a.w.DefaultReplicationWorker(lambda$getDestinationOutputRunnable$6):339 - Destination output thread started.
2022-05-02 20:09:25 INFO i.a.w.DefaultReplicationWorker(run):158 - Waiting for source and destination threads to complete.
2022-05-02 20:09:25 INFO i.a.w.DefaultReplicationWorker(lambda$getReplicationRunnable$5):279 - Replication thread started.
2022-05-02 20:09:26 destination > SLF4J: Class path contains multiple SLF4J bindings.
2022-05-02 20:09:26 destination > SLF4J: Found binding in [jar:file:/airbyte/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
2022-05-02 20:09:26 destination > SLF4J: Found binding in [jar:file:/airbyte/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
2022-05-02 20:09:26 destination > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
2022-05-02 20:09:26 destination > SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
2022-05-02 20:09:27 source > 2022-05-02 20:09:27 INFO i.a.i.s.k.KafkaSource(main):134 - Starting source: class io.airbyte.integrations.source.kafka.KafkaSource
2022-05-02 20:09:28 source > 2022-05-02 20:09:28 INFO i.a.i.b.IntegrationCliParser(parseOptions):118 - integration args: {read=null, catalog=source_catalog.json, config=source_config.json}
2022-05-02 20:09:28 source > 2022-05-02 20:09:28 INFO i.a.i.b.IntegrationRunner(runInternal):105 - Running integration: io.airbyte.integrations.source.kafka.KafkaSource
2022-05-02 20:09:28 source > 2022-05-02 20:09:28 INFO i.a.i.b.IntegrationRunner(runInternal):106 - Command: READ
2022-05-02 20:09:28 source > 2022-05-02 20:09:28 INFO i.a.i.b.IntegrationRunner(runInternal):107 - Integration config: IntegrationConfig{command=READ, configPath='source_config.json', catalogPath='source_catalog.json', statePath='null'}
2022-05-02 20:09:28 destination > 2022-05-02 20:09:28 INFO i.a.i.b.IntegrationCliParser(parseOptions):118 - integration args: {catalog=destination_catalog.json, write=null, config=destination_config.json}
2022-05-02 20:09:28 destination > 2022-05-02 20:09:28 INFO i.a.i.b.IntegrationRunner(run):88 - Sentry transaction event: fa734021d7c2408299ef99675da264a0
2022-05-02 20:09:28 destination > 2022-05-02 20:09:28 INFO i.a.i.b.IntegrationRunner(runInternal):106 - Running integration: io.airbyte.integrations.destination.databricks.DatabricksDestination
2022-05-02 20:09:28 destination > 2022-05-02 20:09:28 INFO i.a.i.b.IntegrationRunner(runInternal):107 - Command: WRITE
2022-05-02 20:09:28 destination > 2022-05-02 20:09:28 INFO i.a.i.b.IntegrationRunner(runInternal):108 - Integration config: IntegrationConfig{command=WRITE, configPath='destination_config.json', catalogPath='destination_catalog.json', statePath='null'}
2022-05-02 20:09:28 source > 2022-05-02 20:09:28 WARN c.n.s.JsonMetaSchema(newValidator):338 - Unknown keyword examples - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2022-05-02 20:09:28 source > 2022-05-02 20:09:28 WARN c.n.s.JsonMetaSchema(newValidator):338 - Unknown keyword airbyte_secret - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2022-05-02 20:09:28 source > 2022-05-02 20:09:28 INFO i.a.i.s.k.KafkaSourceConfig(propertiesByProtocol):81 - Kafka protocol config: {"security_protocol":"PLAINTEXT"}
2022-05-02 20:09:29 destination > 2022-05-02 20:09:29 WARN c.n.s.JsonMetaSchema(newValidator):338 - Unknown keyword examples - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2022-05-02 20:09:29 destination > 2022-05-02 20:09:29 WARN c.n.s.JsonMetaSchema(newValidator):338 - Unknown keyword airbyte_secret - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2022-05-02 20:09:30 source > 2022-05-02 20:09:30 INFO i.a.i.s.k.KafkaSource(check):57 - Successfully connected to Kafka brokers for topic 'test-topic'.
2022-05-02 20:09:30 source > 2022-05-02 20:09:30 INFO i.a.i.s.k.KafkaSourceConfig(propertiesByProtocol):81 - Kafka protocol config: {"security_protocol":"PLAINTEXT"}
2022-05-02 20:09:30 source > 2022-05-02 20:09:30 INFO i.a.i.s.k.KafkaSourceConfig(getConsumer):105 - Kafka subscribe method: {"topic_pattern":"0","subscription_type":"subscribe"}
2022-05-02 20:09:30 destination > 2022-05-02 20:09:30 INFO i.a.i.d.s.w.ProductionWriterFactory(create):37 - Json schema for stream miles-test-topic: {"type":"object","properties":{"value":{"type":"string"}}}
2022-05-02 20:09:30 destination > 2022-05-02 20:09:30 WARN i.a.i.d.s.a.JsonToAvroSchemaConverter(getAvroSchema):128 - Schema name "miles-test-topic" contains illegal character(s) and is standardized to "miles_test_topic"
2022-05-02 20:09:30 destination > 2022-05-02 20:09:30 INFO i.a.i.d.s.w.ProductionWriterFactory(create):42 - Avro schema for stream miles-test-topic: {"type":"record","name":"miles_test_topic","doc":"_airbyte_original_name:miles-test-topic","fields":[{"name":"_airbyte_ab_id","type":{"type":"string","logicalType":"uuid"}},{"name":"_airbyte_emitted_at","type":{"type":"long","logicalType":"timestamp-millis"}},{"name":"value","type":["null","string"],"default":null},{"name":"_airbyte_additional_properties","type":["null",{"type":"map","values":"string"}],"default":null}]}
2022-05-02 20:09:30 destination > 2022-05-02 20:09:30 INFO i.a.i.d.s.p.S3ParquetWriter():57 - Full S3 path for stream 'miles-test-topic': s3://contentsecurity-datalake-raw-nonprod/data_sync/test/1fdebeb7-7805-48a2-b923-4c8bf8f77d3b/miles_test_topic/2022_05_02_1651522170337_0.parquet
2022-05-02 20:09:30 destination > 2022-05-02 20:09:30 WARN o.a.h.u.NativeCodeLoader():60 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2022-05-02 20:09:31 destination > 2022-05-02 20:09:31 WARN o.a.h.m.i.MetricsConfig(loadFirst):134 - Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
2022-05-02 20:09:32 destination > 2022-05-02 20:09:32 INFO i.a.i.d.d.DatabricksStreamCopier():94 - [Stream miles-test-topic] Database schema: content_security
2022-05-02 20:09:32 destination > 2022-05-02 20:09:32 INFO i.a.i.d.d.DatabricksStreamCopier():95 - [Stream miles-test-topic] Parquet schema: {"type":"record","name":"miles_test_topic","doc":"_airbyte_original_name:miles-test-topic","fields":[{"name":"_airbyte_ab_id","type":{"type":"string","logicalType":"uuid"}},{"name":"_airbyte_emitted_at","type":{"type":"long","logicalType":"timestamp-millis"}},{"name":"value","type":["null","string"],"default":null},{"name":"_airbyte_additional_properties","type":["null",{"type":"map","values":"string"}],"default":null}]}
2022-05-02 20:09:32 destination > 2022-05-02 20:09:32 INFO i.a.i.d.d.DatabricksStreamCopier():96 - [Stream miles-test-topic] Tmp table _airbyte_tmp_glg_miles_test_topic location: s3://contentsecurity-datalake-raw-nonprod/data_sync/test/1fdebeb7-7805-48a2-b923-4c8bf8f77d3b/miles_test_topic
2022-05-02 20:09:32 destination > 2022-05-02 20:09:32 INFO i.a.i.d.d.DatabricksStreamCopier():97 - [Stream miles-test-topic] Data table miles_test_topic location: s3://contentsecurity-datalake-raw-nonprod/data_sync/test/content_security/miles-test-topic
2022-05-02 20:09:33 destination > 2022-05-02 20:09:33 INFO i.a.i.d.b.BufferedStreamConsumer(startTracked):141 - class io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer started.
2022-05-02 20:09:33 source > 2022-05-02 20:09:33 INFO i.a.i.b.IntegrationRunner(runInternal):153 - Completed integration: io.airbyte.integrations.source.kafka.KafkaSource
2022-05-02 20:09:33 source > 2022-05-02 20:09:33 INFO i.a.i.s.k.KafkaSource(main):136 - Completed source: class io.airbyte.integrations.source.kafka.KafkaSource
2022-05-02 20:09:33 INFO i.a.w.DefaultReplicationWorker(lambda$getReplicationRunnable$5):305 - Total records read: 0 (0 bytes)
2022-05-02 20:09:33 INFO i.a.w.DefaultReplicationWorker(run):163 - One of source or destination thread complete. Waiting on the other.
2022-05-02 20:09:33 destination > 2022-05-02 20:09:33 INFO i.a.i.b.FailureTrackingAirbyteMessageConsumer(close):65 - Airbyte message consumer: succeeded.
2022-05-02 20:09:33 destination > 2022-05-02 20:09:33 INFO i.a.i.d.b.BufferedStreamConsumer(close):217 - executing on success close procedure.
2022-05-02 20:09:33 destination > 2022-05-02 20:09:33 INFO i.a.i.d.b.BufferedStreamConsumer(flushQueueToDestination):181 - Flushing buffer: 0 bytes
2022-05-02 20:09:33 destination > 2022-05-02 20:09:33 INFO i.a.i.d.s.w.BaseS3Writer(close):107 - Uploading remaining data for stream 'miles-test-topic'.
2022-05-02 20:09:34 destination > 2022-05-02 20:09:34 INFO i.a.i.d.s.w.BaseS3Writer(close):109 - Upload completed for stream 'miles-test-topic'.
2022-05-02 20:09:34 destination > 2022-05-02 20:09:34 INFO i.a.i.d.d.DatabricksStreamCopier(createDestinationSchema):119 - [Stream miles-test-topic] Creating database schema if it does not exist: content_security
2022-05-02 20:09:36 destination > 2022-05-02 20:09:36 INFO i.a.i.d.d.DatabricksStreamCopier(createTemporaryTable):125 - [Stream miles-test-topic] Creating tmp table _airbyte_tmp_glg_miles_test_topic from staging file: s3://contentsecurity-datalake-raw-nonprod/data_sync/test/1fdebeb7-7805-48a2-b923-4c8bf8f77d3b/miles_test_topic
2022-05-02 20:09:36 destination > 2022-05-02 20:09:36 INFO i.a.i.d.d.DatabricksStreamCopier(createTemporaryTable):129 - CREATE TABLE content_security._airbyte_tmp_glg_miles_test_topic USING parquet LOCATION 's3://contentsecurity-datalake-raw-nonprod/data_sync/test/1fdebeb7-7805-48a2-b923-4c8bf8f77d3b/miles_test_topic';
2022-05-02 20:09:37 destination > 2022-05-02 20:09:37 INFO i.a.i.d.d.DatabricksStreamCopier(createDestinationTable):141 - [Stream miles-test-topic] Creating destination table if it does not exist: miles_test_topic
2022-05-02 20:09:37 destination > 2022-05-02 20:09:37 INFO i.a.i.d.d.DatabricksStreamCopier(createDestinationTable):163 - CREATE TABLE IF NOT EXISTS content_security.miles_test_topic USING delta LOCATION 's3://contentsecurity-datalake-raw-nonprod/data_sync/test/content_security/miles-test-topic' COMMENT 'Created from stream miles-test-topic' TBLPROPERTIES ('airbyte.destinationSyncMode' = 'append', delta.autoOptimize.autoCompact = true, delta.autoOptimize.optimizeWrite = true) AS SELECT * FROM content_security._airbyte_tmp_glg_miles_test_topic LIMIT 0
2022-05-02 20:09:38 destination > 2022-05-02 20:09:38 INFO i.a.i.d.d.DatabricksStreamCopier(generateMergeStatement):179 - COPY INTO content_security.miles_test_topic FROM 's3://contentsecurity-datalake-raw-nonprod/data_sync/test/1fdebeb7-7805-48a2-b923-4c8bf8f77d3b/miles_test_topic' FILEFORMAT = PARQUET PATTERN = '2022_05_02_1651522170337_0.parquet'
2022-05-02 20:09:46 destination > 2022-05-02 20:09:46 INFO i.a.i.d.d.DatabricksStreamCopier(removeFileAndDropTmpTable):186 - [Stream miles-test-topic] Deleting tmp table: _airbyte_tmp_glg_miles_test_topic
2022-05-02 20:09:47 destination > 2022-05-02 20:09:47 INFO i.a.i.d.d.DatabricksStreamCopier(removeFileAndDropTmpTable):189 - [Stream miles-test-topic] Deleting staging file: data_sync/test/1fdebeb7-7805-48a2-b923-4c8bf8f77d3b/miles_test_topic/2022_05_02_1651522170337_0.parquet
2022-05-02 20:09:48 destination > 2022-05-02 20:09:48 INFO i.a.i.b.IntegrationRunner(runInternal):154 - Completed integration: io.airbyte.integrations.destination.databricks.DatabricksDestination
2022-05-02 20:09:48 INFO i.a.w.DefaultReplicationWorker(run):165 - Source and destination threads complete.
2022-05-02 20:09:48 INFO i.a.w.DefaultReplicationWorker(run):228 - sync summary: io.airbyte.config.ReplicationAttemptSummary@51d31746[status=completed,recordsSynced=0,bytesSynced=0,startTime=1651522164774,endTime=1651522188333,totalStats=io.airbyte.config.SyncStats@24c2d335[recordsEmitted=0,bytesEmitted=0,stateMessagesEmitted=0,recordsCommitted=0],streamStats=[]]
2022-05-02 20:09:48 INFO i.a.w.DefaultReplicationWorker(run):250 - Source did not output any state messages
2022-05-02 20:09:48 WARN i.a.w.DefaultReplicationWorker(run):261 - State capture: No state retained.
2022-05-02 20:09:48 INFO i.a.w.t.TemporalAttemptExecution(get):131 - Stopping cancellation check scheduling...
2022-05-02 20:09:48 INFO i.a.w.t.s.ReplicationActivityImpl(lambda$replicate$1):147 - sync summary: io.airbyte.config.StandardSyncOutput@68fd026d[standardSyncSummary=io.airbyte.config.StandardSyncSummary@1500253c[status=completed,recordsSynced=0,bytesSynced=0,startTime=1651522164774,endTime=1651522188333,totalStats=io.airbyte.config.SyncStats@24c2d335[recordsEmitted=0,bytesEmitted=0,stateMessagesEmitted=0,recordsCommitted=0],streamStats=[]],state=,outputCatalog=io.airbyte.protocol.models.ConfiguredAirbyteCatalog@107979b8[streams=[io.airbyte.protocol.models.ConfiguredAirbyteStream@4441ff10[stream=io.airbyte.protocol.models.AirbyteStream@442f639f[name=miles-test-topic,jsonSchema={"type":"object","properties":{"value":{"type":"string"}}},supportedSyncModes=[full_refresh, incremental],sourceDefinedCursor=,defaultCursorField=[],sourceDefinedPrimaryKey=[],namespace=,additionalProperties={}],syncMode=full_refresh,cursorField=[],destinationSyncMode=append,primaryKey=[],additionalProperties={}]],additionalProperties={}],failures=[]]
2022-05-02 20:09:48 INFO i.a.w.t.TemporalUtils(withBackgroundHeartbeat):235 - Stopping temporal heartbeating...
2022-05-02 20:09:48 INFO i.a.v.j.JsonSchemaValidator(test):56 - JSON schema validation failed. errors: $.topic_partitions: is missing but it is required, $.subscription_type: does not have a value in the enumeration [assign], $.subscription_type: must be a constant value assign
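The validation errors on the final line indicate the source's subscription config was checked against a branch of the connector spec that requires `subscription_type` to be the constant `assign` together with a `topic_partitions` field, while the sync above actually ran with `{"topic_pattern":"0","subscription_type":"subscribe"}`. A minimal sketch of what an `assign`-style subscription fragment might look like — the surrounding key name and the `"test-topic:0"` partition-string format are assumptions for illustration, not taken from this log:

```json
{
  "subscription": {
    "subscription_type": "assign",
    "topic_partitions": "test-topic:0"
  }
}
```

Alternatively, if `subscribe` mode is intended, the `topic_pattern` of `"0"` looks suspect for a topic named `test-topic` and may be worth double-checking against the connector's spec.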