I have used HttpSubStream to get IDs from the parent stream in the child stream. The problem is that the data I receive has its dates in decreasing order. If I want to implement the incremental mixin, how would I do it, given that the state would never pick that up?
If somebody could point me in the right direction, it would be really helpful.
I don’t think there is any issue in the code, as it is working fine. The problem lies in the data I am fetching from the URL: the received records have dates in decreasing order. Is there any solution we can follow?
I see! I’m still brainstorming/researching on how best you could do this, but perhaps a good starting point would be the parse_response function? Maybe you could sort the data there in reverse order?
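A minimal sketch of that sorting idea, assuming each record is a dict with an ISO-8601 `date` field. The field name, the sample data, and the helper `sort_records_ascending` are hypothetical and not part of the Airbyte CDK; in a connector, `parse_response` could yield from a helper like this instead of yielding the raw list:

```python
from datetime import datetime
from typing import Any, Iterable, List, Mapping


def sort_records_ascending(
    records: Iterable[Mapping[str, Any]], cursor_field: str = "date"
) -> List[Mapping[str, Any]]:
    """Return the records ordered oldest-first by an ISO-8601 date field."""
    return sorted(records, key=lambda r: datetime.fromisoformat(r[cursor_field]))


# Sample page as the API might return it: newest record first (assumed data).
page = [
    {"id": 3, "date": "2022-03-03"},
    {"id": 2, "date": "2022-03-02"},
    {"id": 1, "date": "2022-03-01"},
]

ordered = sort_records_ascending(page)
print([r["id"] for r in ordered])
```

Note that this only reorders the records within a single response. If the API paginates, later pages would still contain older records than earlier ones.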
FYI this is how Meltano deals with “unsorted” streams - I’m looking for a solution to this in Airbyte as well after moving from Meltano for other reasons:
@Zawar92 FWIW I found a pretty good implementation that’s about 2 weeks old that does just this. I’m folding this into my connector now.
from abc import ABC
from typing import Any, Mapping, MutableMapping

import pendulum
from airbyte_cdk.sources.streams.http import HttpStream


class SendgridStreamIncrementalMixin(HttpStream, ABC):
    cursor_field = "created"

    def __init__(self, start_time: int, **kwargs):
        super().__init__(**kwargs)
        # Fallback start time, used when no state has been saved yet.
        self._start_time = start_time

    def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Return the latest state by comparing the cursor value in the latest record with the stream's most recent state object
        and returning an updated state object.
        """
        latest_benchmark = latest_record[self.cursor_field]
        if current_stream_state.get(self.cursor_field):
            # max() keeps the newest cursor value regardless of record order.
            return {self.cursor_field: max(latest_benchmark, current_stream_state[self.cursor_field])}
        return {self.cursor_field: latest_benchmark}

    def request_params(self, stream_state: Mapping[str, Any], **kwargs) -> MutableMapping[str, Any]:
        params = super().request_params(stream_state=stream_state)
        start_time = self._start_time
        if stream_state.get(self.cursor_field):
            # Resume from the saved cursor instead of the configured start time.
            start_time = stream_state[self.cursor_field]
        params.update({"start_time": start_time, "end_time": pendulum.now().int_timestamp})
        return params
I think you just need to install the pendulum dependency and then replace the cursor_field value with whatever field indicates the recency of the record you’re using in your state.
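To show why the `max()` comparison in `get_updated_state` doesn't depend on the order of the records, here is a small standalone sketch. It is a plain function that mirrors the logic above rather than the CDK method itself, and the `date` cursor field and sample values are assumptions for illustration. It also assumes the dates are ISO-8601 strings, which compare correctly as plain strings:

```python
from typing import Any, Dict, Mapping


def updated_state(
    state: Mapping[str, Any], record: Mapping[str, Any], cursor_field: str = "date"
) -> Dict[str, Any]:
    """Keep the largest cursor value seen so far, whatever order records arrive in."""
    latest = record[cursor_field]
    if state.get(cursor_field):
        return {cursor_field: max(latest, state[cursor_field])}
    return {cursor_field: latest}


# Records arrive newest-first, as in the descending-order case (assumed data).
descending_records = [
    {"date": "2022-03-03"},
    {"date": "2022-03-02"},
    {"date": "2022-03-01"},
]

state: Dict[str, Any] = {}
for record in descending_records:
    state = updated_state(state, record)

print(state)
```

One caveat with descending data: the state reaches the newest date after the very first record, so if a sync is interrupted and that state has already been checkpointed, the older records that were not yet read would likely be skipped on the next run.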
Hi @pjatx. You are using created as the cursor_field because it indicates the recency of the record. In my case I don’t have anything to check recency with except the date in the stream’s records, and the problem is that those dates are not in increasing order; they are in descending order.
I already tried implementing the example you gave above, but there isn’t any field that indicates the recency of a record except the dates.