IncrementalMixin - State Object

I’m attempting to implement an incremental stream using a class that inherits from Stream and IncrementalMixin. My state resets on every sync. Any advice on the implementation?

Hey @ngille, do you mind sharing the actual code of your stream? If the state gets reset, it’s probably related to the implementation of your state setter:

@state.setter
def state(self, value):
    self._state[self.cursor_field] = value[self.cursor_field]

class IncrementalChargePointStream(Stream, IncrementalMixin, ABC):

    def __init__(self, config: Mapping[str, Any], state: Mapping[str, str]):
        self.client = zeep.Client(
                        config['url'],
                        wsse=UsernameToken(config['username'], config['password']))

        self._state = state

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: List[str] = None,
        stream_slice: Mapping[str, Any] = None,
        stream_state: Mapping[str, Any] = None,
    ) -> Iterable[Mapping[str, Any]]:
        
        client = self.client
        # Convert snake to camel case name
        tmp_name = self.name.split('_')
        stream = tmp_name[0] + ''.join(ele.title() for ele in tmp_name[1:])
        try:
            client_function = getattr(client.service, stream)
            sessionSearchQuery = self.state
            resp = client_function(sessionSearchQuery)
            # Response code of 100 signals success
            if resp['responseCode'] == '100':
                resp_data = getattr(resp, self.data_field)
                # Data must be valid json dictionary
                resp_dict = helpers.serialize_object(resp_data, dict)
                # Update state
                self.state = resp_data[-1]
                return resp_dict
            else:
                error_code = resp['responseCode']
                logger.info(f'{stream} failed to load data.')
                logger.info(f'Error code: {error_code}.')
        except Exception as e:
            logger.info(f'Could not call function {stream}')
            raise e
    
    @property
    @abstractmethod
    def cursor_field(self) -> str:
        """
        Defining a cursor field indicates that a stream is incremental, so any incremental stream must extend this class
        and define a cursor field.
        """
        pass

    @property
    @abstractmethod
    def data_field(self) -> str:
        """
        The response entry that holds the JSON data.
        For example - {'response': 100, 'time': datetime, 'data': {'record_1': ...}}
        The data field would be 'data'.
        """
        pass

    @property
    def state(self) -> Mapping[str, Any]:
        # State getter
        return self._state

    @state.setter
    def state(self, value: Mapping[str, Any]):
        self._state[self.cursor_field] = value[self.cursor_field]

I think your state gets reset because you set it in the __init__ of your IncrementalChargePointStream. Why did you choose to set it in __init__?
Moreover, I see that read_records sets self.state. I think this is not required; you should only rely on the state setter method to update the state.
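
To make the suggestion concrete, here is a minimal sketch of the getter/setter pattern in plain Python. It deliberately does not import the Airbyte CDK or zeep: SketchIncrementalStream, the "sessionID" cursor name, and the in-memory record list are all hypothetical stand-ins, and the way the cursor advances while reading is an assumption for illustration rather than the required approach. The point it tries to show is that the constructor takes no state, and the previously saved state arrives later through the setter.

```python
from typing import Any, Dict, Iterable, List, Mapping, Optional


class SketchIncrementalStream:
    """Hypothetical stand-in for a Stream + IncrementalMixin subclass."""

    cursor_field = "sessionID"  # hypothetical cursor name

    def __init__(self, records: List[Mapping[str, Any]]):
        # Hypothetical stand-in for the API client; note that no state is passed in.
        self._records = records
        self._cursor_value: Optional[Any] = None

    @property
    def state(self) -> Dict[str, Any]:
        # Build the state mapping from the stored cursor value.
        if self._cursor_value is None:
            return {}
        return {self.cursor_field: self._cursor_value}

    @state.setter
    def state(self, value: Mapping[str, Any]) -> None:
        # Tolerate an empty mapping (e.g. a first sync) instead of raising KeyError.
        self._cursor_value = value.get(self.cursor_field, self._cursor_value)

    def read_records(self) -> Iterable[Mapping[str, Any]]:
        for record in self._records:
            cursor = record[self.cursor_field]
            # Skip anything at or before the saved cursor.
            if self._cursor_value is not None and cursor <= self._cursor_value:
                continue
            yield record
            # Assumption: advance the cursor as each record is emitted.
            self._cursor_value = cursor


# First sync: no saved state, everything is read.
first = SketchIncrementalStream([{"sessionID": 1}, {"sessionID": 2}])
list(first.read_records())
saved = first.state

# Second sync: the saved state is restored through the setter, not __init__.
second = SketchIncrementalStream([{"sessionID": 1}, {"sessionID": 2}, {"sessionID": 3}])
second.state = saved
new_records = list(second.read_records())
```

With this arrangement the second run only yields the record whose cursor is past the saved value, and a fresh instance that never receives state simply starts from an empty mapping.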
