IncrementalMixin - State Object

I’m attempting to implement an incremental stream using an incremental class that inherits from Stream and IncrementalMixin. My state resets on every sync. Any advice on the implementation?

Hey @ngille, do you mind sharing the actual code of your stream? If the state gets reset, it's probably related to the implementation of your state method:

@state.setter
def state(self, value):
    """Copy the cursor entry of ``value`` into the stored stream state."""
    cursor_key = self.cursor_field
    self._state[cursor_key] = value[cursor_key]
class IncrementalChargePointStream(Stream, IncrementalMixin, ABC):
    """Base class for incremental streams read through a zeep SOAP client.

    Subclasses provide ``cursor_field`` and ``data_field``. The stream's
    snake_case ``name`` is converted to camelCase to select the service
    operation to call, and the current state is passed to that operation
    as its query argument.
    """

    def __init__(self, config: Mapping[str, Any], state: Mapping[str, str]):
        """Create the SOAP client and seed the stream state.

        :param config: connector configuration; the ``url``, ``username``
            and ``password`` entries are read here.
        :param state: initial state for the stream. ``None`` or an empty
            mapping is treated as "no state yet".
        """
        super().__init__()
        self.client = zeep.Client(
            config['url'],
            wsse=UsernameToken(config['username'], config['password']),
        )

        # Keep a private, mutable copy: the state setter writes into this
        # dict, which must neither mutate the caller's mapping nor fail
        # when the caller passes None or a read-only Mapping.
        self._state = dict(state) if state else {}

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: List[str] = None,
        stream_slice: Mapping[str, Any] = None,
        stream_state: Mapping[str, Any] = None,
    ) -> Iterable[Mapping[str, Any]]:
        """Call the SOAP operation for this stream and return its records.

        The operation is looked up on ``self.client.service`` by the
        camelCase form of ``self.name`` and is called with the current
        state. On success the state is advanced from the last record of
        the response.

        :returns: the serialized response records. An empty list is
            returned when the service reports a response code other than
            ``'100'`` (the failure is logged) or when it returns no records.
        :raises Exception: whatever the operation lookup or the service
            call raises; it is logged and re-raised unchanged.
        """
        # Convert snake to camel case name
        head, *tail = self.name.split('_')
        stream = head + ''.join(part.title() for part in tail)

        # Only the lookup and the remote call are guarded, so the
        # "could not call" message is logged for call failures alone.
        try:
            client_function = getattr(self.client.service, stream)
            resp = client_function(self.state)
        except Exception:
            logger.error(f'Could not call function {stream}')
            raise

        # Response code of 100 signals success
        if resp['responseCode'] != '100':
            error_code = resp['responseCode']
            logger.error(f'{stream} failed to load data.')
            logger.error(f'Error code: {error_code}.')
            # Return an empty iterable rather than an implicit None so that
            # callers iterating over the result do not hit a TypeError.
            return []

        resp_data = getattr(resp, self.data_field)
        if not resp_data:
            # Nothing new was returned: keep the current state untouched
            # instead of failing on resp_data[-1].
            return []

        # Data must be valid json dictionary
        resp_dict = helpers.serialize_object(resp_data, dict)
        # Update state from the last record of the response
        self.state = resp_data[-1]
        return resp_dict

    @property
    @abstractmethod
    def cursor_field(self) -> str:
        """
        Name of the record field used as the incremental cursor.

        Defining a cursor field indicates that a stream is incremental, so
        any incremental stream must extend this class and define a cursor
        field.
        """

    @property
    @abstractmethod
    def data_field(self) -> str:
        """
        Name of the response entry that holds the records.

        For example - {'response': 100, 'time': datetime, 'data': {'record_1': ...}}
        The data field would be 'data'.
        """

    @property
    def state(self) -> Mapping[str, Any]:
        """Return the current stream state."""
        return self._state

    @state.setter
    def state(self, value: Mapping[str, Any]):
        """Store ``value``'s cursor entry; other keys of ``value`` are ignored."""
        self._state[self.cursor_field] = value[self.cursor_field]

I think your state gets reset because you set it in the __init__ of your IncrementalChargePointStream. Why did you choose to set it in __init__?
Moreover, I see that read_records sets self.state. I think this is not required; you should rely only on the state setter method to update the state.