- Is this your first time deploying Airbyte?: No
- OS Version / Instance: CentOS 7
- Memory / Disk: 16 GB
- Deployment: Docker
- Airbyte Version: latest
- Source name/version: Postgres Latest
- Destination name/version: BigQuery Latest
- Step: The issue is happening during sync.
- Description: Sometimes the source emits 0 records, but the sync is still marked as succeeded. Because the sync is reported as succeeded, the connection does not retry it. I am sharing the logs where I found the issue.
2022-05-16 21:23:02 source > 2022-05-16 21:23:02 INFO i.a.i.s.r.AbstractDbSource(check):74 - Exception while checking connection:
2022-05-16 21:23:02 source > java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30001ms.
2022-05-16 21:23:02 source > at com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:696) ~[HikariCP-5.0.1.jar:?]
2022-05-16 21:23:02 source > at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:181) ~[HikariCP-5.0.1.jar:?]
2022-05-16 21:23:02 source > at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:146) ~[HikariCP-5.0.1.jar:?]
2022-05-16 21:23:02 source > at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:100) ~[HikariCP-5.0.1.jar:?]
2022-05-16 21:23:02 source > at io.airbyte.db.jdbc.DefaultJdbcDatabase.getMetaData(DefaultJdbcDatabase.java:76) ~[io.airbyte.airbyte-db-lib-0.38.3-alpha.jar:?]
2022-05-16 21:23:02 source > at io.airbyte.integrations.source.jdbc.AbstractJdbcSource.createDatabase(AbstractJdbcSource.java:316) ~[io.airbyte.airbyte-integrations.connectors-source-jdbc-0.38.3-alpha.jar:?]
2022-05-16 21:23:02 source > at io.airbyte.integrations.source.jdbc.AbstractJdbcSource.createDatabase(AbstractJdbcSource.java:67) ~[io.airbyte.airbyte-integrations.connectors-source-jdbc-0.38.3-alpha.jar:?]
2022-05-16 21:23:02 source > at io.airbyte.integrations.source.relationaldb.AbstractDbSource.createDatabaseInternal(AbstractDbSource.java:506) ~[io.airbyte.airbyte-integrations.connectors-source-relational-db-0.38.3-alpha.jar:?]
2022-05-16 21:23:02 source > at io.airbyte.integrations.source.relationaldb.AbstractDbSource.check(AbstractDbSource.java:67) [io.airbyte.airbyte-integrations.connectors-source-relational-db-0.38.3-alpha.jar:?]
2022-05-16 21:23:02 source > at io.airbyte.integrations.source.postgres.PostgresSource.read(PostgresSource.java:210) [io.airbyte.airbyte-integrations.connectors-source-postgres-0.38.3-alpha.jar:?]
2022-05-16 21:23:02 source > at io.airbyte.integrations.source.postgres.CustomPostgresSource.read(CustomPostgresSource.java:75) [io.airbyte.airbyte-integrations.connectors-source-custom-postgres-0.38.3-alpha.jar:?]
2022-05-16 21:23:02 source > at io.airbyte.integrations.base.ssh.SshWrappedSource.read(SshWrappedSource.java:54) [io.airbyte.airbyte-integrations.bases-base-java-0.38.3-alpha.jar:?]
2022-05-16 21:23:02 source > at io.airbyte.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.java:155) [io.airbyte.airbyte-integrations.bases-base-java-0.38.3-alpha.jar:?]
2022-05-16 21:23:02 source > at io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:107) [io.airbyte.airbyte-integrations.bases-base-java-0.38.3-alpha.jar:?]
2022-05-16 21:23:08 source > at io.airbyte.integrations.source.postgres.CustomPostgresSource.main(CustomPostgresSource.java:87) [io.airbyte.airbyte-integrations.connectors-source-custom-postgres-0.38.3-alpha.jar:?]
2022-05-16 21:23:08 source > 2022-05-16 21:23:08 INFO c.z.h.HikariDataSource(close):350 - HikariPool-1 - Shutdown initiated...
Also, the Postgres source I am using is a custom one that I created using the CDK. It is just a child class that inherits from the Postgres source provided by Airbyte and overrides some methods. The overridden methods are shown below:
@Override
public AirbyteCatalog discover(final JsonNode config) throws Exception {
    final AirbyteCatalog catalog = super.discover(config);
    List<AirbyteStream> streams = catalog.getStreams();
    for (AirbyteStream s : streams) {
        final ObjectNode jsonSchema = (ObjectNode) s.getJsonSchema();
        final ObjectNode properties = (ObjectNode) jsonSchema.get("properties");
        final JsonNode stringType = Jsons.jsonNode(ImmutableMap.of("type", "string"));
        properties.set("sourceID", stringType);
    }
    return catalog;
}

@Override
public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,
                                                  final ConfiguredAirbyteCatalog catalog,
                                                  final JsonNode state)
    throws Exception {
    List<AirbyteMessage> list = AutoCloseableIterators.toListAndClose(super.read(config, catalog, state));
    for (AirbyteMessage message : list) {
        JsonNode node = message.getRecord().getData();
        ((ObjectNode) node).put("sourceID", config.get("sourceID").asText());
    }
    final Stream<AirbyteMessage> stream = list.stream();
    return AutoCloseableIterators.fromStream(stream);
}
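
To make the intent of the `read` override easier to follow, here is a minimal, self-contained sketch of the same idea: wrap the parent iterator and add a `sourceID` field to each record as it passes through. It uses plain Java stand-ins rather than the real Airbyte classes, so `SourceIdTagger`, `Message`, and `tagRecords` are hypothetical names and not part of Airbyte. The sketch assumes that non-record messages (for example state messages) carry no record payload and should pass through untouched, and it tags records lazily instead of collecting the whole sync into a list first. It does not address the connection timeout shown in the logs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class SourceIdTagger {

    // Hypothetical stand-in for AirbyteMessage: `record` is null for non-record messages.
    static final class Message {
        final String type;
        final Map<String, String> record;

        Message(String type, Map<String, String> record) {
            this.type = type;
            this.record = record;
        }
    }

    // Wraps the upstream iterator and adds "sourceID" to each record on the fly.
    // Messages without a record payload are returned unchanged.
    static Iterator<Message> tagRecords(Iterator<Message> upstream, String sourceId) {
        return new Iterator<Message>() {
            @Override
            public boolean hasNext() {
                return upstream.hasNext();
            }

            @Override
            public Message next() {
                Message message = upstream.next();
                if (message.record != null) {
                    message.record.put("sourceID", sourceId);
                }
                return message;
            }
        };
    }

    public static void main(String[] args) {
        Map<String, String> row = new HashMap<>();
        row.put("id", "1");

        List<Message> input = new ArrayList<>();
        input.add(new Message("RECORD", row));
        input.add(new Message("STATE", null)); // no record payload

        Iterator<Message> tagged = tagRecords(input.iterator(), "tenant-a");
        while (tagged.hasNext()) {
            Message message = tagged.next();
            System.out.println(message.type + " -> " + message.record);
        }
    }
}
```

In the real connector the equivalent guard would be a check on the message type before calling `getRecord()`, since only record messages have data to modify.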