Use one stream's output as another stream's input

I have a case for a database-based (non-HTTP) connector I’m developing where I need to filter results for one table/stream based on the values output by a table/stream extracted before it. This case is handled in HTTP connectors via the “use_cache” option and the VCR package to cache requests, but that approach seems specific to HTTP requests and probably isn’t meant for a DB connector.
In my case, I technically only need the minimum and maximum values, so I initially wrote them as state values, but I can’t find a good way to pass one stream’s state to another stream, especially the updated state from the current run.
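To illustrate, here’s a rough sketch of what I’m after. The table names, column names, and the `_rows()` stub are made up (the real connector reads over ODBC); the point is just that the second stream’s query depends on values seen while reading the first:

```python
# Rough sketch of the goal -- table/column names and _rows() are hypothetical.

def _rows(table):
    """Stand-in for the real ODBC read; returns a few fake rows."""
    sample = {
        "transactions": [{"transaction_id": 100}, {"transaction_id": 20500}],
        "transaction_lines": [{"transaction_id": 150, "line": 1}],
    }
    return sample[table]

def read_transactions(bounds):
    """Upstream stream: emit records while tracking the min/max key seen."""
    for row in _rows("transactions"):
        key = row["transaction_id"]
        bounds["min_id"] = key if bounds["min_id"] is None else min(bounds["min_id"], key)
        bounds["max_id"] = key if bounds["max_id"] is None else max(bounds["max_id"], key)
        yield row

def read_transaction_lines(bounds):
    """Downstream stream: only pull rows inside the upstream bounds."""
    for row in _rows("transaction_lines"):
        if bounds["min_id"] <= row["transaction_id"] <= bounds["max_id"]:
            yield row

bounds = {"min_id": None, "max_id": None}
list(read_transactions(bounds))              # first stream runs and fills in the bounds
print(list(read_transaction_lines(bounds)))  # second stream filters on them

# The catch: Airbyte reads each stream independently, so there's no obvious
# shared place for `bounds` to live between two streams within one sync.
```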
I feel like maybe something simple like a temporary file could be used, but I did see the comment “We can’t use NamedTemporaryFile here because yaml serializer doesn’t work well with empty files” in the airbyte_cdk where the VCR cassette is used, so I’m hesitant to go that route.
Curious if anyone has seen this problem arise for a DB connector and if there’s an existing solution out there, or any ideas on what might or might not work well?

Any thoughts here? Looks like Arthur Galuza was the dev behind the HTTP caching / sharing records across streams work and might have some input.

Hi @luke, thanks for your post. Currently, I don’t believe there’s an easy way to do this with Airbyte, hence your question. I don’t know of any existing solutions, but it seems like your best bet would be to look into some of the existing Python libraries that handle caching and implement the use_cache feature for the connector you’re developing.

Out of curiosity, are you willing to share what connector you’re developing? I don’t think it’s materially relevant, but it might be useful to know if we want to implement this feature for other DB connectors.

OK, thanks, I’ll look into some caching strategies. This is for a NetSuite (ODBC) connector.

Hey @luke, just to answer your earlier question: unfortunately you can’t run connectors using docker-compose. We use docker-compose for some of our longer-running components like the server, Temporal, and the scheduler, but the processes relating to connectors are designed not to live as long. Hope this helps!


Just to update anyone curious here, I ended up using memcached to do this. To get around not being able to use docker-compose / have a separate container for memcached, I changed the entrypoint file that the Dockerfile uses to first spin up memcached and then execute the regular Airbyte process. Then in the connector I split state into intra (standard state persisting between syncs but not between streams) and inter (cached state persisting within the same sync between streams, but not between syncs).
Everything’s working fine with this approach; however, if Airbyte ever moves to a world of parallelism within syncs (vs. just between jobs, as exists now), then this solution wouldn’t work (unless there were some way to mark streams as “dependent” so some run serially).
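In case it’s useful to anyone else, here’s roughly what the inter-state piece looks like, using pymemcache. The key names and helper functions are my own simplification for this post, not Airbyte CDK APIs:

```python
# Simplified sketch of the "inter" (within-sync, cross-stream) state via memcached.
# Key names and helpers are illustrative, not part of the Airbyte CDK.
import json

from pymemcache.client.base import Client

# memcached is started by the modified entrypoint inside the connector container.
cache = Client(("localhost", 11211))

def save_inter_state(stream_name: str, state: dict) -> None:
    """Persist state for later streams in the same sync (not across syncs)."""
    cache.set(f"inter_state:{stream_name}", json.dumps(state).encode("utf-8"))

def load_inter_state(stream_name: str) -> dict:
    """Read the state an earlier stream left behind during this sync."""
    raw = cache.get(f"inter_state:{stream_name}")
    return json.loads(raw) if raw else {}

# Upstream stream records its min/max once it finishes reading:
save_inter_state("transactions", {"min_id": 100, "max_id": 20500})

# Downstream stream builds its query from those bounds:
bounds = load_inter_state("transactions")
query = (
    "SELECT * FROM transaction_lines "
    f"WHERE transaction_id BETWEEN {bounds['min_id']} AND {bounds['max_id']}"
)
print(query)
```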

Thanks for circling back and sharing how you worked around the Docker issue, @luke!