Source Development Integration Tests - The sync should produce at least one STATE message

I am working on getting my connector updated with passing integration tests, so it can be contributed back to source. Currently all of the tests are passing except test_state_with_abnormally_large_values. The assertion that a STATE message is produced is failing.

The code is working in production, and when I run it locally, state is produced and printed to the console. I have tried modifying state_checkpoint_interval, but that has not resolved it. Here is my get_updated_state method.

Any guidance on where I am going wrong?

Test Log:

self = <source_acceptance_test.tests.test_incremental.TestIncremental object at 0x10820b9a0>
connector_config = SecretDict(******)
configured_catalog = ConfiguredAirbyteCatalog(streams=[ConfiguredAirbyteStream(stream=AirbyteStream(name='cash_flows', json_schema={'type':...estination_sync_mode=<DestinationSyncMode.append: 'append'>, primary_key=None, source_defined_primary_key=[['uuid']])])
future_state = {'cash_flows': {'updateDateTime': '9999-05-04T18:31:18Z'}}
docker_runner = <source_acceptance_test.utils.connector_runner.ConnectorRunner object at 0x10820b340>

    def test_state_with_abnormally_large_values(self, connector_config, configured_catalog, future_state, docker_runner: ConnectorRunner):
        configured_catalog = incremental_only_catalog(configured_catalog)
        output = docker_runner.call_read_with_state(config=connector_config, catalog=configured_catalog, state=future_state)
        records = filter_output(output, type_=Type.RECORD)
        states = filter_output(output, type_=Type.STATE)

        assert not records, "The sync should produce no records when run with the state with abnormally large values"
>       assert states, "The sync should produce at least one STATE message"
E       AssertionError: The sync should produce at least one STATE message
E       assert []

../../bases/source-acceptance-test/source_acceptance_test/tests/test_incremental.py:112: AssertionError

 airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_incremental.py::TestIncremental.test_state_with_abnormally_large_values[inputs0] ⨯100% ██████████
{"type": "LOG", "log": {"level": "INFO", "message": "/Users/willwatkinson/src/github.com/airbytehq/airbyte/airbyte-integrations/connectors/source-kyriba - SAT run - eba6694ac1b24cbf97a0ffba76036b452a8eec13 - FAILED"}}

Hey, what state have you passed in acceptance-test-config.yaml? Ideally we pass something like this: future_state_path: "integration_tests/abnormal_state.json"

Hi @harshith

Here is the abnormal state I have set up. For some reason it's not allowing me to link to the file on GitHub.
{"cash_flows": {"updateDateTime": "9999-05-04T18:31:18Z"}}

Based on the test logs, the query with the abnormal state is succeeding and no records are being returned, as expected.

Hey, can you pass this as state, run the sync with the command below, and share the output?

python3 main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json --state <state file>

It looks like there are indeed no state messages. Based on my reading of the CDK this makes sense, though: STATE messages appear to be produced only if records are retrieved. It seems that either the test shouldn't be checking for this, or the Python CDK logic is wrong.

{"type": "LOG", "log": {"level": "INFO", "message": "Starting syncing SourceKyriba"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Syncing stream: cash_flows "}}
{"type": "LOG", "log": {"level": "INFO", "message": "Read 0 records from cash_flows stream"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing cash_flows"}}
{"type": "LOG", "log": {"level": "INFO", "message": "SourceKyriba runtimes:\nSyncing stream cash_flows 0:00:00.004844"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing SourceKyriba"}}

I was wrong: there is another point at which state gets produced. It requires there to be stream slices, though. In my stream_slices method, this future-state case produces no stream slices, because the start date is in the future :/
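To make the behaviour described above concrete, here is a simplified model of an incremental read loop. It is not the actual CDK code; the function name read_incremental and the tuple-based messages are made up for illustration. It only assumes what was observed above: a STATE message is emitted once a slice has been processed, so zero slices means zero STATE messages.

```python
from typing import Any, Callable, Dict, Iterable, Iterator, List, Tuple

Message = Tuple[str, Any]  # hypothetical stand-in for Airbyte messages


def read_incremental(
    slices: Iterable[Dict[str, Any]],
    read_records: Callable[[Dict[str, Any]], List[Dict[str, Any]]],
    state: Dict[str, Any],
) -> Iterator[Message]:
    """Toy model: emit records per slice, then one STATE after each slice."""
    for stream_slice in slices:
        for record in read_records(stream_slice):
            yield ("RECORD", record)
        # State is checkpointed after every slice, even an empty one.
        yield ("STATE", dict(state))


future_state = {"updateDateTime": "9999-05-04T18:31:18Z"}

# No slices at all: the loop body never runs, so nothing is emitted.
no_slices = list(read_incremental([], lambda s: [], future_state))

# One slice that returns no records: still exactly one STATE message.
one_empty_slice = list(read_incremental([{"date": "today"}], lambda s: [], future_state))
```

Under this model, the acceptance test can only pass if stream_slices yields at least one slice for the future state.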

Hey, is it possible to share the repo for this source so that I can check and help?

Hi,
Have you found a solution to this issue? I cannot return any stream slices either when the date is in the future, as it would result in an error in the API call.

@CorentinBrtx, here is the code I used to fix this: New Source: Kyriba by wjwatkinson (Pull Request #12748, airbytehq/airbyte on GitHub)

I set the end date to today, and if the start date is after the end date, I set the start date to the end date. This way I always have one stream slice for today, which produces a state message.
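A minimal sketch of that clamping idea, independent of the CDK. The function name clamped_stream_slices, the daily slice shape {"date": ...}, and the cursor format are assumptions for illustration and are not taken from the linked pull request.

```python
from datetime import date, datetime, timedelta
from typing import Dict, List, Optional

# Assumed cursor format, matching the state value shown in this thread.
CURSOR_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def clamped_stream_slices(
    state_cursor: Optional[str], default_start: date, today: date
) -> List[Dict[str, str]]:
    """Return one slice per day from start to today, never an empty list."""
    end = today
    start = default_start
    if state_cursor:
        start = datetime.strptime(state_cursor, CURSOR_FORMAT).date()
    # A cursor in the future would otherwise yield no slices; clamp it to today.
    if start > end:
        start = end
    slices = []
    day = start
    while day <= end:
        slices.append({"date": day.isoformat()})
        day += timedelta(days=1)
    return slices
```

With the abnormal state from this thread and a hypothetical run date of 2022-05-10, the function returns the single slice for that day, so the sync still gets a slice to checkpoint against and the API is never queried with a future date.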

