How does sync mode work with sub stream / nested stream?

Hi, I’m extending the Pipedrive connector. It currently only syncs deals, and I’m adding deal_flow (a child entity that describes the changes made to a deal; see the docs).

Basically, what I did was add the following:

class PipedriveSubstream(PipedriveStream, HttpSubStream, ABC):

    def __init__(self, parent: PipedriveStream, authenticator, replication_start_date=None, **kwargs):
        super().__init__(parent=parent, authenticator=authenticator, replication_start_date=replication_start_date, **kwargs)

    
class DealFlow(PipedriveSubstream, ABC):
    """
    API docs: https://developers.pipedrive.com/docs/api/v1/Deals#getDealUpdates
    """

    def __init__(self, deals_stream: Deals, authenticator, replication_start_date=None, **kwargs):
        super().__init__(parent=deals_stream, authenticator=authenticator, replication_start_date=replication_start_date, **kwargs)

    def path(
        self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> str:
        return f"deals/{stream_slice['parent']['id']}/flow"

See the full diff on GitHub.

This miraculously works, but I was wondering how it behaves depending on the sync mode. The base stream here is Deals.

  • What happens when Deals is set to Incremental? Would it only sync the new deals, and the DealFlow of those deals?
  • What happens when Deals is set to Full?
  • Or will it always sync all DealFlow by syncing all Deals (as DealFlow is set to Full)?

Hey @joelluijmes,
I’m glad to try to answer this interesting question!

This miraculously works, but I was wondering the behavior of this depending on the sync mode. The base stream is here Deals.

It works because your PipedriveSubstream class inherits from HttpSubStream. HttpSubStream implements a stream_slices method that yields one slice per parent record. In other words, your DealFlow stream re-reads all the records from the parent stream to build its request paths. To avoid requesting the parent records a second time, you can set use_cache = True on the parent stream. The records already read from the parent stream are then kept in a cache for the duration of the sync.

You can read more about nested streams in our documentation.
The implementation of HttpSubStream can be found here.
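
To make the re-reading and caching behavior concrete, here is a minimal sketch using toy stand-in classes. ToyParent, ToyChild, run_sync and the api_calls counter are all hypothetical names of mine, not CDK classes, and the real HttpSubStream and its cache are more involved than this. It only illustrates the idea: without a cache the parent is fetched once for its own sync and once more for the child's slices.

```python
from typing import Any, Dict, Iterable, List, Mapping, Optional


class ToyParent:
    """Hypothetical stand-in for a parent stream such as Deals (not the CDK class)."""

    def __init__(self, records: List[Dict[str, Any]], use_cache: bool = False):
        self._records = records
        self.use_cache = use_cache
        self._cache: Optional[List[Dict[str, Any]]] = None
        self.api_calls = 0  # counts simulated round trips to the API

    def read_records(self) -> Iterable[Mapping[str, Any]]:
        # Serve from the cache when it is enabled and already filled.
        if self.use_cache and self._cache is not None:
            yield from self._cache
            return
        self.api_calls += 1  # simulated request
        fetched = list(self._records)
        if self.use_cache:
            self._cache = fetched
        yield from fetched


class ToyChild:
    """Hypothetical stand-in for a substream such as DealFlow."""

    def __init__(self, parent: ToyParent):
        self.parent = parent

    def stream_slices(self) -> Iterable[Mapping[str, Any]]:
        # One slice per parent record, in the same shape the thread's code uses.
        for record in self.parent.read_records():
            yield {"parent": record}

    def path(self, stream_slice: Mapping[str, Any]) -> str:
        return f"deals/{stream_slice['parent']['id']}/flow"


def run_sync(parent: ToyParent) -> List[str]:
    """Read the parent stream on its own, then the child stream that depends on it."""
    list(parent.read_records())  # the Deals stream syncs first
    child = ToyChild(parent)
    return [child.path(s) for s in child.stream_slices()]
```

Running run_sync with a ToyParent built with use_cache=False fetches the parent twice, while use_cache=True fetches it once and serves the child's slices from the cache.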

What happens when Deals is set to Full?

The code sample you shared only supports full refresh, and it behaves as I explained above.

What happens when Deals is set to Incremental? Would it only sync the new deals, and sync DealFlow of those Deals?

If your parent stream is set to incremental and your child stream is not, the child stream will still trigger a full refresh read of the parent stream, because HttpSubStream calls the parent stream with SyncMode.full_refresh here.

If you want both the parent and child streams to be incremental, they should share the same cursor field, and you need to define your own stream_slices method on the child stream that reads the parent stream with SyncMode.incremental.
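
The difference between the two approaches can be sketched with toy stand-ins. Everything here is hypothetical: the local SyncMode enum only mirrors the CDK's, ToyDeals is not the real Deals stream, and I'm assuming update_time as the shared cursor field. The real parent stream would apply its state filter through the API request rather than in Python.

```python
from enum import Enum
from typing import Any, Dict, Iterable, List, Mapping, Optional


class SyncMode(Enum):
    # Local stand-in for the CDK's SyncMode enum, so the sketch is self-contained.
    full_refresh = "full_refresh"
    incremental = "incremental"


class ToyDeals:
    """Hypothetical parent stream; 'update_time' is an assumed cursor field."""

    cursor_field = "update_time"

    def __init__(self, records: List[Dict[str, Any]]):
        self._records = records
        self.seen_modes: List[SyncMode] = []  # records how the parent was called

    def read_records(
        self, sync_mode: SyncMode, stream_state: Optional[Mapping[str, Any]] = None
    ) -> Iterable[Mapping[str, Any]]:
        self.seen_modes.append(sync_mode)
        state = stream_state or {}
        for record in self._records:
            # In incremental mode, skip records at or before the saved cursor.
            if sync_mode == SyncMode.incremental and self.cursor_field in state:
                if record[self.cursor_field] <= state[self.cursor_field]:
                    continue
            yield record


def default_style_slices(parent, sync_mode, stream_state=None):
    """Mimics the behavior described above: the parent is always read in full refresh."""
    for record in parent.read_records(sync_mode=SyncMode.full_refresh):
        yield {"parent": record}


def pass_through_slices(parent, sync_mode, stream_state=None):
    """Custom variant: forwards the child's sync mode and state to the parent."""
    for record in parent.read_records(sync_mode=sync_mode, stream_state=stream_state):
        yield {"parent": record}
```

With a saved state, default_style_slices still yields a slice for every deal, whereas pass_through_slices only yields slices for deals updated after the cursor value.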

Did I answer your question as you expected?

Hi @alafanechere, thanks for the quick reply!

Yes, I got most of your answer. So in theory, if I change it to the following, it should work?

class PipedriveSubstream(PipedriveStream, HttpSubStream, ABC):

    def __init__(self, parent: PipedriveStream, authenticator, replication_start_date=None, **kwargs):
        super().__init__(parent=parent, authenticator=authenticator, replication_start_date=replication_start_date, **kwargs)

    
    def stream_slices(
        self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        parent_stream_slices = self.parent.stream_slices(
            sync_mode=sync_mode, cursor_field=cursor_field, stream_state=stream_state
        )

        # iterate over parent stream_slices
        for stream_slice in parent_stream_slices:
            parent_records = self.parent.read_records(
                sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state
            )

            # iterate over all parent records with current stream_slice
            for record in parent_records:
                yield {"parent": record}


    
class DealFlow(PipedriveSubstream, ABC):
    """
    API docs: https://developers.pipedrive.com/docs/api/v1/Deals#getDealUpdates
    """

    def __init__(self, deals_stream: Deals, authenticator, replication_start_date=None, **kwargs):
        super().__init__(parent=deals_stream, authenticator=authenticator, replication_start_date=replication_start_date, **kwargs)

    def path(
        self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> str:
        return f"deals/{stream_slice['parent']['id']}/flow"

If it is this easy, I’m wondering why the default behavior of stream_slices is to read the parent with SyncMode.full_refresh instead of passing the sync mode through?

Yes, it should work like this. I think the default behavior is for safety, in case the substream is not incremental.
