Incremental stream implementation

Hello everyone, I have implemented a connector using Xola API. I made one DateTime column as cursor_field and implemented get_updated_state function, It seems to be not working. can anyone please tell what add more:

class Orders(XolaStream):
    primary_key = "order_id"
    cursor_field = None
    cursor_field_value = None
    seller_id = None

    def __init__(self, seller_id: str, x_api_key: str, cursor_field: str, cursor_field_value: str, **kwargs):
        super().__init__(x_api_key, **kwargs)
        self.seller_id = seller_id
        if cursor_field:
            self.cursor_field = cursor_field
            self.cursor_field_value = cursor_field_value

    def path(
            self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None,
            next_page_token: Mapping[str, Any] = None
    ) -> str:
        """
        should return "orders". Required.
        """
        return "orders"

    def request_params(
            self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None,
            next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        """
        seller id is returned as a form of parameters
        """
        params = {}
        if next_page_token:
            for key in next_page_token.keys():
                params[key] = next_page_token[key]
        params['seller'] = self.seller_id
        return params

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """
        TODO: Override this method to define how a response is parsed.
        :return an iterable containing each record in the response
        """
        raw_response = response.json()["data"]
        modified_response = []
        for data in raw_response:
            try:
                # Tags._id
                resp = {"tags": []}
                .......
                resp["order_id"] = data["id"]

                if "createdAt" in data.keys(): resp["createdAt"] = data["createdAt"]
                if "customerName" in data.keys(): resp["customerName"] = data["customerName"]
                .......
                if "createdBy" in data.keys(): resp["createdBy"] = data["createdBy"]
                .......
                if "updatedAt" in data.keys(): resp["updatedAt"] = data["updatedAt"]
                modified_response.append(resp)
            except:
                pass
        return modified_response

    def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> \
            Mapping[str, Any]:
        """
        Override to determine the latest state after reading the latest record. This typically compared the cursor_field from the latest record and
        the current state and picks the 'most' recent cursor. This is how a stream's state is determined. Required for incremental.
        """
        if current_stream_state is not None and self.cursor_field in current_stream_state:
            current_parsed_date = current_stream_state[self.cursor_field]
            latest_record_date = latest_record[self.cursor_field]
            return {self.cursor_field: max(current_parsed_date, latest_record_date)}
        else:
            return {self.cursor_field: self.cursor_field_value}```






here in config file I pass `updatedAt` as `cursor_field` and a DateTime value string as `cursor_field_value`. I am expecting to get records whose DateTime greater than that of cursor_field_value to be returned

Hello @avijit,
I suggest first to get your cursor_field to be an hardcoded class attribute instead of a dynamic instance attribute that you set in __init__. The cursor_field should never change.
I would also suggest to rename your self.cursor_field_value to something like start_cursor_field_value that is defined is never mutated after instantiation and on which your get_updated_state method falls back is no cursor is available in the state (this is what you already did but this is a naming suggestion from my side).

We recently simplified a bit the implementation of an incremental stream and we deprecated the get_updated_state. Please use IncrementalMixin now: Incremental Streams | Airbyte Documentation.

It would help if you could share the current error you have and which is the cursor field used in your Orders stream. Feel free to link a PR to this new connector, it will be easier for us to review. Do you plan to contribute this connector to Airbyte?

Hi there from the Community Assistance team.
We’re letting you know about an issue we discovered with the back-end process we use to handle topics and responses on the forum. If you experienced a situation where you posted the last message in a topic that did not receive any further replies, please open a new topic to continue the discussion. In addition, if you’re having a problem and find a closed topic on the subject, go ahead and open a new topic on it and we’ll follow up with you. We apologize for the inconvenience, and appreciate your willingness to work with us to provide a supportive community.