/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct.repackaged.runners.core.fn;

import java.util.concurrent.ExecutionException;
import org.apache.beam.runners.direct.repackaged.javax.annotation.Nullable;
import org.apache.beam.runners.direct.repackaged.runners.core.DoFnRunner;
import org.apache.beam.runners.direct.repackaged.runners.core.fn.SdkHarnessClient;
import org.apache.beam.runners.direct.repackaged.runners.core.java.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.joda.time.Instant;

/**
 * A {@link DoFnRunner} that delegates element processing to a bundle obtained from an
 * {@link SdkHarnessClient}.
 *
 * <p>{@link #startBundle()} asks the client for a new bundle for the configured process bundle
 * descriptor id, {@link #processElement(WindowedValue)} hands each element to that bundle's input
 * receiver, and {@link #finishBundle()} blocks on the bundle's response. Timers are rejected with
 * an {@link UnsupportedOperationException}.
 *
 * @param <InputT> type of the elements passed to {@link #processElement(WindowedValue)}
 * @param <OutputT> output type parameter required by {@link DoFnRunner}
 */
public class SdkHarnessDoFnRunner<InputT, OutputT>
implements DoFnRunner<InputT, OutputT> {
    /** Client used to open a new bundle on each {@link #startBundle()} call. */
    private final SdkHarnessClient sdkHarnessClient;
    /** Id passed to {@link SdkHarnessClient#newBundle} when a bundle is started. */
    private final String processBundleDescriptorId;
    /** Bundle created by the most recent {@link #startBundle()}; {@code null} until then. */
    @Nullable
    private SdkHarnessClient.ActiveBundle activeBundle;

    private SdkHarnessDoFnRunner(SdkHarnessClient sdkHarnessClient, String processBundleDescriptorId) {
        this.sdkHarnessClient = sdkHarnessClient;
        this.processBundleDescriptorId = processBundleDescriptorId;
    }

    /**
     * Creates a runner that opens its bundles through the given client.
     *
     * @param sdkHarnessClient client used to create bundles
     * @param processBundleDescriptorId descriptor id handed to the client for every new bundle
     * @return a new runner with no active bundle
     */
    public static <InputT, OutputT> SdkHarnessDoFnRunner<InputT, OutputT> create(SdkHarnessClient sdkHarnessClient, String processBundleDescriptorId) {
        return new SdkHarnessDoFnRunner<InputT, OutputT>(sdkHarnessClient, processBundleDescriptorId);
    }

    /** Requests a new bundle from the client and makes it the active bundle. */
    @Override
    public void startBundle() {
        this.activeBundle = this.sdkHarnessClient.newBundle(this.processBundleDescriptorId);
    }

    /**
     * Passes {@code elem} to the active bundle's input receiver.
     *
     * @param elem the element to forward
     * @throws IllegalStateException if {@link #startBundle()} has not been called
     * @throws RuntimeException wrapping any exception thrown by the input receiver
     */
    @Override
    public void processElement(WindowedValue<InputT> elem) {
        Preconditions.checkState(this.activeBundle != null, "%s attempted to process an element without an active bundle", SdkHarnessDoFnRunner.class.getSimpleName());
        try {
            this.activeBundle.getInputReceiver().accept(elem);
        }
        catch (Exception exc) {
            throw new RuntimeException(exc);
        }
    }

    /**
     * Always fails: this runner does not deliver timers.
     *
     * @throws UnsupportedOperationException on every call
     */
    @Override
    public void onTimer(String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
        throw new UnsupportedOperationException("Timers are not supported over the Fn API");
    }

    /**
     * Blocks until the active bundle's response is available.
     *
     * <p>If the waiting thread is interrupted, the interrupt status is restored and the method
     * returns without a result. If obtaining the response fails, the failure is rethrown wrapped
     * by {@link UserCodeException#wrap}.
     *
     * @throws IllegalStateException if {@link #startBundle()} has not been called
     */
    @Override
    public void finishBundle() {
        // Same guard as processElement: fail with a descriptive message rather than a bare NPE.
        Preconditions.checkState(this.activeBundle != null, "%s attempted to finish a bundle without an active bundle", SdkHarnessDoFnRunner.class.getSimpleName());
        try {
            this.activeBundle.getBundleResponse().get();
        }
        catch (InterruptedException interrupted) {
            // Re-assert the interrupt flag so code further up the stack can observe the
            // interruption; Thread.interrupted() would clear it and swallow the signal.
            Thread.currentThread().interrupt();
            return;
        }
        catch (ExecutionException exc) {
            throw UserCodeException.wrap(exc);
        }
    }
}

