Normalization process fails for the planetscale source and redshift destination

  • Is this your first time deploying Airbyte?: Yes
  • OS Version / Instance: Amazon Image 2 in EC2 Instance t3.large
  • Memory / Disk: 8GB
  • Deployment: Docker
  • Airbyte Version: 0.39.25-alpha
  • Source name/version: planetscale/airbyte-source:latest
  • Destination name/version: airbyte/destination-redshift:0.3.43
  • Step: The issue is happening during sync
  • Description: The temporary tables are created but when it comes to populating the data, the normalization process fails. Although I have not done any fancy normalization but just checked on the basic one offered by Airbyte instead of raw json. Here are the logs:
2022-06-27 12:38:29 e[32mINFOe[m i.a.w.t.TemporalAttemptExecution(lambda$getWorkerThread$2):161 - Completing future exceptionally...
io.airbyte.workers.exception.WorkerException: Normalization Failed.
	at io.airbyte.workers.general.DefaultNormalizationWorker.run(DefaultNormalizationWorker.java:63) ~[io.airbyte-airbyte-workers-0.39.25-alpha.jar:?]
	at io.airbyte.workers.general.DefaultNormalizationWorker.run(DefaultNormalizationWorker.java:21) ~[io.airbyte-airbyte-workers-0.39.25-alpha.jar:?]
	at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:158) ~[io.airbyte-airbyte-workers-0.39.25-alpha.jar:?]
	at java.lang.Thread.run(Thread.java:833) [?:?]
Caused by: io.airbyte.workers.exception.WorkerException: Normalization Failed.
	at io.airbyte.workers.general.DefaultNormalizationWorker.run(DefaultNormalizationWorker.java:60) ~[io.airbyte-airbyte-workers-0.39.25-alpha.jar:?]
	... 3 more
	Suppressed: io.airbyte.workers.exception.WorkerException: Normalization process wasn't successful
		at io.airbyte.workers.normalization.DefaultNormalizationRunner.close(DefaultNormalizationRunner.java:162) ~[io.airbyte-airbyte-workers-0.39.25-alpha.jar:?]
		at io.airbyte.workers.general.DefaultNormalizationWorker.run(DefaultNormalizationWorker.java:48) ~[io.airbyte-airbyte-workers-0.39.25-alpha.jar:?]
		at io.airbyte.workers.general.DefaultNormalizationWorker.run(DefaultNormalizationWorker.java:21) ~[io.airbyte-airbyte-workers-0.39.25-alpha.jar:?]
		at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:158) ~[io.airbyte-airbyte-workers-0.39.25-alpha.jar:?]
		at java.lang.Thread.run(Thread.java:833) [?:?]
