Destination Redis: "JedisMovedDataException" when running Redshift to Redis connection

  • Is this your first time deploying Airbyte?: No
  • OS Version / Instance: Ubuntu / AWS EC2
  • Memory / Disk: m5.2xlarge - 8 CPU, 32 GB Memory
  • Deployment: Docker
  • Airbyte Version: 0.39.14-alpha
  • Source name/version: airbyte/source-redshift 0.3.10
  • Destination name/version: airbyte/destination-redis 0.1.2
  • Steps:
    1. Configured the source (Redshift) and the destination (Redis). Everything ran fine up to this point.
    2. Created the connection and ran a sync.
    3. The sync failed with an error (see the logs below).
  • Description: I tried to read the documentation here, but it appears to be incomplete.
  • Logs from the failed connection:

2022-07-01 16:58:09 source > 2022-07-01 16:58:09 INFO i.a.i.s.r.AbstractDbSource(lambda$read$2):126 - Closed database connection pool.
2022-07-01 16:58:09 source > 2022-07-01 16:58:09 INFO i.a.i.b.IntegrationRunner(runInternal):169 - Completed integration: io.airbyte.integrations.source.redshift.RedshiftSource
2022-07-01 16:58:09 source > 2022-07-01 16:58:09 INFO i.a.i.s.r.RedshiftSource(main):126 - completed source: class io.airbyte.integrations.source.redshift.RedshiftSource
2022-07-01 16:58:09 destination > 2022-07-01 16:58:09 ERROR i.a.i.b.FailureTrackingAirbyteMessageConsumer(accept):52 - Exception while accepting message
2022-07-01 16:58:09 destination > redis.clients.jedis.exceptions.JedisMovedDataException: MOVED 480 10.4.12.55:6379
2022-07-01 16:58:09 destination > 	at redis.clients.jedis.Protocol.processError(Protocol.java:121) ~[jedis-3.7.0.jar:?]
2022-07-01 16:58:09 destination > 	at redis.clients.jedis.Protocol.process(Protocol.java:173) ~[jedis-3.7.0.jar:?]
2022-07-01 16:58:09 destination > 	at redis.clients.jedis.Protocol.read(Protocol.java:227) ~[jedis-3.7.0.jar:?]
2022-07-01 16:58:09 destination > 	at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:352) ~[jedis-3.7.0.jar:?]
2022-07-01 16:58:09 destination > 	at redis.clients.jedis.Connection.getIntegerReply(Connection.java:294) ~[jedis-3.7.0.jar:?]
2022-07-01 16:58:09 destination > 	at redis.clients.jedis.Jedis.incr(Jedis.java:772) ~[jedis-3.7.0.jar:?]
2022-07-01 16:58:09 destination > 	at io.airbyte.integrations.destination.redis.RedisHCache.insert(RedisHCache.java:30) ~[io.airbyte.airbyte-integrations.connectors-destination-redis-0.39.20-alpha.jar:?]
2022-07-01 16:58:09 destination > 	at io.airbyte.integrations.destination.redis.RedisMessageConsumer.acceptTracked(RedisMessageConsumer.java:66) ~[io.airbyte.airbyte-integrations.connectors-destination-redis-0.39.20-alpha.jar:?]
2022-07-01 16:58:09 destination > 	at io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer.accept(FailureTrackingAirbyteMessageConsumer.java:50) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-07-01 16:58:09 destination > 	at io.airbyte.integrations.base.IntegrationRunner.consumeWriteStream(IntegrationRunner.java:194) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-07-01 16:58:09 destination > 	at io.airbyte.integrations.base.IntegrationRunner.lambda$runConsumer$4(IntegrationRunner.java:203) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-07-01 16:58:09 destination > 	at io.airbyte.integrations.base.IntegrationRunner.watchForOrphanThreads(IntegrationRunner.java:232) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-07-01 16:58:09 destination > 	at io.airbyte.integrations.base.IntegrationRunner.runConsumer(IntegrationRunner.java:202) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-07-01 16:58:09 destination > 	at io.airbyte.integrations.base.IntegrationRunner.lambda$runInternal$1(IntegrationRunner.java:165) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-07-01 16:58:09 destination > 	at io.airbyte.integrations.base.sentry.AirbyteSentry.executeWithTracing(AirbyteSentry.java:54) [io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-07-01 16:58:09 destination > 	at io.airbyte.integrations.base.sentry.AirbyteSentry.executeWithTracing(AirbyteSentry.java:38) [io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-07-01 16:58:09 destination > 	at io.airbyte.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.java:165) [io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-07-01 16:58:09 destination > 	at io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:107) [io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-07-01 16:58:09 destination > 	at io.airbyte.integrations.destination.redis.RedisDestination.main(RedisDestination.java:24) [io.airbyte.airbyte-integrations.connectors-destination-redis-0.39.20-alpha.jar:?]
2022-07-01 16:58:09 destination > 2022-07-01 16:58:09 WARN i.a.i.b.FailureTrackingAirbyteMessageConsumer(close):63 - Airbyte message consumer: failed.
2022-07-01 16:58:09 destination > 2022-07-01 16:58:09 ERROR i.a.i.b.AirbyteExceptionHandler(uncaughtException):26 - Something went wrong in the connector. See the logs for more details.
2022-07-01 16:58:09 destination > redis.clients.jedis.exceptions.JedisMovedDataException: MOVED 480 10.4.12.55:6379
2022-07-01 16:58:09 destination > 	at redis.clients.jedis.Protocol.processError(Protocol.java:121) ~[jedis-3.7.0.jar:?]
2022-07-01 16:58:09 destination > 	at redis.clients.jedis.Protocol.process(Protocol.java:173) ~[jedis-3.7.0.jar:?]
2022-07-01 16:58:09 destination > 	at redis.clients.jedis.Protocol.read(Protocol.java:227) ~[jedis-3.7.0.jar:?]
2022-07-01 16:58:09 destination > 	at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:352) ~[jedis-3.7.0.jar:?]
2022-07-01 16:58:09 destination > 	at redis.clients.jedis.Connection.getIntegerReply(Connection.java:294) ~[jedis-3.7.0.jar:?]
2022-07-01 16:58:09 destination > 	at redis.clients.jedis.Jedis.incr(Jedis.java:772) ~[jedis-3.7.0.jar:?]
2022-07-01 16:58:09 destination > 	at io.airbyte.integrations.destination.redis.RedisHCache.insert(RedisHCache.java:30) ~[io.airbyte.airbyte-integrations.connectors-destination-redis-0.39.20-alpha.jar:?]
2022-07-01 16:58:09 destination > 	at io.airbyte.integrations.destination.redis.RedisMessageConsumer.acceptTracked(RedisMessageConsumer.java:66) ~[io.airbyte.airbyte-integrations.connectors-destination-redis-0.39.20-alpha.jar:?]
2022-07-01 16:58:09 destination > 	at io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer.accept(FailureTrackingAirbyteMessageConsumer.java:50) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-07-01 16:58:09 destination > 	at io.airbyte.integrations.base.IntegrationRunner.consumeWriteStream(IntegrationRunner.java:194) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-07-01 16:58:09 destination > 	at io.airbyte.integrations.base.IntegrationRunner.lambda$runConsumer$4(IntegrationRunner.java:203) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-07-01 16:58:09 destination > 	at io.airbyte.integrations.base.IntegrationRunner.watchForOrphanThreads(IntegrationRunner.java:232) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-07-01 16:58:09 destination > 	at io.airbyte.integrations.base.IntegrationRunner.runConsumer(IntegrationRunner.java:202) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-07-01 16:58:09 destination > 	at io.airbyte.integrations.base.IntegrationRunner.lambda$runInternal$1(IntegrationRunner.java:165) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-07-01 16:58:09 destination > 	at io.airbyte.integrations.base.sentry.AirbyteSentry.executeWithTracing(AirbyteSentry.java:54) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-07-01 16:58:09 destination > 	at io.airbyte.integrations.base.sentry.AirbyteSentry.executeWithTracing(AirbyteSentry.java:38) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-07-01 16:58:09 destination > 	at io.airbyte.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.java:165) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-07-01 16:58:09 destination > 	at io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:107) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-07-01 16:58:09 destination > 	at io.airbyte.integrations.destination.redis.RedisDestination.main(RedisDestination.java:24) ~[io.airbyte.airbyte-integrations.connectors-destination-redis-0.39.20-alpha.jar:?]
2022-07-01 16:58:09 INFO i.a.w.g.DefaultReplicationWorker(lambda$getDestinationOutputRunnable$7):405 - State in DefaultReplicationWorker from destination: io.airbyte.protocol.models.AirbyteMessage@10e030b0[type=TRACE,log=<null>,spec=<null>,connectionStatus=<null>,catalog=<null>,record=<null>,state=<null>,trace=io.airbyte.protocol.models.AirbyteTraceMessage@1e27ccdc[type=ERROR,emittedAt=1.656694689164E12,error=io.airbyte.protocol.models.AirbyteErrorTraceMessage@1a185bc0[message=Something went wrong in the connector. See the logs for more details.,internalMessage=redis.clients.jedis.exceptions.JedisMovedDataException: MOVED 480 10.4.12.55:6379,stackTrace=redis.clients.jedis.exceptions.JedisMovedDataException: MOVED 480 10.4.12.55:6379
	at redis.clients.jedis.Protocol.processError(Protocol.java:121)
	at redis.clients.jedis.Protocol.process(Protocol.java:173)
	at redis.clients.jedis.Protocol.read(Protocol.java:227)
	at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:352)
	at redis.clients.jedis.Connection.getIntegerReply(Connection.java:294)
	at redis.clients.jedis.Jedis.incr(Jedis.java:772)
	at io.airbyte.integrations.destination.redis.RedisHCache.insert(RedisHCache.java:30)
	at io.airbyte.integrations.destination.redis.RedisMessageConsumer.acceptTracked(RedisMessageConsumer.java:66)
	at io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer.accept(FailureTrackingAirbyteMessageConsumer.java:50)
	at io.airbyte.integrations.base.IntegrationRunner.consumeWriteStream(IntegrationRunner.java:194)
	at io.airbyte.integrations.base.IntegrationRunner.lambda$runConsumer$4(IntegrationRunner.java:203)
	at io.airbyte.integrations.base.IntegrationRunner.watchForOrphanThreads(IntegrationRunner.java:232)
	at io.airbyte.integrations.base.IntegrationRunner.runConsumer(IntegrationRunner.java:202)
	at io.airbyte.integrations.base.IntegrationRunner.lambda$runInternal$1(IntegrationRunner.java:165)
	at io.airbyte.integrations.base.sentry.AirbyteSentry.executeWithTracing(AirbyteSentry.java:54)
	at io.airbyte.integrations.base.sentry.AirbyteSentry.executeWithTracing(AirbyteSentry.java:38)
	at io.airbyte.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.java:165)
	at io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:107)
	at io.airbyte.integrations.destination.redis.RedisDestination.main(RedisDestination.java:24)
,failureType=system_error,additionalProperties={}],additionalProperties={}],additionalProperties={}]
2022-07-01 16:58:09 INFO i.a.w.g.DefaultReplicationWorker(lambda$getReplicationRunnable$6):347 - Source has no more messages, closing connection.
2022-07-01 16:58:09 INFO i.a.w.g.DefaultReplicationWorker(lambda$getReplicationRunnable$6):355 - Total records read: 23 (26 KB)
2022-07-01 16:58:09 ERROR i.a.w.g.DefaultReplicationWorker(run):180 - Sync worker failed.
java.util.concurrent.ExecutionException: io.airbyte.workers.general.DefaultReplicationWorker$DestinationException: Destination process exited with non-zero exit code 1
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) ~[?:?]
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) ~[?:?]
	at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:173) ~[io.airbyte-airbyte-workers-0.39.14-alpha.jar:?]
	at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:65) ~[io.airbyte-airbyte-workers-0.39.14-alpha.jar:?]
	at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:158) ~[io.airbyte-airbyte-workers-0.39.14-alpha.jar:?]
	at java.lang.Thread.run(Thread.java:833) [?:?]
	Suppressed: java.io.IOException: Stream closed
		at java.lang.ProcessBuilder$NullOutputStream.write(ProcessBuilder.java:445) ~[?:?]
		at java.io.OutputStream.write(OutputStream.java:162) ~[?:?]
		at java.io.BufferedOutputStream.write(BufferedOutputStream.java:123) ~[?:?]
		at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:234) ~[?:?]
		at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:304) ~[?:?]
		at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282) ~[?:?]
		at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:132) ~[?:?]
		at java.io.OutputStreamWriter.write(OutputStreamWriter.java:205) ~[?:?]
		at java.io.BufferedWriter.flushBuffer(BufferedWriter.java:120) ~[?:?]
		at java.io.BufferedWriter.flush(BufferedWriter.java:256) ~[?:?]
		at io.airbyte.workers.internal.DefaultAirbyteDestination.notifyEndOfInput(DefaultAirbyteDestination.java:98) ~[io.airbyte-airbyte-workers-0.39.14-alpha.jar:?]
		at io.airbyte.workers.internal.DefaultAirbyteDestination.close(DefaultAirbyteDestination.java:111) ~[io.airbyte-airbyte-workers-0.39.14-alpha.jar:?]
		at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:137) ~[io.airbyte-airbyte-workers-0.39.14-alpha.jar:?]
		at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:65) ~[io.airbyte-airbyte-workers-0.39.14-alpha.jar:?]
		at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:158) ~[io.airbyte-airbyte-workers-0.39.14-alpha.jar:?]
		at java.lang.Thread.run(Thread.java:833) [?:?]
Caused by: io.airbyte.workers.general.DefaultReplicationWorker$DestinationException: Destination process exited with non-zero exit code 1
	at io.airbyte.workers.general.DefaultReplicationWorker.lambda$getDestinationOutputRunnable$7(DefaultReplicationWorker.java:410) ~[io.airbyte-airbyte-workers-0.39.14-alpha.jar:?]
	at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	... 1 more
2022-07-01 16:58:09 INFO i.a.w.g.DefaultReplicationWorker(run):239 - sync summary: io.airbyte.config.ReplicationAttemptSummary@d06f2ef[status=failed,recordsSynced=23,bytesSynced=26916,startTime=1656694685805,endTime=1656694689334,totalStats=io.airbyte.config.SyncStats@4ee2ff79[recordsEmitted=23,bytesEmitted=26916,stateMessagesEmitted=0,recordsCommitted=0],streamStats=[io.airbyte.config.StreamSyncStats@451bdb8b[streamName=active_order_vendors,stats=io.airbyte.config.SyncStats@7ef5abcc[recordsEmitted=23,bytesEmitted=26916,stateMessagesEmitted=<null>,recordsCommitted=<null>]]]]
2022-07-01 16:58:09 INFO i.a.w.g.DefaultReplicationWorker(run):268 - Source did not output any state messages
2022-07-01 16:58:09 WARN i.a.w.g.DefaultReplicationWorker(run):279 - State capture: No state retained.
2022-07-01 16:58:09 INFO i.a.w.t.TemporalAttemptExecution(get):134 - Stopping cancellation check scheduling...
2022-07-01 16:58:09 INFO i.a.w.t.s.ReplicationActivityImpl(lambda$replicate$3):157 - sync summary: io.airbyte.config.StandardSyncOutput@2278878d[standardSyncSummary=io.airbyte.config.StandardSyncSummary@677a0da6[status=failed,recordsSynced=23,bytesSynced=26916,startTime=1656694685805,endTime=1656694689334,totalStats=io.airbyte.config.SyncStats@4ee2ff79[recordsEmitted=23,bytesEmitted=26916,stateMessagesEmitted=0,recordsCommitted=0],streamStats=[io.airbyte.config.StreamSyncStats@451bdb8b[streamName=active_order_vendors,stats=io.airbyte.config.SyncStats@7ef5abcc[recordsEmitted=23,bytesEmitted=26916,stateMessagesEmitted=<null>,recordsCommitted=<null>]]]],normalizationSummary=<null>,state=<null>,outputCatalog=io.airbyte.protocol.models.ConfiguredAirbyteCatalog@4832e2b0[streams=[io.airbyte.protocol.models.ConfiguredAirbyteStream@56576fc4[stream=io.airbyte.protocol.models.AirbyteStream@30f5de6c[name=active_order_vendors,jsonSchema={"type":"object","properties":{"id":{"type":"string"},"city":{"type":"string"},"state":{"type":"string"},"is_tpv":{"type":"boolean"},"country":{"type":"string"},"user_id":{"type":"string"},"zipcode":{"type":"string"},"currency":{"type":"string"},"order_id":{"type":"string"},"vendor_id":{"type":"string"},"created_at":{"type":"string"},"charged_tax":{"type":"number"},"location_id":{"type":"string"},"vendor_name":{"type":"string"},"fulfilled_by":{"type":"string"},"charged_total":{"type":"number"},"current_status":{"type":"string"},"fulfillment_tax":{"type":"number"},"organization_id":{"type":"string"},"charged_discount":{"type":"number"},"charged_shipping":{"type":"number"},"charged_subtotal":{"type":"number"},"fulfillment_type":{"type":"number"},"current_status_at":{"type":"string"},"fulfilled_by_type":{"type":"string"},"fulfillment_total":{"type":"number"},"linked_account_id":{"type":"string"},"organization_name":{"type":"string"},"charged_shipping_tax":{"type":"number"},"fulfillment_discount":{"type":"number"},"fulfillment_shipping":{"type":"number"},"fulfillment_subtotal":{"type":"number"},"vendor_owns_liability":{"type":"boolean"},"avalara_transaction_id":{"type":"string"},"is_processed_by_avalara":{"type":"boolean"},"linked_account_username":{"type":"string"},"dynamic_sourcing_revenue":{"type":"number"},"num_fulfillment_requests":{"type":"number"},"payment_rules_validation":{"type":"string"},"fulfillment_email_sent_to":{"type":"string"},"pricing_set_automatically":{"type":"boolean"},"early_pay_discount_revenue":{"type":"number"},"requested_for_fulfillment_by":{"type":"string"},"fulfillment_exception_present":{"type":"boolean"},"linked_account_owned_by_customer":{"type":"boolean"},"last_requested_for_fulfillment_at":{"type":"string"},"order_eligible_for_fulfillment_at":{"type":"string"},"first_requested_for_fulfillment_at":{"type":"string"},"fulfillment_exception_order_rejected_at":{"type":"string"},"requested_for_fulfillment_automatically":{"type":"boolean"},"fulfillment_exception_order_rejected_reason":{"type":"string"}}},supportedSyncModes=[full_refresh, incremental],sourceDefinedCursor=<null>,defaultCursorField=[],sourceDefinedPrimaryKey=[],namespace=analytics_zone,additionalProperties={}],syncMode=full_refresh,cursorField=[],destinationSyncMode=overwrite,primaryKey=[],additionalProperties={}]],additionalProperties={}],failures=[io.airbyte.config.FailureReason@755f2e9[failureOrigin=destination,failureType=system_error,internalMessage=redis.clients.jedis.exceptions.JedisMovedDataException: MOVED 480 10.4.12.55:6379,externalMessage=Something went wrong in the connector. See the logs for more details.,metadata=io.airbyte.config.Metadata@582da7d1[additionalProperties={attemptNumber=0, jobId=24419, from_trace_message=true}],stacktrace=redis.clients.jedis.exceptions.JedisMovedDataException: MOVED 480 10.4.12.55:6379
	at redis.clients.jedis.Protocol.processError(Protocol.java:121)
	at redis.clients.jedis.Protocol.process(Protocol.java:173)
	at redis.clients.jedis.Protocol.read(Protocol.java:227)
	at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:352)
	at redis.clients.jedis.Connection.getIntegerReply(Connection.java:294)
	at redis.clients.jedis.Jedis.incr(Jedis.java:772)
	at io.airbyte.integrations.destination.redis.RedisHCache.insert(RedisHCache.java:30)
	at io.airbyte.integrations.destination.redis.RedisMessageConsumer.acceptTracked(RedisMessageConsumer.java:66)
	at io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer.accept(FailureTrackingAirbyteMessageConsumer.java:50)
	at io.airbyte.integrations.base.IntegrationRunner.consumeWriteStream(IntegrationRunner.java:194)
	at io.airbyte.integrations.base.IntegrationRunner.lambda$runConsumer$4(IntegrationRunner.java:203)
	at io.airbyte.integrations.base.IntegrationRunner.watchForOrphanThreads(IntegrationRunner.java:232)
	at io.airbyte.integrations.base.IntegrationRunner.runConsumer(IntegrationRunner.java:202)
	at io.airbyte.integrations.base.IntegrationRunner.lambda$runInternal$1(IntegrationRunner.java:165)
	at io.airbyte.integrations.base.sentry.AirbyteSentry.executeWithTracing(AirbyteSentry.java:54)
	at io.airbyte.integrations.base.sentry.AirbyteSentry.executeWithTracing(AirbyteSentry.java:38)
	at io.airbyte.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.java:165)
	at io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:107)
	at io.airbyte.integrations.destination.redis.RedisDestination.main(RedisDestination.java:24)
,retryable=<null>,timestamp=1656694689164], io.airbyte.config.FailureReason@3569da5f[failureOrigin=destination,failureType=<null>,internalMessage=io.airbyte.workers.general.DefaultReplicationWorker$DestinationException: Destination process end of stream notification failed,externalMessage=Something went wrong within the destination connector,metadata=io.airbyte.config.Metadata@2bf17420[additionalProperties={attemptNumber=0, jobId=24419}],stacktrace=java.util.concurrent.CompletionException: io.airbyte.workers.general.DefaultReplicationWorker$DestinationException: Destination process end of stream notification failed
	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
	at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
	at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1807)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: io.airbyte.workers.general.DefaultReplicationWorker$DestinationException: Destination process end of stream notification failed
	at io.airbyte.workers.general.DefaultReplicationWorker.lambda$getReplicationRunnable$6(DefaultReplicationWorker.java:365)
	at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
	... 3 more
Caused by: java.io.IOException: Stream closed
	at java.base/java.lang.ProcessBuilder$NullOutputStream.write(ProcessBuilder.java:445)
	at java.base/java.io.OutputStream.write(OutputStream.java:162)
	at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
	at java.base/java.io.BufferedOutputStream.flush(BufferedOutputStream.java:142)
	at java.base/sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:320)
	at java.base/sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:160)
	at java.base/java.io.OutputStreamWriter.flush(OutputStreamWriter.java:248)
	at java.base/java.io.BufferedWriter.flush(BufferedWriter.java:257)
	at io.airbyte.workers.internal.DefaultAirbyteDestination.notifyEndOfInput(DefaultAirbyteDestination.java:98)
	at io.airbyte.workers.general.DefaultReplicationWorker.lambda$getReplicationRunnable$6(DefaultReplicationWorker.java:363)
	... 4 more
,retryable=<null>,timestamp=1656694689334], io.airbyte.config.FailureReason@31b090d[failureOrigin=destination,failureType=<null>,internalMessage=io.airbyte.workers.general.DefaultReplicationWorker$DestinationException: Destination process exited with non-zero exit code 1,externalMessage=Something went wrong within the destination connector,metadata=io.airbyte.config.Metadata@36bd3590[additionalProperties={attemptNumber=0, jobId=24419}],stacktrace=java.util.concurrent.CompletionException: io.airbyte.workers.general.DefaultReplicationWorker$DestinationException: Destination process exited with non-zero exit code 1
	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
	at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
	at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1807)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: io.airbyte.workers.general.DefaultReplicationWorker$DestinationException: Destination process exited with non-zero exit code 1
	at io.airbyte.workers.general.DefaultReplicationWorker.lambda$getDestinationOutputRunnable$7(DefaultReplicationWorker.java:410)
	at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
	... 3 more
,retryable=<null>,timestamp=1656694689319]]]
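For readers skimming the trace: the exception message `MOVED 480 10.4.12.55:6379` has the shape of a Redis Cluster redirection reply, which carries a hash slot number followed by the host and port of the node that serves that slot. Below is a minimal sketch that splits such a message into its parts. The class name `MovedReply` and its fields are hypothetical and are not part of Jedis or the Airbyte connector, and this thread does not establish whether the redirection is the root cause of the failed sync.

```java
// Hypothetical helper for illustration only; not part of Jedis or Airbyte.
public class MovedReply {
    final int slot;     // hash slot named in the reply
    final String host;  // node that serves the slot
    final int port;

    MovedReply(int slot, String host, int port) {
        this.slot = slot;
        this.host = host;
        this.port = port;
    }

    // Parses a message of the form "MOVED <slot> <host>:<port>".
    static MovedReply parse(String message) {
        String[] parts = message.trim().split("\\s+");
        if (parts.length != 3 || !parts[0].equals("MOVED")) {
            throw new IllegalArgumentException("Not a MOVED reply: " + message);
        }
        int colon = parts[2].lastIndexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException("Missing port in: " + parts[2]);
        }
        int slot = Integer.parseInt(parts[1]);
        String host = parts[2].substring(0, colon);
        int port = Integer.parseInt(parts[2].substring(colon + 1));
        return new MovedReply(slot, host, port);
    }

    public static void main(String[] args) {
        // Message copied from the exception in the logs above.
        MovedReply reply = parse("MOVED 480 10.4.12.55:6379");
        System.out.println("slot=" + reply.slot + " host=" + reply.host + " port=" + reply.port);
    }
}
```

Read this way, the log line says that the key being incremented belongs to slot 480, which is served by the node at 10.4.12.55:6379.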

Hi @israel.mendes, welcome to the community and thanks for your post. Based on the logs, this seems to be an issue with the Redis destination. Could you describe how you configured your Redis instance/cluster? Also, the logs indicate that 23 records were synced. How many records in total are you trying to sync from Redshift to Redis? Hope we can get this resolved soon!

Hey @sajarin

About your questions:

  • Could you describe how you configured your Redis instance/cluster?
    • I’m using a simple Redis setup to test the connection: Redis on AWS MemoryDB, a cluster with the db.t4g.small node type. The security group is open to all IPs, and the cluster sits in a private subnet, the same one Airbyte uses. When I configured the destination, authentication between the Airbyte EC2 instance and Redis worked fine.
  • How many total records are you trying to sync to Redis from Redshift?
    • The table I used for that connection contains only 23 records, and all of them appear to be in Redis. The problem seems to be that the connection fails for no clear reason.
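Since the destination described above is a cluster, it may help to see how Redis Cluster assigns keys to nodes. Per the Redis Cluster specification, each key maps to one of 16384 hash slots via CRC16 (the XMODEM variant) of the key modulo 16384, and only the text inside the first non-empty `{...}` hash tag is hashed when one is present. The sketch below uses only the Java standard library; the class and method names are made up for illustration and are not the connector's code, and the thread does not confirm which key produced slot 480.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of Redis Cluster key-to-slot mapping.
public class KeySlot {
    static final int SLOT_COUNT = 16384;

    // CRC16/XMODEM: polynomial 0x1021, initial value 0, no bit reflection.
    static int crc16(byte[] data) {
        int crc = 0;
        for (byte b : data) {
            crc ^= (b & 0xFF) << 8;
            for (int i = 0; i < 8; i++) {
                crc = ((crc & 0x8000) != 0) ? ((crc << 1) ^ 0x1021) : (crc << 1);
                crc &= 0xFFFF; // keep the register at 16 bits
            }
        }
        return crc;
    }

    // Applies the hash-tag rule, then maps the key to a slot.
    static int keySlot(String key) {
        int open = key.indexOf('{');
        if (open >= 0) {
            int close = key.indexOf('}', open + 1);
            if (close > open + 1) { // tag must contain at least one character
                key = key.substring(open + 1, close);
            }
        }
        return crc16(key.getBytes(StandardCharsets.UTF_8)) % SLOT_COUNT;
    }

    public static void main(String[] args) {
        System.out.println("foo -> " + keySlot("foo"));
        System.out.println("{user1000}.following -> " + keySlot("{user1000}.following"));
        System.out.println("{user1000}.followers -> " + keySlot("{user1000}.followers"));
    }
}
```

A client connected to a single node can only operate on keys whose slot that node serves; for any other key the node answers with a `MOVED` reply naming the owning node, which is the shape of the error in the logs.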

Sorry for the long delay, Israel. I created an issue on GitHub: https://github.com/airbytehq/airbyte/issues/15169. It looks like the destination is not closing the ingestion step. I’ll get back to you with any updates.
