Thank you for the code sample.
I would suggest performing the authentication in a custom authenticator class that inherits from HttpAuthenticator. I wrote an example for you:
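    from typing import Any, Mapping
    from urllib.parse import urljoin

    import requests
    # The import path may vary with your CDK version.
    from airbyte_cdk.sources.streams.http.auth import HttpAuthenticator


    class CookieAuthenticator(HttpAuthenticator):
        def __init__(self, config):
            # Assumption: the base URL lives in the config under "url_base"; adjust to your connector.
            self.url_base = config["url_base"]
            self.cookie_jar = self.login(config["api_key"], config["username"], config["password"])

        def login(self, api_key, username, password):
            # obfuscateApiKey is not defined here; it is assumed to be the helper from your code sample, moved onto this class.
            obfuscated = self.obfuscateApiKey(api_key)
            login_body = {
                "username": username,
                "password": password,
                "apiKey": obfuscated["obfuscated_key"],
                "timestamp": obfuscated["timestamp"],
            }
            login_url = urljoin(self.url_base, "authenticatedSession")
            resp = requests.post(url=login_url, json=login_body)
            resp.raise_for_status()
            return resp.cookies

        def get_auth_header(self) -> Mapping[str, Any]:
            # Send the session cookies from the login response as a single Cookie header.
            cookies = requests.utils.dict_from_cookiejar(self.cookie_jar)
            return {"cookie": "; ".join(f"{k}={v}" for k, v in cookies.items())}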
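To make the header-building step in get_auth_header easier to check in isolation, here is a minimal sketch that uses a plain dict in place of the cookie jar. The function name build_cookie_header and the cookie names are made up for illustration; they are not part of the CDK or of your API.

```python
from typing import Mapping


def build_cookie_header(cookies: Mapping[str, str]) -> dict:
    """Join name/value pairs into a single Cookie request header."""
    # .items() yields (name, value) pairs; iterating the mapping directly
    # would yield only the names and break the two-variable unpacking.
    return {"cookie": "; ".join(f"{name}={value}" for name, value in cookies.items())}


# Hypothetical cookie names and values, for illustration only.
header = build_cookie_header({"JSESSIONID": "abc123", "csrftoken": "xyz789"})
print(header)
```

With a real login response you would first convert the jar with requests.utils.dict_from_cookiejar and pass the resulting dict in.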
You then pass the authenticator to your CustomApiStream in its __init__, for example:
    cookie_authenticator = CookieAuthenticator(config)
    custom_api_stream = CustomApiStream(config, authenticator=cookie_authenticator)
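If it helps to see the idea without the CDK installed, the sketch below imitates the pattern with stand-in classes. StubAuthenticator and StubStream are hypothetical and only illustrate how a stream can merge whatever the authenticator returns into its request headers; the real HttpStream handles this internally and its details may differ.

```python
from typing import Any, Mapping, Optional


class StubAuthenticator:
    """Stand-in for CookieAuthenticator that returns a fixed header."""

    def get_auth_header(self) -> Mapping[str, Any]:
        return {"cookie": "session=abc123"}


class StubStream:
    """Stand-in for CustomApiStream; not the real CDK HttpStream."""

    def __init__(self, config: Mapping[str, Any], authenticator: Optional[StubAuthenticator] = None):
        self.config = config
        self.authenticator = authenticator

    def request_headers(self) -> dict:
        headers = {"Accept": "application/json"}
        # The stream never needs to know how the login works;
        # it only asks the authenticator for the header to attach.
        if self.authenticator is not None:
            headers.update(self.authenticator.get_auth_header())
        return headers


stream = StubStream(config={}, authenticator=StubAuthenticator())
print(stream.request_headers())
```

The stream code stays the same if you later swap in a different authenticator, which is the separation of concerns this approach is after.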
In my opinion this is a cleaner approach: it separates the authentication logic from the stream and follows our CDK practices.