2022-06-27 12:38:29 e[32mINFOe[m i.a.w.t.TemporalAttemptExecution(get):134 - Stopping cancellation check scheduling...
2022-06-27 12:38:29 e[32mINFOe[m i.a.w.t.TemporalUtils(withBackgroundHeartbeat):291 - Stopping temporal heartbeating...
2022-06-27 12:38:29 e[33mWARNe[m i.t.i.a.POJOActivityTaskHandler(activityFailureToResult):307 - Activity failure. ActivityId=c86a3223-b113-338e-8e55-b53a0d50181d, activityType=Normalize, attempt=1
java.lang.RuntimeException: io.temporal.serviceclient.CheckedExceptionWrapper: java.util.concurrent.ExecutionException: io.airbyte.workers.exception.WorkerException: Normalization Failed.
	at io.airbyte.workers.temporal.TemporalUtils.withBackgroundHeartbeat(TemporalUtils.java:289) ~[io.airbyte-airbyte-workers-0.39.25-alpha.jar:?]
	at io.airbyte.workers.temporal.sync.NormalizationActivityImpl.normalize(NormalizationActivityImpl.java:75) ~[io.airbyte-airbyte-workers-0.39.25-alpha.jar:?]
	at jdk.internal.reflect.GeneratedMethodAccessor446.invoke(Unknown Source) ~[?:?]
	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
	at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
	at io.temporal.internal.activity.POJOActivityTaskHandler$POJOActivityInboundCallsInterceptor.execute(POJOActivityTaskHandler.java:214) ~[temporal-sdk-1.8.1.jar:?]
	at io.temporal.internal.activity.POJOActivityTaskHandler$POJOActivityImplementation.execute(POJOActivityTaskHandler.java:180) ~[temporal-sdk-1.8.1.jar:?]
	at io.temporal.internal.activity.POJOActivityTaskHandler.handle(POJOActivityTaskHandler.java:120) ~[temporal-sdk-1.8.1.jar:?]
	at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:204) ~[temporal-sdk-1.8.1.jar:?]
	at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:164) ~[temporal-sdk-1.8.1.jar:?]
	at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:93) ~[temporal-sdk-1.8.1.jar:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
	at java.lang.Thread.run(Thread.java:833) [?:?]
Caused by: io.temporal.serviceclient.CheckedExceptionWrapper: java.util.concurrent.ExecutionException: io.airbyte.workers.exception.WorkerException: Normalization Failed.
	at io.temporal.serviceclient.CheckedExceptionWrapper.wrap(CheckedExceptionWrapper.java:56) ~[temporal-serviceclient-1.8.1.jar:?]
	at io.temporal.internal.sync.WorkflowInternal.wrap(WorkflowInternal.java:448) ~[temporal-sdk-1.8.1.jar:?]
	at io.temporal.activity.Activity.wrap(Activity.java:51) ~[temporal-sdk-1.8.1.jar:?]
	at io.airbyte.workers.temporal.TemporalAttemptExecution.get(TemporalAttemptExecution.java:138) ~[io.airbyte-airbyte-workers-0.39.25-alpha.jar:?]
	at io.airbyte.workers.temporal.sync.NormalizationActivityImpl.lambda$normalize$3(NormalizationActivityImpl.java:103) ~[io.airbyte-airbyte-workers-0.39.25-alpha.jar:?]
	at io.airbyte.workers.temporal.TemporalUtils.withBackgroundHeartbeat(TemporalUtils.java:284) ~[io.airbyte-airbyte-workers-0.39.25-alpha.jar:?]
	... 13 more
Caused by: java.util.concurrent.ExecutionException: io.airbyte.workers.exception.WorkerException: Normalization Failed.
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) ~[?:?]
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) ~[?:?]
	at io.airbyte.workers.temporal.TemporalAttemptExecution.get(TemporalAttemptExecution.java:132) ~[io.airbyte-airbyte-workers-0.39.25-alpha.jar:?]
	at io.airbyte.workers.temporal.sync.NormalizationActivityImpl.lambda$normalize$3(NormalizationActivityImpl.java:103) ~[io.airbyte-airbyte-workers-0.39.25-alpha.jar:?]
	at io.airbyte.workers.temporal.TemporalUtils.withBackgroundHeartbeat(TemporalUtils.java:284) ~[io.airbyte-airbyte-workers-0.39.25-alpha.jar:?]
	... 13 more
Caused by: io.airbyte.workers.exception.WorkerException: Normalization Failed.
	at io.airbyte.workers.general.DefaultNormalizationWorker.run(DefaultNormalizationWorker.java:63) ~[io.airbyte-airbyte-workers-0.39.25-alpha.jar:?]
	at io.airbyte.workers.general.DefaultNormalizationWorker.run(DefaultNormalizationWorker.java:21) ~[io.airbyte-airbyte-workers-0.39.25-alpha.jar:?]
	at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:158) ~[io.airbyte-airbyte-workers-0.39.25-alpha.jar:?]
	... 1 more
Caused by: io.airbyte.workers.exception.WorkerException: Normalization Failed.
	at io.airbyte.workers.general.DefaultNormalizationWorker.run(DefaultNormalizationWorker.java:60) ~[io.airbyte-airbyte-workers-0.39.25-alpha.jar:?]
	at io.airbyte.workers.general.DefaultNormalizationWorker.run(DefaultNormalizationWorker.java:21) ~[io.airbyte-airbyte-workers-0.39.25-alpha.jar:?]
	at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:158) ~[io.airbyte-airbyte-workers-0.39.25-alpha.jar:?]
	... 1 more
	Suppressed: io.airbyte.workers.exception.WorkerException: Normalization process wasn't successful
		at io.airbyte.workers.normalization.DefaultNormalizationRunner.close(DefaultNormalizationRunner.java:162) ~[io.airbyte-airbyte-workers-0.39.25-alpha.jar:?]
		at io.airbyte.workers.general.DefaultNormalizationWorker.run(DefaultNormalizationWorker.java:48) ~[io.airbyte-airbyte-workers-0.39.25-alpha.jar:?]
		at io.airbyte.workers.general.DefaultNormalizationWorker.run(DefaultNormalizationWorker.java:21) ~[io.airbyte-airbyte-workers-0.39.25-alpha.jar:?]
		at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:158) ~[io.airbyte-airbyte-workers-0.39.25-alpha.jar:?]
		at java.lang.Thread.run(Thread.java:833) [?:?]

Hey @leandrit1,
Could you please share the full sync logs and let us know if the raw tables are properly populated?

Hey @alafanechere , here are the full logs: And raw tables looks like that are not populated since when querying there are no records returned.

Hey @leandrit1,
I think the connector is having a problem reaching your PlanetScale instance:

2022-06-27 12:17:27 e[44msourcee[0m > ERROR: logging before flag.Parse: W0627 12:17:27.639859 7 component.go:41] [core] [Channel #1 SubChannel #2] grpc: addrConn.createTransport failed to connect to {2022-06-27 12:17:27 e[44msourcee[0m > "Addr": "e6wei8wfpdzx.eu-west-2.psdb.cloud:443",2022-06-27 12:17:27 e[44msourcee[0m > "ServerName": "e6wei8wfpdzx.eu-west-2.psdb.cloud:443",2022-06-27 12:17:27 e[44msourcee[0m > "Attributes": null,2022-06-27 12:17:27 e[44msourcee[0m > PlanetScale Source :: [teero:_BadgeToJob shard : -] no new rows found, exiting2022-06-27 12:17:27 e[44msourcee[0m > "BalancerAttributes": null,2022-06-27 12:17:27 e[44msourcee[0m > "Type": 0,2022-06-27 12:17:27 e[44msourcee[0m > "Metadata": null2022-06-27 12:17:27 e[44msourcee[0m > }. Err: connection error: desc = "transport: Error while dialing dial tcp 34.253.91.87:443: i/o timeout"2022-06-27 12:17:27 e[44msourcee[0m > PlanetScale Source :: [teero:_JobToUser shard : -] peeking to see if there's any new rows

This connector is not managed by Airbyte so I think you should reach the PlanetScale support or open an issue on this repo for additional help.

Apparently it was an issue on my end. After enabling the beta feature in PlanetScale, I should have created new credentials, which I did not… Now it is working. Thank you

Hi there from the Community Assistance team.
We’re letting you know about an issue we discovered with the back-end process we use to handle topics and responses on the forum. If you experienced a situation where you posted the last message in a topic that did not receive any further replies, please open a new topic to continue the discussion. In addition, if you’re having a problem and find a closed topic on the subject, go ahead and open a new topic on it and we’ll follow up with you. We apologize for the inconvenience, and appreciate your willingness to work with us to provide a supportive community.