Error in Couchbase connector with state message format

Summary

The user is encountering an issue in the Couchbase connector where the state message is not in the per-stream state format required for record counts.


Question

```text
State message was not in per-stream state format, which is required for record counts.
```

```python
import logging
from datetime import datetime
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional

from airbyte_cdk.models import (
    AirbyteMessage,
    AirbyteRecordMessage,
    AirbyteStateMessage,
    AirbyteStreamState,
    StreamDescriptor,
    SyncMode,
    Type,
)
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.core import CheckpointMixin

from .queries import get_documents_query, get_incremental_documents_query

logger = logging.getLogger("airbyte")

class CouchbaseStream(Stream, CheckpointMixin):
    primary_key = [["id"]]
    cursor_field = ["_ab_cdc_updated_at"]

    def __init__(self, cluster, bucket: str, scope: str, collection: str, start_date: Optional[str] = None, **kwargs):
        super().__init__(**kwargs)
        self.cluster = cluster
        self.bucket = bucket
        self.scope = scope
        self.collection = collection
        self.name = f"{bucket}.{scope}.{collection}"
        self._start_date = start_date
        self._state: MutableMapping[str, Any] = {}

    def get_json_schema(self) -> Mapping[str, Any]:
        return {
            "$schema": "<http://json-schema.org/draft-07/schema#>",
            "type": "object",
            "properties": {
                "id": {"type": "string"},
                self.cursor_field[0]: {"type": "number"},
                "content": {
                    "type": "object",
                    "additionalProperties": True
                }
            }
        }

    def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -&gt; Mapping[str, Any]:
        latest_benchmark = latest_record.get(self.cursor_field[0])
        current_state = current_stream_state.get(self.cursor_field[0])
        
        if latest_benchmark is not None:
            if current_state is not None:
                return {self.cursor_field[0]: max(latest_benchmark, current_state)}
            return {self.cursor_field[0]: latest_benchmark}
        return current_stream_state

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: List[str] = None,
        stream_slice: Mapping[str, Any] = None,
        stream_state: Mapping[str, Any] = None,
    ) -&gt; Iterable[AirbyteMessage]:
        <http://logger.info|logger.info>(f"Starting read_records for {self.name} with sync_mode: {sync_mode}, cursor_field: {cursor_field}, stream_state: {stream_state}")
        
        if sync_mode == SyncMode.full_refresh:
            query = get_documents_query(self.bucket, self.scope, self.collection)
        elif sync_mode == SyncMode.incremental:
            cursor_value = stream_state.get(self.cursor_field[0], self._start_date) if stream_state else self._start_date
            query = get_incremental_documents_query(self.bucket, self.scope, self.collection, cursor_value)
        else:
            raise ValueError(f"Unsupported sync mode {sync_mode}")

        <http://logger.info|logger.info>(f"Executing query: {query}")
        
        for row in self.cluster.query(query):
            record = self._process_row(row)
            self._state = self.get_updated_state(self._state, record)
            yield AirbyteMessage(
                type=Type.RECORD,
                record=AirbyteRecordMessage(
                    stream=self.name,
                    data=record,
                    emitted_at=int(datetime.now().timestamp() * 1000)
                )
            )

            # Emit state message
            yield AirbyteMessage(
                type=Type.STATE,
                state=AirbyteStateMessage(
                    data=AirbyteStreamState(
                        stream_descriptor=StreamDescriptor(name=self.name),
                        stream_state=self.get_stream_state()
                    )
                )
            )

        <http://logger.info|logger.info>(f"Finished reading records for stream {self.name}")
        <http://logger.info|logger.info>(f"Final state for stream {self.name}: {self._state}")

    def _process_row(self, row: Mapping[str, Any]) -&gt; Mapping[str, Any]:
        return {
            "id": row["_id"],
            self.cursor_field[0]: row.get(self.cursor_field[0], 0),
            "content": {k: v for k, v in row.items() if k not in ["_id", self.cursor_field[0]]}
        }

    @property
    def state(self) -&gt; MutableMapping[str, Any]:
        return self._state

    @state.setter
    def state(self, value: MutableMapping[str, Any]):
        self._state = value

    def get_stream_state(self) -&gt; Mapping[str, Any]:
        return self._state```

<br>

---

This topic has been created from a Slack thread to give it more visibility.
It will be on Read-Only mode here. [Click here](https://airbytehq.slack.com/archives/C021JANJ6TY/p1728727679367859) if you want 
to access the original thread.

[Join the conversation on Slack](https://slack.airbyte.com)

<sub>
["couchbase-connector", "state-message", "per-stream-state", "record-counts", "logging"]
</sub>