Mysql CDC is not working when option user require ssl is enabled

  • Is this your first time deploying Airbyte?: No
  • OS Version / Instance: Ubuntu
  • Memory / Disk: 50 GB
  • Deployment: Docker
  • Airbyte Version: 0.40.15
  • Source name/version: source-mysql/1.0.6
  • Destination name/version: destination-bigquery/1.2.5
  • Step: The issue is happening during sync
  • Description:

Hello all,

we currently have the problem that CDC for the mysql connector does not work and terminates with the following error message:

2022-10-20 12:09:17 e[44msourcee[0m > Starting FileOffsetBackingStore with file /tmp/cdc-state-offset11166348234859605997/offset.dat
2022-10-20 12:09:17 e[44msourcee[0m > Starting MySqlConnectorTask with configuration:
2022-10-20 12:09:17 e[44msourcee[0m >    connector.class = io.debezium.connector.mysql.MySqlConnector
2022-10-20 12:09:17 e[44msourcee[0m >    snapshot.locking.mode = none
2022-10-20 12:09:17 e[44msourcee[0m >    max.queue.size = 8192
2022-10-20 12:09:17 e[44msourcee[0m >    database.history.consumer.security.protocol = SSL
2022-10-20 12:09:17 e[44msourcee[0m >    include.schema.changes = false
2022-10-20 12:09:17 e[44msourcee[0m >    database.sslmode = PREFERRED
2022-10-20 12:09:17 e[44msourcee[0m >    binary.handling.mode = base64
2022-10-20 12:09:17 e[44msourcee[0m >    offset.storage.file.filename = /tmp/cdc-state-offset11166348234859605997/offset.dat
2022-10-20 12:09:17 e[44msourcee[0m >    decimal.handling.mode = string
2022-10-20 12:09:17 e[44msourcee[0m >    converters = boolean, datetime
2022-10-20 12:09:17 e[44msourcee[0m >    datetime.type = io.airbyte.integrations.debezium.internals.MySQLDateTimeConverter
2022-10-20 12:09:17 e[44msourcee[0m >    value.converter = org.apache.kafka.connect.json.JsonConverter
2022-10-20 12:09:17 e[44msourcee[0m >    key.converter = org.apache.kafka.connect.json.JsonConverter
2022-10-20 12:09:17 e[44msourcee[0m >    database.history.file.filename = /tmp/cdc-db-history5183249625643026122/dbhistory.dat
2022-10-20 12:09:17 e[44msourcee[0m >    database.user = airbyte
2022-10-20 12:09:17 e[44msourcee[0m >    database.dbname = database
2022-10-20 12:09:17 e[44msourcee[0m >    offset.storage = org.apache.kafka.connect.storage.FileOffsetBackingStore
2022-10-20 12:09:17 e[44msourcee[0m >    database.history.producer.security.protocol = SSL
2022-10-20 12:09:17 e[44msourcee[0m >    boolean.type = io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter
2022-10-20 12:09:17 e[44msourcee[0m >    database.server.name = database
2022-10-20 12:09:17 e[44msourcee[0m >    offset.flush.timeout.ms = 5000
2022-10-20 12:09:17 e[44msourcee[0m >    database.port = 3306
2022-10-20 12:09:17 e[44msourcee[0m >    offset.flush.interval.ms = 1000
2022-10-20 12:09:17 e[44msourcee[0m >    key.converter.schemas.enable = false
2022-10-20 12:09:17 e[44msourcee[0m >    internal.key.converter = org.apache.kafka.connect.json.JsonConverter
2022-10-20 12:09:17 e[44msourcee[0m >    database.serverTimezone = Europe/Berlin
2022-10-20 12:09:17 e[44msourcee[0m >    database.hostname = database-server.com
2022-10-20 12:09:17 e[44msourcee[0m >    database.password = ********
2022-10-20 12:09:17 e[44msourcee[0m >    name = database
2022-10-20 12:09:17 e[44msourcee[0m >    value.converter.schemas.enable = false
2022-10-20 12:09:17 e[44msourcee[0m >    internal.value.converter = org.apache.kafka.connect.json.JsonConverter
2022-10-20 12:09:17 e[44msourcee[0m >    max.batch.size = 2048
2022-10-20 12:09:17 e[44msourcee[0m >    table.include.list = database.access_log
2022-10-20 12:09:17 e[44msourcee[0m >    snapshot.mode = when_needed
2022-10-20 12:09:17 e[44msourcee[0m >    database.history = io.debezium.relational.history.FileDatabaseHistory
2022-10-20 12:09:17 e[44msourcee[0m >    database.include.list = database
2022-10-20 12:09:17 e[44msourcee[0m > Database configuration option 'serverTimezone' is set but is obsolete, please use 'connectionTimeZone' instead
2022-10-20 12:09:17 e[44msourcee[0m > Stopping down connector
2022-10-20 12:09:17 e[44msourcee[0m > Stopped FileOffsetBackingStore
2022-10-20 12:09:17 e[44msourcee[0m > Debezium engine shutdown.
2022-10-20 12:09:18 e[43mdestinatione[0m > Creating tmp table GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=source, tableId=_airbyte_tmp_gcv_access_log}}
2022-10-20 12:09:18 e[43mdestinatione[0m > Partitioned table created successfully: GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=source, tableId=_airbyte_tmp_gcv_access_log}}
2022-10-20 12:09:18 e[43mdestinatione[0m > Creating staging path for stream access_log (dataset source): data/source_access_log/2022/10/20/12/c9b20f7f-4b15-4cea-825a-cf70ada6273e/
2022-10-20 12:09:18 e[43mdestinatione[0m > Storage Object airbyte-sync/data/source_access_log/2022/10/20/12/c9b20f7f-4b15-4cea-825a-cf70ada6273e/ does not exist in bucket; creating...
2022-10-20 12:09:18 e[43mdestinatione[0m > Storage Object airbyte-sync/data/source_access_log/2022/10/20/12/c9b20f7f-4b15-4cea-825a-cf70ada6273e/ has been created in bucket.
2022-10-20 12:09:18 e[43mdestinatione[0m > Preparing tmp tables in destination completed.
2022-10-20 12:14:17 e[44msourcee[0m > Closing cause next is returned as null
2022-10-20 12:14:17 e[44msourcee[0m > Stopping the embedded engine
2022-10-20 12:14:17 e[44msourcee[0m > Closing database connection pool.
2022-10-20 12:14:17 e[44msourcee[0m > HikariPool-1 - Shutdown initiated...
2022-10-20 12:14:17 e[44msourcee[0m > HikariPool-1 - Shutdown completed.
2022-10-20 12:14:17 e[44msourcee[0m > Closed database connection pool.
2022-10-20 12:14:17 e[44msourcee[0m > Something went wrong in the connector. See the logs for more details.
Stack Trace: java.lang.RuntimeException: java.lang.RuntimeException: io.debezium.DebeziumException: Unexpected error while connecting to MySQL and looking at BINLOG_FORMAT mode: 
	at io.airbyte.integrations.debezium.internals.DebeziumRecordIterator.requestClose(DebeziumRecordIterator.java:137)
	at io.airbyte.integrations.debezium.internals.DebeziumRecordIterator.computeNext(DebeziumRecordIterator.java:84)
	at io.airbyte.integrations.debezium.internals.DebeziumRecordIterator.computeNext(DebeziumRecordIterator.java:33)
	at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:146)
	at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:141)
	at com.google.common.collect.TransformedIterator.hasNext(TransformedIterator.java:46)
	at io.airbyte.commons.util.DefaultAutoCloseableIterator.computeNext(DefaultAutoCloseableIterator.java:38)
	at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:146)
	at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:141)
	at io.airbyte.commons.util.CompositeIterator.computeNext(CompositeIterator.java:63)
	at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:146)
	at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:141)
	at io.airbyte.commons.util.CompositeIterator.computeNext(CompositeIterator.java:63)
	at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:146)
	at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:141)
	at io.airbyte.commons.util.DefaultAutoCloseableIterator.computeNext(DefaultAutoCloseableIterator.java:38)
	at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:146)
	at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:141)
	at io.airbyte.commons.util.DefaultAutoCloseableIterator.computeNext(DefaultAutoCloseableIterator.java:38)
	at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:146)
	at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:141)
	at java.base/java.util.Iterator.forEachRemaining(Iterator.java:132)
	at io.airbyte.integrations.base.IntegrationRunner.lambda$produceMessages$0(IntegrationRunner.java:157)
	at io.airbyte.integrations.base.IntegrationRunner.watchForOrphanThreads(IntegrationRunner.java:207)
	at io.airbyte.integrations.base.IntegrationRunner.produceMessages(IntegrationRunner.java:156)
	at io.airbyte.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.java:137)
	at io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:97)
	at io.airbyte.integrations.source.mysql.MySqlSource.main(MySqlSource.java:306)
Caused by: java.lang.RuntimeException: io.debezium.DebeziumException: Unexpected error while connecting to MySQL and looking at BINLOG_FORMAT mode: 
	at io.airbyte.integrations.debezium.internals.DebeziumRecordPublisher.close(DebeziumRecordPublisher.java:105)
	at io.airbyte.commons.concurrency.VoidCallable.call(VoidCallable.java:15)
	at io.airbyte.integrations.debezium.internals.DebeziumRecordIterator.requestClose(DebeziumRecordIterator.java:134)
	... 27 more
Caused by: io.debezium.DebeziumException: Unexpected error while connecting to MySQL and looking at BINLOG_FORMAT mode: 
	at io.debezium.connector.mysql.MySqlConnection.isBinlogFormatRow(MySqlConnection.java:385)
	at io.debezium.connector.mysql.MySqlConnectorTask.validateBinlogConfiguration(MySqlConnectorTask.java:240)
	at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:85)
	at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:133)
	at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:760)
	at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:192)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.sql.SQLException: Access denied for user 'airbyte'@'airbyte-server-ip.com' (using password: YES)
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)
	at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
	at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:828)
	at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:448)
	at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:241)
	at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:198)
	at io.debezium.jdbc.JdbcConnection.lambda$patternBasedFactory$1(JdbcConnection.java:244)
	at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:888)
	at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:883)
	at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:636)
	at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:510)
	at io.debezium.connector.mysql.MySqlConnection.isBinlogFormatRow(MySqlConnection.java:380)
	... 8 more

We have set up the binary log for the source database according to the documentation.

What we have already found out is that it is probably because SSL is enforced for the airbyte user:

CREATE USER `airbyte`@`server-ip.com` IDENTIFIED WITH 'caching_sha2_password' AS '...' REQUIRE SSL PASSWORD EXPIRE DEFAULT ACCOUNT UNLOCK PASSWORD HISTORY DEFAULT PASSWORD REUSE INTERVAL DEFAULT PASSWORD REQUIRE CURRENT DEFAULT;

This doesn’t seem to be a problem for the normal queries, but the queries that are sent to check the binlogs don’t seem to use the SSL settings that are set:

We have temporarily removed the require ssl - option for the user. With this, the connection went through successfully.

However, this is not an option for us as the SSL requirement is part of our company’s security policy.

Is there a solution for this? Or are we possibly already doing something wrong in the configuration? Or is CDC with the SSL requirement simply not possible for the user?

I hope you can help me with this.

Thank you.

Hello there! You are receiving this message because none of your fellow community members has stepped in to respond to your topic post. (If you are a community member and you are reading this response, feel free to jump in if you have the answer!) As a result, the Community Assistance Team has been made aware of this topic and will be investigating and responding as quickly as possible.
Some important considerations that will help your to get your issue solved faster:

  • It is best to use our topic creation template; if you haven’t yet, we recommend posting a followup with the requested information. With that information the team will be able to more quickly search for similar issues with connectors and the platform and troubleshoot more quickly your specific question or problem.
  • Make sure to upload the complete log file; a common investigation roadblock is that sometimes the error for the issue happens well before the problem is surfaced to the user, and so having the tail of the log is less useful than having the whole log to scan through.
  • Be as descriptive and specific as possible; when investigating it is extremely valuable to know what steps were taken to encounter the issue, what version of connector / platform / Java / Python / docker / k8s was used, etc. The more context supplied, the quicker the investigation can start on your topic and the faster we can drive towards an answer.
  • We in the Community Assistance Team are glad you’ve made yourself part of our community, and we’ll do our best to answer your questions and resolve the problems as quickly as possible. Expect to hear from a specific team member as soon as possible.

Thank you for your time and attention.
Best,
The Community Assistance Team

Hey could you create a github issue on this, so that team can look into this problem.

Thank you for your answer. I have created an issue on github about it.