Summary
The user is encountering an issue in the Couchbase connector where the state message is not in the per-stream state format required for record counts.
Question
Copy text
State message was not in per-stream state format, which is required for record counts.```
```import logging
from datetime import datetime
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional
from airbyte_cdk.models import (
AirbyteMessage,
AirbyteRecordMessage,
AirbyteStateMessage,
AirbyteStreamState,
StreamDescriptor,
SyncMode,
Type,
)
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.core import CheckpointMixin
from .queries import get_documents_query, get_incremental_documents_query
logger = logging.getLogger("airbyte")
class CouchbaseStream(Stream, CheckpointMixin):
primary_key = [["id"]]
cursor_field = ["_ab_cdc_updated_at"]
def __init__(self, cluster, bucket: str, scope: str, collection: str, start_date: Optional[str] = None, **kwargs):
super().__init__(**kwargs)
self.cluster = cluster
self.bucket = bucket
self.scope = scope
self.collection = collection
self.name = f"{bucket}.{scope}.{collection}"
self._start_date = start_date
self._state: MutableMapping[str, Any] = {}
def get_json_schema(self) -> Mapping[str, Any]:
return {
"$schema": "<http://json-schema.org/draft-07/schema#>",
"type": "object",
"properties": {
"id": {"type": "string"},
self.cursor_field[0]: {"type": "number"},
"content": {
"type": "object",
"additionalProperties": True
}
}
}
def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
latest_benchmark = latest_record.get(self.cursor_field[0])
current_state = current_stream_state.get(self.cursor_field[0])
if latest_benchmark is not None:
if current_state is not None:
return {self.cursor_field[0]: max(latest_benchmark, current_state)}
return {self.cursor_field[0]: latest_benchmark}
return current_stream_state
def read_records(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[AirbyteMessage]:
<http://logger.info|logger.info>(f"Starting read_records for {self.name} with sync_mode: {sync_mode}, cursor_field: {cursor_field}, stream_state: {stream_state}")
if sync_mode == SyncMode.full_refresh:
query = get_documents_query(self.bucket, self.scope, self.collection)
elif sync_mode == SyncMode.incremental:
cursor_value = stream_state.get(self.cursor_field[0], self._start_date) if stream_state else self._start_date
query = get_incremental_documents_query(self.bucket, self.scope, self.collection, cursor_value)
else:
raise ValueError(f"Unsupported sync mode {sync_mode}")
<http://logger.info|logger.info>(f"Executing query: {query}")
for row in self.cluster.query(query):
record = self._process_row(row)
self._state = self.get_updated_state(self._state, record)
yield AirbyteMessage(
type=Type.RECORD,
record=AirbyteRecordMessage(
stream=self.name,
data=record,
emitted_at=int(datetime.now().timestamp() * 1000)
)
)
# Emit state message
yield AirbyteMessage(
type=Type.STATE,
state=AirbyteStateMessage(
data=AirbyteStreamState(
stream_descriptor=StreamDescriptor(name=self.name),
stream_state=self.get_stream_state()
)
)
)
<http://logger.info|logger.info>(f"Finished reading records for stream {self.name}")
<http://logger.info|logger.info>(f"Final state for stream {self.name}: {self._state}")
def _process_row(self, row: Mapping[str, Any]) -> Mapping[str, Any]:
return {
"id": row["_id"],
self.cursor_field[0]: row.get(self.cursor_field[0], 0),
"content": {k: v for k, v in row.items() if k not in ["_id", self.cursor_field[0]]}
}
@property
def state(self) -> MutableMapping[str, Any]:
return self._state
@state.setter
def state(self, value: MutableMapping[str, Any]):
self._state = value
def get_stream_state(self) -> Mapping[str, Any]:
return self._state```
<br>
---
This topic has been created from a Slack thread to give it more visibility.
It will be on Read-Only mode here. [Click here](https://airbytehq.slack.com/archives/C021JANJ6TY/p1728727679367859) if you want
to access the original thread.
[Join the conversation on Slack](https://slack.airbyte.com)
<sub>
["couchbase-connector", "state-message", "per-stream-state", "record-counts", "logging"]
</sub>