Given a schema like so:
{
  "$schema": "http://json-schema.org/draft-07/schema",
  "name": "events",
  "type": "object",
  "properties": {
    "id": {"type": "integer"},
    "icon": {"type": "string"},
    "file": {"type": "string"},
    "item": {"type": "object"},
    "location": {"type": "string"},
    "created_at": {
      "type": "object",
      "properties": {
        "datetime": {"type": "string"},
        "formatted": {"type": "string"}
      }
    },
    "updated_at": {
      "type": "object",
      "properties": {
        "datetime": {"type": "string"},
        "formatted": {"type": "string"}
      }
    },
    "next_audit_date": {"type": "object"},
    "days_to_next_audit": {"type": "integer"},
    "action_type": {"type": "string"},
    "admin": {"type": "object"}
  }
}
and a cursor field of “updated_at/datetime”, the test_defined_cursors_exist_in_schema() test in the acceptance test suite is failing and doesn’t look like it would ever succeed in my case:
def test_defined_cursors_exist_in_schema(self, connector_config, discovered_catalog):
    """
    Check if all of the source defined cursor fields are exists on stream's json schema.
    """
    for stream_name, stream in discovered_catalog.items():
        if stream.default_cursor_field:
            schema = stream.json_schema
            assert "properties" in schema, "Top level item should have an 'object' type for {stream_name} stream schema"
            properties = schema["properties"]  # <--right here
            cursor_path = "/properties/".join(stream.default_cursor_field)
            assert dpath.util.search(
                properties, cursor_path
            ), f"Some of defined cursor fields {stream.default_cursor_field} are not specified in discover schema properties for {stream_name} stream"
As it’s written, the test extracts the top-level “properties” key, but the “properties”
keys in nested schemas remain as-is, so the subsequent dpath.util.search()
call fails because the search path is missing the second “properties” segment.
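To make the path mismatch concrete, here is a minimal sketch of how the join expression from the test builds the search path. It deliberately avoids dpath so it runs on its own: lookup() is a hypothetical exact-key walk standing in for dpath.util.search(), build_cursor_path() is just a named wrapper around the test's join, and the two-element cursor list is only an assumption added for comparison, not something the test or the docs confirm.

```python
from typing import Any, List, Optional


def build_cursor_path(cursor_field: List[str]) -> str:
    # Same expression the acceptance test uses to build the search path.
    return "/properties/".join(cursor_field)


def lookup(properties: dict, path: str) -> Optional[Any]:
    # Hypothetical stand-in for dpath.util.search(): walks exact keys only,
    # with no glob support, and returns None when a segment is missing.
    node: Any = properties
    for key in path.strip("/").split("/"):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node


# Trimmed copy of the top-level "properties" from the schema above.
properties = {
    "id": {"type": "integer"},
    "updated_at": {
        "type": "object",
        "properties": {
            "datetime": {"type": "string"},
            "formatted": {"type": "string"},
        },
    },
}

# Cursor declared as a single slash-separated string (my current setup).
single = build_cursor_path(["updated_at/datetime"])

# Cursor declared as separate path segments (an assumption, for comparison).
split = build_cursor_path(["updated_at", "datetime"])

print(single, "->", lookup(properties, single))
print(split, "->", lookup(properties, split))
```

With a one-element list the join has nothing to join, so no “properties” segment is inserted and the lookup misses; with the segments split, the nested “properties” key ends up in the path. Whether the cursor is actually meant to be declared as separate segments is still an open question here.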
Check out the following pdb postmortem:
(Pdb++) stream.default_cursor_field
['updated_at/datetime']
(Pdb++) cursor_path
'updated_at/datetime'
(Pdb++) dpath.util.search(properties, "/updated_at/datetime")
{}
(Pdb++) properties
{'id': {'type': 'integer'}, 'icon': {'type': 'string'}, 'file': {'type': 'string'}, 'item': {'type': 'object'}, 'location': {'type': 'string'}, 'created_at': {'type': 'object'}, 'updated_at': {'type': 'object', 'properties': {'datetime': {'type': 'string'}, 'formatted': {'type': 'string'}}}, 'next_audit_date': {'type': 'object'}, 'days_to_next_audit': {'type': 'integer'}, 'action_type': {'type': 'string'}, 'admin': {'type': 'object'}}
(Pdb++) dpath.util.search(properties,"/updated_at/properties/datetime")
{'updated_at': {'properties': {'datetime': {'type': 'string'}}}}
(Pdb++)
Assuming that the test is correct, how exactly are we supposed to specify the cursor field in an incremental stream?