Summary
The user is experiencing an issue where the substream ‘openParent’ is not retrieving records from its parent stream ‘Campaigns’. The implementation involves creating a Python source for the Spotler API, where the substream is expected to make additional API calls based on the records obtained from the parent stream.
Question
Hello,
I am trying to create a Python source with a stream and a substream that uses the records obtained by its parent to make further calls.
I am building it for Spotler: I first make a call to get the campaigns and then, using the campaign IDs, make another call to campaign opens to get information about each campaign.
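The intended parent/child flow can be sketched in plain Python, without the Airbyte CDK. This is only an illustration of the pattern described above: `fetch_campaigns`, `fetch_opens`, and the sample data are hypothetical stand-ins, not Spotler or Airbyte APIs.

```python
from typing import Any, Dict, Iterable, List

# Hypothetical sample data standing in for API responses.
CAMPAIGNS: List[Dict[str, Any]] = [{"id": 1}, {"id": 2}]
OPENS: Dict[int, List[Dict[str, Any]]] = {
    1: [{"contact_id": 10}, {"contact_id": 11}],
    2: [],
}


def fetch_campaigns() -> Iterable[Dict[str, Any]]:
    """Parent call: yield one record per campaign."""
    yield from CAMPAIGNS


def fetch_opens(campaign_id: int) -> Iterable[Dict[str, Any]]:
    """Child call: yield the opens for a single campaign."""
    for record in OPENS.get(campaign_id, []):
        yield {"campaign_id": campaign_id, **record}


def read_campaign_opens() -> List[Dict[str, Any]]:
    """Drive the child call once per parent record."""
    results: List[Dict[str, Any]] = []
    for campaign in fetch_campaigns():
        results.extend(fetch_opens(campaign["id"]))
    return results


opens = read_campaign_opens()
```

In this sketch the child produces records only for campaigns that the parent actually yields, which is the behaviour expected from the substream.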
This is what I have so far. I have left out some of the methods to keep the problem simple to understand.
source.py
```
def streams(self, config: Mapping[str, Any]):
    campaigns_stream = Campaigns(config)
    parent = CampaignOpensParent(config, parent_stream=campaigns_stream)
    return [campaigns_stream, parent]
```
Campaigns stream (parent)
```
class Campaigns(HttpStream, CheckpointMixin):
    name = "Campaigns"
    url_base = "https://api.communigator.co.uk/mail/"
    primary_key = "id"
    _cursor_value = None  # Internal cursor value
    state_checkpoint_interval = 1  # Save state after every record
    state_file_path = "/data/shared_state/campaign_state.json"

    def __init__(self, config):
        super().__init__(authenticator=SpotlerAuthenticator(config))

    @property
    def use_cache(self) -> bool:
        return True

    def read_records(
        self,
        sync_mode: str,
        stream_slice: Mapping[str, Any] = None,
        stream_state: Mapping[str, Any] = None,
        **kwargs
    ) -> Iterable[Mapping[str, Any]]:
        for record in super().read_records(
            sync_mode=sync_mode, stream_slice=stream_slice, stream_state=stream_state, **kwargs
        ):
            yield record
```
Campaign opens stream (child)
```
class CampaignOpensParent(HttpSubStream):
    name = "openParent"
    url_base = "https://api.communigator.co.uk/mail/"
    primary_key = ["campaign_id", "contact_id"]
    # parent = Campaigns

    def __init__(self, config, parent_stream: Campaigns):
        super().__init__(parent=parent_stream)
        self.config = config
        self.landing_zone = config["landing_zone"]
        self.page_size = 100

    def stream_slices(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[str] = None,
        stream_state: Mapping[str, Any] = None
    ) -> Iterable[Mapping[str, Any]]:
        for campaign in self.parent.read_records(sync_mode=SyncMode.full_refresh):
            campaign_id = campaign["id"]
            total_pages = self.get_total_pages(campaign_id)
            if total_pages == 0:
                self.logger.info(f"No opens for campaign {campaign_id}, skipping.")
                continue
            self.logger.info(f"Processing campaign {campaign_id} with {total_pages} pages of opens.")
            for page in range(1, total_pages + 1):
                yield {"campaign_id": campaign_id, "page": page}
```
For the Campaigns stream, the output shows that records are read:
```
{"type": "LOG", "log": {"level": "INFO", "message": "Read 4 records from Campaigns stream"}}
```
but for the openParent substream, no records are read:
```
{"type": "STATE", "state": {"type": "STREAM", "stream": {"stream_descriptor": {"name": "openParent", "namespace": null}, "stream_state": {"__ab_no_cursor_state_message": true}}, "sourceStats": {"recordCount": 0.0}}}
{"type": "LOG", "log": {"level": "INFO", "message": "Read 0 records from openParent stream"}}
```
It seems that the substream is not able to get the records from its parent, and I don't understand why.
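A CDK-free sketch of the slice logic may help narrow down where the records are lost. `build_slices` is a hypothetical helper that mirrors the `stream_slices` method above, and the lambdas stand in for `get_total_pages`; none of these are Airbyte APIs. It illustrates that zero slices result either when the parent iterator is empty or when every campaign reports zero pages. Whether either case is the actual cause here is not confirmed.

```python
from typing import Any, Callable, Dict, Iterable, Iterator, List


def build_slices(
    parent_records: Iterable[Dict[str, Any]],
    get_total_pages: Callable[[Any], int],
) -> Iterator[Dict[str, Any]]:
    """Mirror the slice logic above: one slice per (campaign, page)."""
    for campaign in parent_records:
        campaign_id = campaign["id"]
        total_pages = get_total_pages(campaign_id)
        if total_pages == 0:
            continue  # campaign is skipped, so it contributes no slices
        for page in range(1, total_pages + 1):
            yield {"campaign_id": campaign_id, "page": page}


# Hypothetical parent records, matching the 4 campaigns seen in the log.
campaigns: List[Dict[str, Any]] = [{"id": i} for i in range(1, 5)]

# Case 1: every campaign reports zero pages, so no slices are produced.
empty = list(build_slices(campaigns, lambda _id: 0))

# Case 2: the parent iterator itself is empty, so no slices are produced.
no_parent = list(build_slices([], lambda _id: 2))

# Case 3: parent records and pages are both present, so slices are produced.
ok = list(build_slices(campaigns[:1], lambda _id: 2))

print(len(empty), len(no_parent), len(ok))
```

Checking the sync output for the two `self.logger.info` messages in `stream_slices` would distinguish the cases: if neither message appears, the loop over the parent records never ran; if only the "skipping" message appears, the parent records arrived but every campaign had zero pages.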
<br>
---
This topic has been created from a Slack thread to give it more visibility.
It will be on Read-Only mode here. [Click here](https://airbytehq.slack.com/archives/C021JANJ6TY/p1731327044366239) if you want
to access the original thread.
<sub>
['python-source', 'substream', 'parent-stream', 'spotler-api', 'record-retrieval']
</sub>