- Is this your first time deploying Airbyte?: No
- OS Version / Instance: Ubuntu
- Memory / Disk: 50 GB
- Deployment: Docker
- Airbyte Version: 0.40.15
- Source name/version: source-mysql/1.0.6
- Destination name/version: destination-bigquery/1.2.5
- Step: The issue is happening during sync
- Description:
Hello all,
We currently have the problem that CDC with the MySQL source connector does not work. The sync terminates with the following error message:
2022-10-20 12:09:17 source > Starting FileOffsetBackingStore with file /tmp/cdc-state-offset11166348234859605997/offset.dat
2022-10-20 12:09:17 source > Starting MySqlConnectorTask with configuration:
2022-10-20 12:09:17 source > connector.class = io.debezium.connector.mysql.MySqlConnector
2022-10-20 12:09:17 source > snapshot.locking.mode = none
2022-10-20 12:09:17 source > max.queue.size = 8192
2022-10-20 12:09:17 source > database.history.consumer.security.protocol = SSL
2022-10-20 12:09:17 source > include.schema.changes = false
2022-10-20 12:09:17 source > database.sslmode = PREFERRED
2022-10-20 12:09:17 source > binary.handling.mode = base64
2022-10-20 12:09:17 source > offset.storage.file.filename = /tmp/cdc-state-offset11166348234859605997/offset.dat
2022-10-20 12:09:17 source > decimal.handling.mode = string
2022-10-20 12:09:17 source > converters = boolean, datetime
2022-10-20 12:09:17 source > datetime.type = io.airbyte.integrations.debezium.internals.MySQLDateTimeConverter
2022-10-20 12:09:17 source > value.converter = org.apache.kafka.connect.json.JsonConverter
2022-10-20 12:09:17 source > key.converter = org.apache.kafka.connect.json.JsonConverter
2022-10-20 12:09:17 source > database.history.file.filename = /tmp/cdc-db-history5183249625643026122/dbhistory.dat
2022-10-20 12:09:17 source > database.user = airbyte
2022-10-20 12:09:17 source > database.dbname = database
2022-10-20 12:09:17 source > offset.storage = org.apache.kafka.connect.storage.FileOffsetBackingStore
2022-10-20 12:09:17 source > database.history.producer.security.protocol = SSL
2022-10-20 12:09:17 source > boolean.type = io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter
2022-10-20 12:09:17 source > database.server.name = database
2022-10-20 12:09:17 source > offset.flush.timeout.ms = 5000
2022-10-20 12:09:17 source > database.port = 3306
2022-10-20 12:09:17 source > offset.flush.interval.ms = 1000
2022-10-20 12:09:17 source > key.converter.schemas.enable = false
2022-10-20 12:09:17 source > internal.key.converter = org.apache.kafka.connect.json.JsonConverter
2022-10-20 12:09:17 source > database.serverTimezone = Europe/Berlin
2022-10-20 12:09:17 source > database.hostname = database-server.com
2022-10-20 12:09:17 source > database.password = ********
2022-10-20 12:09:17 source > name = database
2022-10-20 12:09:17 source > value.converter.schemas.enable = false
2022-10-20 12:09:17 source > internal.value.converter = org.apache.kafka.connect.json.JsonConverter
2022-10-20 12:09:17 source > max.batch.size = 2048
2022-10-20 12:09:17 source > table.include.list = database.access_log
2022-10-20 12:09:17 source > snapshot.mode = when_needed
2022-10-20 12:09:17 source > database.history = io.debezium.relational.history.FileDatabaseHistory
2022-10-20 12:09:17 source > database.include.list = database
2022-10-20 12:09:17 source > Database configuration option 'serverTimezone' is set but is obsolete, please use 'connectionTimeZone' instead
2022-10-20 12:09:17 source > Stopping down connector
2022-10-20 12:09:17 source > Stopped FileOffsetBackingStore
2022-10-20 12:09:17 source > Debezium engine shutdown.
2022-10-20 12:09:18 destination > Creating tmp table GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=source, tableId=_airbyte_tmp_gcv_access_log}}
2022-10-20 12:09:18 destination > Partitioned table created successfully: GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=source, tableId=_airbyte_tmp_gcv_access_log}}
2022-10-20 12:09:18 destination > Creating staging path for stream access_log (dataset source): data/source_access_log/2022/10/20/12/c9b20f7f-4b15-4cea-825a-cf70ada6273e/
2022-10-20 12:09:18 destination > Storage Object airbyte-sync/data/source_access_log/2022/10/20/12/c9b20f7f-4b15-4cea-825a-cf70ada6273e/ does not exist in bucket; creating...
2022-10-20 12:09:18 destination > Storage Object airbyte-sync/data/source_access_log/2022/10/20/12/c9b20f7f-4b15-4cea-825a-cf70ada6273e/ has been created in bucket.
2022-10-20 12:09:18 destination > Preparing tmp tables in destination completed.
2022-10-20 12:14:17 source > Closing cause next is returned as null
2022-10-20 12:14:17 source > Stopping the embedded engine
2022-10-20 12:14:17 source > Closing database connection pool.
2022-10-20 12:14:17 source > HikariPool-1 - Shutdown initiated...
2022-10-20 12:14:17 source > HikariPool-1 - Shutdown completed.
2022-10-20 12:14:17 source > Closed database connection pool.
2022-10-20 12:14:17 source > Something went wrong in the connector. See the logs for more details.
Stack Trace: java.lang.RuntimeException: java.lang.RuntimeException: io.debezium.DebeziumException: Unexpected error while connecting to MySQL and looking at BINLOG_FORMAT mode:
at io.airbyte.integrations.debezium.internals.DebeziumRecordIterator.requestClose(DebeziumRecordIterator.java:137)
at io.airbyte.integrations.debezium.internals.DebeziumRecordIterator.computeNext(DebeziumRecordIterator.java:84)
at io.airbyte.integrations.debezium.internals.DebeziumRecordIterator.computeNext(DebeziumRecordIterator.java:33)
at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:146)
at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:141)
at com.google.common.collect.TransformedIterator.hasNext(TransformedIterator.java:46)
at io.airbyte.commons.util.DefaultAutoCloseableIterator.computeNext(DefaultAutoCloseableIterator.java:38)
at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:146)
at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:141)
at io.airbyte.commons.util.CompositeIterator.computeNext(CompositeIterator.java:63)
at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:146)
at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:141)
at io.airbyte.commons.util.CompositeIterator.computeNext(CompositeIterator.java:63)
at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:146)
at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:141)
at io.airbyte.commons.util.DefaultAutoCloseableIterator.computeNext(DefaultAutoCloseableIterator.java:38)
at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:146)
at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:141)
at io.airbyte.commons.util.DefaultAutoCloseableIterator.computeNext(DefaultAutoCloseableIterator.java:38)
at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:146)
at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:141)
at java.base/java.util.Iterator.forEachRemaining(Iterator.java:132)
at io.airbyte.integrations.base.IntegrationRunner.lambda$produceMessages$0(IntegrationRunner.java:157)
at io.airbyte.integrations.base.IntegrationRunner.watchForOrphanThreads(IntegrationRunner.java:207)
at io.airbyte.integrations.base.IntegrationRunner.produceMessages(IntegrationRunner.java:156)
at io.airbyte.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.java:137)
at io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:97)
at io.airbyte.integrations.source.mysql.MySqlSource.main(MySqlSource.java:306)
Caused by: java.lang.RuntimeException: io.debezium.DebeziumException: Unexpected error while connecting to MySQL and looking at BINLOG_FORMAT mode:
at io.airbyte.integrations.debezium.internals.DebeziumRecordPublisher.close(DebeziumRecordPublisher.java:105)
at io.airbyte.commons.concurrency.VoidCallable.call(VoidCallable.java:15)
at io.airbyte.integrations.debezium.internals.DebeziumRecordIterator.requestClose(DebeziumRecordIterator.java:134)
... 27 more
Caused by: io.debezium.DebeziumException: Unexpected error while connecting to MySQL and looking at BINLOG_FORMAT mode:
at io.debezium.connector.mysql.MySqlConnection.isBinlogFormatRow(MySqlConnection.java:385)
at io.debezium.connector.mysql.MySqlConnectorTask.validateBinlogConfiguration(MySqlConnectorTask.java:240)
at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:85)
at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:133)
at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:760)
at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:192)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.sql.SQLException: Access denied for user 'airbyte'@'airbyte-server-ip.com' (using password: YES)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:828)
at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:448)
at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:241)
at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:198)
at io.debezium.jdbc.JdbcConnection.lambda$patternBasedFactory$1(JdbcConnection.java:244)
at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:888)
at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:883)
at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:636)
at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:510)
at io.debezium.connector.mysql.MySqlConnection.isBinlogFormatRow(MySqlConnection.java:380)
... 8 more
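The innermost "Caused by:" entry in the trace above is the access-denied error, not the BINLOG_FORMAT message that wraps it. For anyone who wants to pull that line out of a saved log, here is a minimal sketch; the helper name `root_cause` and the shortened `sample` are only illustrative and not part of Airbyte or Debezium.

```python
def root_cause(stack_trace: str) -> str:
    """Return the innermost 'Caused by:' entry, or the first non-empty line if there is none."""
    lines = [line.strip() for line in stack_trace.splitlines() if line.strip()]
    causes = [line for line in lines if line.startswith("Caused by:")]
    if causes:
        # The last 'Caused by:' in a Java stack trace is the deepest cause.
        return causes[-1][len("Caused by:"):].strip()
    return lines[0] if lines else ""


# Shortened excerpt of the trace above (hypothetical sample input).
sample = """\
Stack Trace: java.lang.RuntimeException: java.lang.RuntimeException: io.debezium.DebeziumException: Unexpected error while connecting to MySQL and looking at BINLOG_FORMAT mode:
at io.airbyte.integrations.debezium.internals.DebeziumRecordIterator.requestClose(DebeziumRecordIterator.java:137)
Caused by: io.debezium.DebeziumException: Unexpected error while connecting to MySQL and looking at BINLOG_FORMAT mode:
at io.debezium.connector.mysql.MySqlConnection.isBinlogFormatRow(MySqlConnection.java:385)
Caused by: java.sql.SQLException: Access denied for user 'airbyte'@'airbyte-server-ip.com' (using password: YES)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)
"""

print(root_cause(sample))
```

On the excerpt this prints the `java.sql.SQLException: Access denied ...` line, which is why we started looking at the user's authentication requirements rather than at the binlog settings.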
We have set up the binary log for the source database according to the documentation.
What we have found so far is that the cause is probably that SSL is enforced for the airbyte user:
CREATE USER `airbyte`@`server-ip.com` IDENTIFIED WITH 'caching_sha2_password' AS '...' REQUIRE SSL PASSWORD EXPIRE DEFAULT ACCOUNT UNLOCK PASSWORD HISTORY DEFAULT PASSWORD REUSE INTERVAL DEFAULT PASSWORD REQUIRE CURRENT DEFAULT;
This does not seem to be a problem for normal queries, but the connection that Debezium opens to check the binlog configuration does not seem to apply the configured SSL settings.
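To see which SSL-related settings actually reach Debezium, the configuration dump in the log can be filtered. The following is a rough sketch that assumes the `key = value` layout shown in the log above; the function name `ssl_related_settings` and the `sample` excerpt are made up for illustration.

```python
import re

# Matches the "key = value" part of a configuration line, i.e. everything
# after the "> " that follows the timestamp and the source/destination tag.
CONFIG_LINE = re.compile(r">\s*(?P<key>[\w.]+)\s*=\s*(?P<value>.*)$")


def ssl_related_settings(log_text: str) -> dict:
    """Collect logged connector properties whose name mentions 'ssl' or 'security'."""
    found = {}
    for line in log_text.splitlines():
        match = CONFIG_LINE.search(line)
        if match is None:
            continue  # not a "key = value" configuration line
        key = match.group("key")
        if "ssl" in key.lower() or "security" in key.lower():
            found[key] = match.group("value").strip()
    return found


# Excerpt of the configuration dump above (hypothetical sample input).
sample = """\
2022-10-20 12:09:17 source > snapshot.locking.mode = none
2022-10-20 12:09:17 source > database.history.consumer.security.protocol = SSL
2022-10-20 12:09:17 source > database.sslmode = PREFERRED
2022-10-20 12:09:17 source > database.history.producer.security.protocol = SSL
2022-10-20 12:09:17 source > database.port = 3306
"""

for key, value in ssl_related_settings(sample).items():
    print(f"{key} = {value}")
```

In our log the only SSL-related database setting is `database.sslmode = PREFERRED`. As far as we can tell from the Debezium MySQL connector documentation, the property there is spelled `database.ssl.mode`; whether the `database.sslmode` key is honored for the binlog check is an assumption we have not been able to verify.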
We temporarily removed the REQUIRE SSL option for the user. With that change, the connection succeeded.
However, this is not an option for us as the SSL requirement is part of our company’s security policy.
Is there a solution for this? Are we possibly doing something wrong in the configuration? Or is CDC simply not possible for a user that requires SSL?
I hope you can help me with this.
Thank you.