I’m attempting to implement an incremental stream using a class that inherits from `Stream` and `IncrementalMixin`. My state resets on every sync. Any advice on the implementation?
Hey @ngille, do you mind sharing the actual code of your stream? If the state gets reset, it’s probably related to the implementation of your `state` method:
@state.setter
def state(self, value):
    """Store the cursor value found in ``value`` under the stream's cursor field."""
    cursor_key = self.cursor_field
    self._state[cursor_key] = value[cursor_key]
class IncrementalChargePointStream(Stream, IncrementalMixin, ABC):
    """Abstract incremental stream backed by a zeep service client.

    The service operation to call is derived from the stream name
    (snake_case converted to camelCase). Concrete subclasses declare which
    response attribute holds the records (``data_field``) and which record
    key is tracked as the cursor (``cursor_field``).
    """

    def __init__(self, config: Mapping[str, Any], state: Mapping[str, str]):
        """Create the zeep client and seed the stream state.

        Args:
            config: Connector configuration; must contain ``url``,
                ``username`` and ``password``.
            state: Initial stream state. May be ``None`` or empty, in which
                case the stream starts with an empty state.
        """
        self.client = zeep.Client(
            config['url'],
            wsse=UsernameToken(config['username'], config['password']),
        )
        # Keep a private, mutable copy: the state setter writes into this
        # dict, and it must neither mutate the caller's mapping nor fail when
        # no state was supplied.
        self._state = dict(state) if state else {}

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: List[str] = None,
        stream_slice: Mapping[str, Any] = None,
        stream_state: Mapping[str, Any] = None,
    ) -> Iterable[Mapping[str, Any]]:
        """Call the service operation for this stream and return its records.

        The current state is sent as the query. On a successful response the
        state is advanced to the cursor value of the last record returned.

        Returns:
            The serialized records, or an empty list when the service reports
            a non-success response code or returns no data.

        Raises:
            Exception: Whatever the service lookup or call raises; it is
                logged and re-raised unchanged.
        """
        # Convert snake to camel case name
        first, *rest = self.name.split('_')
        stream = first + ''.join(part.title() for part in rest)

        try:
            client_function = getattr(self.client.service, stream)
            resp = client_function(self.state)
        except Exception:
            logger.error(f'Could not call function {stream}')
            raise

        # Response code of 100 signals success
        response_code = resp['responseCode']
        if response_code != '100':
            logger.error(f'{stream} failed to load data.')
            logger.error(f'Error code: {response_code}.')
            # Always hand back an iterable so the caller can loop safely.
            return []

        resp_data = getattr(resp, self.data_field)
        if not resp_data:
            # Nothing new: leave the state untouched instead of indexing
            # into an empty result.
            return []

        # Data must be valid json dictionary
        resp_dict = helpers.serialize_object(resp_data, dict)
        # Update state from the last record of this response
        self.state = resp_data[-1]
        return resp_dict

    @property
    @abstractmethod
    def cursor_field(self) -> str:
        """
        Defining a cursor field indicates that a stream is incremental, so any incremental stream must extend this class
        and define a cursor field.
        """
        pass

    @property
    @abstractmethod
    def data_field(self) -> str:
        """
        The response entry that holds the JSON blob.
        For example - {'response': 100, 'time': datetime, 'data': {'record_1': ...}}
        The data field would be 'data'.
        """
        pass

    @property
    def state(self) -> Mapping[str, Any]:
        """Return the current stream state mapping."""
        return self._state

    @state.setter
    def state(self, value: Mapping[str, Any]):
        """Store the value of ``cursor_field`` taken from ``value``.

        ``value`` must be subscriptable by ``cursor_field``; only that single
        entry of the state is updated.
        """
        self._state[self.cursor_field] = value[self.cursor_field]
I think your state gets reset because you set it in the `__init__` of your `IncrementalChargePointStream`. Why did you choose to set it in `__init__`?
Moreover, I see that `read_records` sets `self.state`. I think this is not required, and you should rely only on the `state` setter method to update the state.