Error in sync with 'Broken pipe' exception

Summary

The user is experiencing a ‘Broken pipe’ exception during a sync process in Airbyte, causing retries to start from the beginning. The user is seeking help to understand the issue with socat or pipes and potential reasons for this error.


Question

`writeToDestination: exception caught` `java.net.SocketException: Broken pipe`

I'm getting the following error in my sync. After this error the sync is retried, and even though a lot of data had already been transferred successfully, the next attempt starts again from the beginning.

```
java.net.SocketException: Broken pipe
	at sun.nio.ch.NioSocketImpl.implWrite(NioSocketImpl.java:413) ~[?:?]
	at sun.nio.ch.NioSocketImpl.write(NioSocketImpl.java:433) ~[?:?]
	at sun.nio.ch.NioSocketImpl$2.write(NioSocketImpl.java:812) ~[?:?]
	at java.net.Socket$SocketOutputStream.write(Socket.java:1120) ~[?:?]
	at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:313) ~[?:?]
	at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:385) ~[?:?]
	at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:361) ~[?:?]
	at sun.nio.cs.StreamEncoder.lockedWrite(StreamEncoder.java:162) ~[?:?]
	at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:143) ~[?:?]
	at java.io.OutputStreamWriter.write(OutputStreamWriter.java:220) ~[?:?]
	at java.io.BufferedWriter.implFlushBuffer(BufferedWriter.java:178) ~[?:?]
	at java.io.BufferedWriter.flushBuffer(BufferedWriter.java:163) ~[?:?]
	at java.io.BufferedWriter.implFlush(BufferedWriter.java:371) ~[?:?]
	at java.io.BufferedWriter.flush(BufferedWriter.java:359) ~[?:?]
	at io.airbyte.workers.internal.DefaultAirbyteMessageBufferedWriter.flush(DefaultAirbyteMessageBufferedWriter.java:31) ~[io.airbyte-airbyte-commons-worker-0.50.33.jar:?]
	at io.airbyte.workers.internal.DefaultAirbyteDestination.notifyEndOfInputWithNoTimeoutMonitor(DefaultAirbyteDestination.java:133) ~[io.airbyte-airbyte-commons-worker-0.50.33.jar:?]
	at io.airbyte.workers.internal.DefaultAirbyteDestination.notifyEndOfInput(DefaultAirbyteDestination.java:126) ~[io.airbyte-airbyte-commons-worker-0.50.33.jar:?]
	at io.airbyte.workers.general.BufferedReplicationWorker.writeToDestination(BufferedReplicationWorker.java:463) ~[io.airbyte-airbyte-commons-worker-0.50.33.jar:?]
	at io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsync$3(BufferedReplicationWorker.java:253) ~[io.airbyte-airbyte-commons-worker-0.50.33.jar:?]
	at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
	at java.lang.Thread.run(Thread.java:1589) ~[?:?]
2024-07-25 14:24:32 INFO i.a.w.g.BufferedReplicationWorker(writeToDestination):471 - writeToDestination: done. (forDest.isDone:false, isDestRunning:true)
2024-07-25 14:24:32 INFO i.a.w.g.BufferedReplicationWorker(processMessage):439 - processMessage: done. (fromSource.isDone:false, forDest.isClosed:true)
2024-07-25 14:24:32 INFO i.a.w.g.BufferedReplicationWorker(readFromDestination):504 - readFromDestination: exception caught
java.lang.IllegalStateException: Destination process is still alive, cannot retrieve exit value.
	at com.google.common.base.Preconditions.checkState(Preconditions.java:502) ~[guava-31.1-jre.jar:?]
	at io.airbyte.workers.internal.DefaultAirbyteDestination.getExitValue(DefaultAirbyteDestination.java:184) ~[io.airbyte-airbyte-commons-worker-0.50.33.jar:?]
	at io.airbyte.workers.general.BufferedReplicationWorker.readFromDestination(BufferedReplicationWorker.java:497) ~[io.airbyte-airbyte-commons-worker-0.50.33.jar:?]
	at io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsync$3(BufferedReplicationWorker.java:253) ~[io.airbyte-airbyte-commons-worker-0.50.33.jar:?]
	at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
	at java.lang.Thread.run(Thread.java:1589) ~[?:?]
2024-07-25 14:24:32 INFO i.a.w.g.BufferedReplicationWorker(readFromDestination):507 - readFromDestination: done. (writeToDestFailed:true, dest.isFinished:false)
2024-07-25 14:24:32 INFO i.a.w.g.BufferedReplicationWorker(readFromSource):396 - readFromSource: exception caught
java.lang.IllegalStateException: Source process is still alive, cannot retrieve exit value.
	at com.google.common.base.Preconditions.checkState(Preconditions.java:502) ~[guava-31.1-jre.jar:?]
	at io.
```
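The `IllegalStateException` entries after the broken pipe look like follow-on errors rather than a second, independent cause: the log reports `writeToDestFailed:true` and `dest.isFinished:false`, which suggests the worker stopped waiting on the destination after the failed write while the source and destination processes had not exited yet, and a request for the exit value of a still-running process is refused. The minimal sketch below shows that guard pattern. The `ConnectorProcess` class is hypothetical, written for illustration only; it is not Airbyte's `DefaultAirbyteDestination`.

```java
import java.util.concurrent.CompletableFuture;

public class ExitValueGuard {
    /** Hypothetical stand-in for a connector process; not Airbyte's DefaultAirbyteDestination. */
    static final class ConnectorProcess {
        private final String name;
        private final CompletableFuture<Integer> exit = new CompletableFuture<>();

        ConnectorProcess(String name) {
            this.name = name;
        }

        boolean isAlive() {
            return !exit.isDone();
        }

        /** Records the exit code once the (simulated) process ends. */
        void finish(int exitCode) {
            exit.complete(exitCode);
        }

        /** Same kind of guard as in the log: no exit value while the process is still running. */
        int getExitValue() {
            if (isAlive()) {
                throw new IllegalStateException(name + " process is still alive, cannot retrieve exit value.");
            }
            return exit.join();
        }
    }

    public static void main(String[] args) {
        ConnectorProcess destination = new ConnectorProcess("Destination");
        try {
            // The write side has already failed, but the process itself has not exited yet.
            destination.getExitValue();
        } catch (IllegalStateException e) {
            System.out.println("follow-on error: " + e.getMessage());
        }
        destination.finish(137);
        System.out.println("exit value once finished: " + destination.getExitValue());
    }
}
```

If this reading of the log is right, the broken pipe is the error worth investigating, and the later exceptions are side effects of the worker shutting down.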
> - Airbyte Version: 0.50.33
> - Deployed on: Kubernetes
> - Data size: 1GB
> - Source: File (S3)
> - Destination: S3 (csv)

The logs of the sync attempt are attached.

Can someone please help me understand the problem with socat or pipes and its potential causes? I would be very grateful. :pleading_face:
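For context on the pipes question: "Broken pipe" is the operating system's EPIPE error. It means the writing side (here, the worker flushing records towards the destination) tried to write to a connection whose reading end had already gone away. Whatever sits on the other end (a relay such as socat, or the destination container itself), if it exits or is killed, the next write fails this way. The minimal sketch below reproduces the same `java.net.SocketException` on a loopback TCP socket without involving Airbyte at all; the `BrokenPipeDemo` class is hypothetical, and a socket that is accepted and immediately closed plays the role of a destination that has died.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;

public class BrokenPipeDemo {
    /**
     * Opens a loopback TCP connection, lets the reading side close, and keeps
     * writing until the OS reports that the peer is gone.
     * Returns the SocketException message, or null if no error was seen.
     */
    static String reproduce() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback);
             Socket writer = new Socket(loopback, server.getLocalPort())) {
            // The "destination" accepts the connection and exits without reading anything.
            server.accept().close();
            OutputStream out = writer.getOutputStream();
            byte[] chunk = new byte[8192];
            for (int attempt = 0; attempt < 500; attempt++) {
                try {
                    out.write(chunk);
                    out.flush();
                } catch (SocketException e) {
                    // Usually "Broken pipe" on Linux; some platforms report "Connection reset".
                    return e.getMessage();
                }
                // Give the peer's reset time to arrive before the next write.
                Thread.sleep(10);
            }
            return null;
        } catch (IOException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("writer saw: " + reproduce());
    }
}
```

The first write after the peer closes often still succeeds, because it only lands in the local send buffer; the error surfaces on a later write, which is why a sync can move a lot of data before failing.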


---

This topic has been created from a Slack thread to give it more visibility.
It will be in read-only mode here. [Click here](https://airbytehq.slack.com/archives/C021JANJ6TY/p1721927199158819) if you want to access the original thread.

[Join the conversation on Slack](https://slack.airbyte.com)

<sub>
["broken-pipe", "sync", "airbyte", "socat", "pipes", "retry", "error"]
</sub>

I'm not in a position to look at the logs right now, but one thing to check is whether your pods are being killed by the OOM (Out of Memory) killer. That won’t show up in the Airbyte logs (because the process doing the logging was killed), but it should show up in your container logs (or in the platform logs that aggregate them).
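One way to follow up on the OOM suggestion: `kubectl describe pod <pod-name>` shows each container's last state, and an OOM kill typically appears there as `Reason: OOMKilled` with exit code 137. By shell convention, an exit code above 128 means the process was ended by a signal (the code minus 128), so 137 is signal 9, SIGKILL, which the process cannot catch or log. The small helper below decodes such codes; the `ExitCodes` class and its messages are hypothetical, and a 137 on its own is a hint rather than proof of an OOM kill.

```java
public class ExitCodes {
    // By shell convention, exit code 128 + N means "terminated by signal N".
    private static final int SIGNAL_BASE = 128;
    // Linux signal numbers run from 1 to 64.
    private static final int MAX_SIGNAL = 64;

    /** Hypothetical helper: turns a container exit code into a short diagnosis. */
    static String describe(int exitCode) {
        if (exitCode == 0) {
            return "exited normally";
        }
        int signal = exitCode - SIGNAL_BASE;
        if (signal >= 1 && signal <= MAX_SIGNAL) {
            if (signal == 9) {
                // SIGKILL cannot be caught, so the process gets no chance to log anything.
                return "killed by signal 9 (SIGKILL); check the pod for Reason: OOMKilled";
            }
            return "killed by signal " + signal;
        }
        return "exited with error code " + exitCode;
    }

    public static void main(String[] args) {
        for (int code : new int[] {0, 1, 137, 143}) {
            System.out.println(code + " -> " + describe(code));
        }
    }
}
```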