Summary
User is encountering an error while attempting to log records retrieved from a MariaDB source in a custom connector implementation. The provided code snippet shows the write method and logging attempts.
Question
Hello, I need your help. I’m trying to create my own connector, and at the moment, I just want to log the records I retrieve from the source (MariaDB), but I am getting the following error:
This is what I have done so far in my destination:
```python
    def write(
        self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
    ) -> Iterable[AirbyteMessage]:
        logger = logging.getLogger(__name__)
        last_state = None
        processed_records = 0
        for message in input_messages:
            logger.info("------------------------------------------------------")
            if message.type == Type.RECORD:
                record = message.record
                # logger.info(f"Processing record: {record}")
                try:
                    self.write_to_destination(self, record)
                    processed_records += 1
                except Exception as e:
                    logger.error(f"Failed to write record: {e}")
            elif message.type == Type.STATE:
                logger.info(f"Processing state message: {message.state}")
                last_state = message.state
        if last_state:
            logger.info(f"Emitting state: {last_state}")
            yield AirbyteMessage(type=Type.STATE, state=last_state)

    @staticmethod
    def write_to_destination(self, record: AirbyteRecordMessage):
        logger = logging.getLogger(__name__)
        logger.info(f"Persisting record: {record}")
        pass
```
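
One detail in the snippet stands out: `write_to_destination` is decorated with `@staticmethod` yet still declares `self`, and it is called as `self.write_to_destination(self, record)`. The error text is not included in the post, so this may or may not be related to it. The following is only a minimal sketch, independent of Airbyte, of how that decorator behaves; the class names `StaticStyle` and `InstanceStyle` are hypothetical and records are plain dicts for illustration.

```python
import logging

logger = logging.getLogger(__name__)


class StaticStyle:
    """Mirrors the post: a @staticmethod that still declares `self`."""

    @staticmethod
    def write_to_destination(self, record):
        logger.info("Persisting record: %s", record)
        return record

    def write(self, records):
        # A staticmethod is never bound to the instance, so `self`
        # has to be passed by hand as an ordinary first argument.
        return [self.write_to_destination(self, r) for r in records]


class InstanceStyle:
    """Same behavior written as a regular instance method."""

    def write_to_destination(self, record):
        logger.info("Persisting record: %s", record)
        return record

    def write(self, records):
        # Python binds `self` automatically for instance methods.
        return [self.write_to_destination(r) for r in records]


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    sample = [{"id": 1}, {"id": 2}]
    StaticStyle().write(sample)
    InstanceStyle().write(sample)
    try:
        # Forgetting the manual `self` leaves the `record` parameter unfilled.
        StaticStyle().write_to_destination({"id": 3})
    except TypeError as exc:
        logger.error("Call failed: %s", exc)
```

Both styles work when called consistently. The instance-method form is usually easier to keep correct, because a call that omits the hand-passed `self` on the static version raises a `TypeError`.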
---
This topic has been created from a Slack thread to give it more visibility.
It will be in read-only mode here. [Click here](https://airbytehq.slack.com/archives/C021JANJ6TY/p1731319897141379) if you want to access the original thread.
[Join the conversation on Slack](https://slack.airbyte.com)
<sub>
['custom-connector', 'mariadb', 'logging', 'airbyte-message']
</sub>