I'm attempting to implement an incremental stream using a class that inherits from Stream and IncrementalMixin. My state resets on every sync. Any advice on the implementation?
Hey @ngille, do you mind sharing the actual code of your stream? If the state gets reset, it's probably related to the implementation of your state method:
    @state.setter
    def state(self, value):
        self._state[self.cursor_field] = value[self.cursor_field]
    class IncrementalChargePointStream(Stream, IncrementalMixin, ABC):
        def __init__(self, config: Mapping[str, Any], state: Mapping[str, str]):
            self.client = zeep.Client(
                config['url'],
                wsse=UsernameToken(config['username'], config['password']))
            self._state = state

        def read_records(
            self,
            sync_mode: SyncMode,
            cursor_field: List[str] = None,
            stream_slice: Mapping[str, Any] = None,
            stream_state: Mapping[str, Any] = None,
        ) -> Iterable[Mapping[str, Any]]:
            client = self.client
            # Convert snake to camel case name
            tmp_name = self.name.split('_')
            stream = tmp_name[0] + ''.join(ele.title() for ele in tmp_name[1:])
            try:
                client_function = getattr(client.service, stream)
                sessionSearchQuery = self.state
                resp = client_function(sessionSearchQuery)
                # Response code of 100 signals success
                if resp['responseCode'] == '100':
                    resp_data = getattr(resp, self.data_field)
                    # Data must be valid json dictionary
                    resp_dict = helpers.serialize_object(resp_data, dict)
                    # Update state
                    self.state = resp_data[-1]
                    return resp_dict
                else:
                    error_code = resp['responseCode']
                    logger.info(f'{stream} failed to load data.')
                    logger.info(f'Error code: {error_code}.')
            except Exception as e:
                logger.info(f'Could not call function {stream}')
                raise e

        @property
        @abstractmethod
        def cursor_field(self) -> str:
            """
            Defining a cursor field indicates that a stream is incremental, so any incremental stream must extend this class
            and define a cursor field.
            """
            pass

        @property
        @abstractmethod
        def data_field(self) -> str:
            """
            The response entry that is the JSON blob.
            For example - {'response': 100, 'time': datetime, 'data': {'record_1': ...}}
            The data field would be 'data'.
            """
            pass

        @property
        def state(self) -> Mapping[str, Any]:
            # State getter
            return self._state

        @state.setter
        def state(self, value: Mapping[str, Any]):
            self._state[self.cursor_field] = value[self.cursor_field]
I think your state gets reset because you set it in the __init__ of your IncrementalChargePointStream. Why did you choose to set it in __init__?
Moreover, I see that read_records sets self.state. I think this is not required, and you should only rely on the state setter method to update the state.
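For illustration, here is a minimal sketch of the getter/setter pattern discussed above, where the stream keeps only a cursor value and takes no state argument in __init__. It is not the CDK's implementation. IncrementalSketch is a hypothetical stand-in that does not import airbyte_cdk, so that it runs on its own; a real stream would inherit from Stream and IncrementalMixin. The cursor name updated_at, the in-memory record list, and the simplified read_records signature are all assumptions, and the sketch assumes records arrive sorted by the cursor and that the saved state is assigned through the setter before reading starts.

```python
from typing import Any, Dict, Iterable, List, Mapping, Optional


class IncrementalSketch:
    """Stand-in for a class that would inherit Stream and IncrementalMixin."""

    cursor_field = "updated_at"  # hypothetical cursor name

    def __init__(self, records: List[Mapping[str, Any]]):
        # No state argument here: the cursor starts empty and is only
        # ever changed through the state setter or while reading records.
        self._records = records
        self._cursor_value: Optional[str] = None

    @property
    def state(self) -> Dict[str, Any]:
        # Build the state from the cursor value on demand.
        if self._cursor_value is None:
            return {}
        return {self.cursor_field: self._cursor_value}

    @state.setter
    def state(self, value: Mapping[str, Any]) -> None:
        # Assumed to be called with the previously saved state before a sync.
        self._cursor_value = value.get(self.cursor_field)

    def read_records(self) -> Iterable[Mapping[str, Any]]:
        for record in self._records:
            cursor = record[self.cursor_field]
            # Skip records at or before the saved cursor.
            if self._cursor_value is not None and cursor <= self._cursor_value:
                continue
            yield record
            # Advance the cursor after each emitted record.
            self._cursor_value = cursor
```

With this shape, a fresh instance that has the previous state assigned to it resumes after the last cursor value instead of starting over, which is the behavior the original stream was missing.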