Summary
The user is experiencing a ‘Broken pipe’ exception during a sync process in Airbyte, causing retries to start from the beginning. The user is seeking help to understand the issue with socat or pipes and potential reasons for this error.
Question
writeToDestination: exception caught` `java.net.SocketException: Broken pipe
Getting the following error in my sync and after this error it retries the sync although a lot of data was being transfer successfully earlier and in next retry it starts from initial stage.
java.net.SocketException: Broken pipe
at sun.nio.ch.NioSocketImpl.implWrite(NioSocketImpl.java:413) ~[?:?]
at sun.nio.ch.NioSocketImpl.write(NioSocketImpl.java:433) ~[?:?]
at sun.nio.ch.NioSocketImpl$2.write(NioSocketImpl.java:812) ~[?:?]
at java.net.Socket$SocketOutputStream.write(Socket.java:1120) ~[?:?]
at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:313) ~[?:?]
at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:385) ~[?:?]
at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:361) ~[?:?]
at sun.nio.cs.StreamEncoder.lockedWrite(StreamEncoder.java:162) ~[?:?]
at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:143) ~[?:?]
at java.io.OutputStreamWriter.write(OutputStreamWriter.java:220) ~[?:?]
at java.io.BufferedWriter.implFlushBuffer(BufferedWriter.java:178) ~[?:?]
at java.io.BufferedWriter.flushBuffer(BufferedWriter.java:163) ~[?:?]
at java.io.BufferedWriter.implFlush(BufferedWriter.java:371) ~[?:?]
at java.io.BufferedWriter.flush(BufferedWriter.java:359) ~[?:?]
at io.airbyte.workers.internal.DefaultAirbyteMessageBufferedWriter.flush(DefaultAirbyteMessageBufferedWriter.java:31) ~[io.airbyte-airbyte-commons-worker-0.50.33.jar:?]
at io.airbyte.workers.internal.DefaultAirbyteDestination.notifyEndOfInputWithNoTimeoutMonitor(DefaultAirbyteDestination.java:133) ~[io.airbyte-airbyte-commons-worker-0.50.33.jar:?]
at io.airbyte.workers.internal.DefaultAirbyteDestination.notifyEndOfInput(DefaultAirbyteDestination.java:126) ~[io.airbyte-airbyte-commons-worker-0.50.33.jar:?]
at io.airbyte.workers.general.BufferedReplicationWorker.writeToDestination(BufferedReplicationWorker.java:463) ~[io.airbyte-airbyte-commons-worker-0.50.33.jar:?]
at io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsync$3(BufferedReplicationWorker.java:253) ~[io.airbyte-airbyte-commons-worker-0.50.33.jar:?]
at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
at java.lang.Thread.run(Thread.java:1589) ~[?:?]
2024-07-25 14:24:32 [32mINFO[m i.a.w.g.BufferedReplicationWorker(writeToDestination):471 - writeToDestination: done. (forDest.isDone:false, isDestRunning:true)
2024-07-25 14:24:32 [32mINFO[m i.a.w.g.BufferedReplicationWorker(processMessage):439 - processMessage: done. (fromSource.isDone:false, forDest.isClosed:true)
2024-07-25 14:24:32 [32mINFO[m i.a.w.g.BufferedReplicationWorker(readFromDestination):504 - readFromDestination: exception caught
java.lang.IllegalStateException: Destination process is still alive, cannot retrieve exit value.
at com.google.common.base.Preconditions.checkState(Preconditions.java:502) ~[guava-31.1-jre.jar:?]
at io.airbyte.workers.internal.DefaultAirbyteDestination.getExitValue(DefaultAirbyteDestination.java:184) ~[io.airbyte-airbyte-commons-worker-0.50.33.jar:?]
at io.airbyte.workers.general.BufferedReplicationWorker.readFromDestination(BufferedReplicationWorker.java:497) ~[io.airbyte-airbyte-commons-worker-0.50.33.jar:?]
at io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsync$3(BufferedReplicationWorker.java:253) ~[io.airbyte-airbyte-commons-worker-0.50.33.jar:?]
at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
at java.lang.Thread.run(Thread.java:1589) ~[?:?]
2024-07-25 14:24:32 [32mINFO[m i.a.w.g.BufferedReplicationWorker(readFromDestination):507 - readFromDestination: done. (writeToDestFailed:true, dest.isFinished:false)
2024-07-25 14:24:32 [32mINFO[m i.a.w.g.BufferedReplicationWorker(readFromSource):396 - readFromSource: exception caught
java.lang.IllegalStateException: Source process is still alive, cannot retrieve exit value.
at com.google.common.base.Preconditions.checkState(Preconditions.java:502) ~[guava-31.1-jre.jar:?]
at io.```
> Airbyte Version: 0.50.33
> Deployed on: Kubernetes
> Data size: 1GB
> Source: File (S3)
> Destination: S3 (csv)
Logs of sync attempt are attached.
Can someone please help me out understanding the problem with socat or pipes and the potential reasons. Would be highly thankful. :pleading_face:
<br>
---
This topic has been created from a Slack thread to give it more visibility.
It will be on Read-Only mode here. [Click here](https://airbytehq.slack.com/archives/C021JANJ6TY/p1721927199158819) if you want
to access the original thread.
[Join the conversation on Slack](https://slack.airbyte.com)
<sub>
["broken-pipe", "sync", "airbyte", "socat", "pipes", "retry", "error"]
</sub>