Hi, I’m working on adding the investment connector for plaid.
However, running into issues when following the transactions stream for plaid to implement the investment_transactions stream. From the logs, it looks like its failing due to the use of “date” as a cursor. See log below:
2022-10-12 22:23:57 normalization > {“type”: “LOG”, “log”: {“level”: “FATAL”, “message”: “‘date’\nTraceback (most recent call last):\n File "/usr/local/bin/transform-catalog", line 8, in \n sys.exit(main())\n File "/usr/local/lib/python3.9/site-packages/normalization/transform_catalog/transform.py", line 104, in main\n TransformCatalog().run(args)\n File "/usr/local/lib/python3.9/site-packages/normalization/transform_catalog/transform.py", line 36, in run\n self.process_catalog()\n File "/usr/local/lib/python3.9/site-packages/normalization/transform_catalog/transform.py", line 64, in process_catalog\n processor.process(catalog_file=catalog_file, json_column_name=json_col, default_schema=schema)\n File "/usr/local/lib/python3.9/site-packages/normalization/transform_catalog/catalog_processor.py", line 76, in process\n nested_processors = stream_processor.process()\n File "/usr/local/lib/python3.9/site-packages/normalization/transform_catalog/stream_processor.py", line 295, in process\n self.generate_scd_type_2_model(from_table, column_names),\n File "/usr/local/lib/python3.9/site-packages/normalization/transform_catalog/stream_processor.py", line 709, in generate_scd_type_2_model\n cursor_field = self.get_cursor_field(column_names)\n File "/usr/local/lib/python3.9/site-packages/normalization/transform_catalog/stream_processor.py", line 990, in get_cursor_field\n cursor = column_names[self.cursor_field[0]][0]\nKeyError: ‘date’”}}
2022-10-12 22:23:57 normalization > {“type”: “TRACE”, “trace”: {“type”: “ERROR”, “emitted_at”: 1665613437332.32, “error”: {“message”: “Something went wrong in the connector. See the logs for more details.”, “internal_message”: “‘date’”, “stack_trace”: “Traceback (most recent call last):\n File "/usr/local/bin/transform-catalog", line 8, in \n sys.exit(main())\n File "/usr/local/lib/python3.9/site-packages/normalization/transform_catalog/transform.py", line 104, in main\n TransformCatalog().run(args)\n File "/usr/local/lib/python3.9/site-packages/normalization/transform_catalog/transform.py", line 36, in run\n self.process_catalog()\n File "/usr/local/lib/python3.9/site-packages/normalization/transform_catalog/transform.py", line 64, in process_catalog\n processor.process(catalog_file=catalog_file, json_column_name=json_col, default_schema=schema)\n File "/usr/local/lib/python3.9/site-packages/normalization/transform_catalog/catalog_processor.py", line 76, in process\n nested_processors = stream_processor.process()\n File "/usr/local/lib/python3.9/site-packages/normalization/transform_catalog/stream_processor.py", line 295, in process\n self.generate_scd_type_2_model(from_table, column_names),\n File "/usr/local/lib/python3.9/site-packages/normalization/transform_catalog/stream_processor.py", line 709, in generate_scd_type_2_model\n cursor_field = self.get_cursor_field(column_names)\n File "/usr/local/lib/python3.9/site-packages/normalization/transform_catalog/stream_processor.py", line 990, in get_cursor_field\n cursor = column_names[self.cursor_field[0]][0]\nKeyError: ‘date’\n”, “failure_type”: “system_error”}}}
I think I’m going about this the wrong way as it doesnt look like the source defines anything that can be used as a cursor for investment transactions. But there is a webhook for updates in Plaid. I couldn’t get around how this can work with Airbyte’s pull (vs. push) mechanism. I would appreciate any pointer to implementing a webhook like that – is there a similar connector that I can look at?
Attached my source.py snipped for this class
Investment Transactions Stream