/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.repackaged.direct_java.runners.core;

import java.util.concurrent.CancellationException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.beam.repackaged.direct_java.runners.core.OutputWindowedValue;
import org.apache.beam.repackaged.direct_java.runners.core.SideInputReader;
import org.apache.beam.repackaged.direct_java.runners.core.SplittableProcessElementInvoker;
import org.apache.beam.repackaged.direct_java.sdk.fn.splittabledofn.RestrictionTrackers;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnOutputReceivers;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Futures;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class OutputAndTimeBoundedSplittableProcessElementInvoker<InputT, OutputT, RestrictionT, PositionT>
extends SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, PositionT> {
    private final DoFn<InputT, OutputT> fn;
    private final PipelineOptions pipelineOptions;
    private final OutputWindowedValue<OutputT> output;
    private final SideInputReader sideInputReader;
    private final ScheduledExecutorService executor;
    private final int maxNumOutputs;
    private final Duration maxDuration;

    public OutputAndTimeBoundedSplittableProcessElementInvoker(DoFn<InputT, OutputT> fn, PipelineOptions pipelineOptions, OutputWindowedValue<OutputT> output, SideInputReader sideInputReader, ScheduledExecutorService executor, int maxNumOutputs, Duration maxDuration) {
        this.fn = fn;
        this.pipelineOptions = pipelineOptions;
        this.output = output;
        this.sideInputReader = sideInputReader;
        this.executor = executor;
        this.maxNumOutputs = maxNumOutputs;
        this.maxDuration = maxDuration;
    }

    @Override
    public SplittableProcessElementInvoker.Result invokeProcessElement(DoFnInvoker<InputT, OutputT> invoker, WindowedValue<InputT> element, RestrictionTracker<RestrictionT, PositionT> tracker) {
        final ProcessContext processContext = new ProcessContext(element, tracker);
        DoFn.ProcessContinuation cont = invoker.invokeProcessElement(new DoFnInvoker.ArgumentProvider<InputT, OutputT>(){

            public DoFn.ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
                return processContext;
            }

            public InputT element(DoFn<InputT, OutputT> doFn) {
                return processContext.element();
            }

            public Object sideInput(String tagId) {
                throw new UnsupportedOperationException("Not supported in SplittableDoFn");
            }

            public Object schemaElement(int index) {
                throw new UnsupportedOperationException("Not supported in SplittableDoFn");
            }

            public Instant timestamp(DoFn<InputT, OutputT> doFn) {
                return processContext.timestamp();
            }

            public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
                throw new UnsupportedOperationException("Access to time domain not supported in ProcessElement");
            }

            public DoFn.OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
                return DoFnOutputReceivers.windowedReceiver((DoFn.WindowedContext)processContext, null);
            }

            public DoFn.OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
                throw new UnsupportedOperationException("Not supported in SplittableDoFn");
            }

            public DoFn.MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) {
                return DoFnOutputReceivers.windowedMultiReceiver((DoFn.WindowedContext)processContext, null);
            }

            public RestrictionTracker<?, ?> restrictionTracker() {
                return processContext.tracker;
            }

            public BoundedWindow window() {
                throw new UnsupportedOperationException("Access to window of the element not supported in Splittable DoFn");
            }

            public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
                throw new UnsupportedOperationException("Access to pane of the element not supported in Splittable DoFn");
            }

            public PipelineOptions pipelineOptions() {
                return OutputAndTimeBoundedSplittableProcessElementInvoker.this.pipelineOptions;
            }

            public DoFn.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
                throw new IllegalStateException("Should not access startBundleContext() from @" + DoFn.ProcessElement.class.getSimpleName());
            }

            public DoFn.FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) {
                throw new IllegalStateException("Should not access finishBundleContext() from @" + DoFn.ProcessElement.class.getSimpleName());
            }

            public DoFn.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
                throw new UnsupportedOperationException("Access to timers not supported in Splittable DoFn");
            }

            public State state(String stateId) {
                throw new UnsupportedOperationException("Access to state not supported in Splittable DoFn");
            }

            public Timer timer(String timerId) {
                throw new UnsupportedOperationException("Access to timers not supported in Splittable DoFn");
            }
        });
        processContext.cancelScheduledCheckpoint();
        KV residual = processContext.getTakenCheckpoint();
        if (cont.shouldResume()) {
            Preconditions.checkState((!processContext.hasClaimFailed ? 1 : 0) != 0, (Object)"After tryClaim() returned false, @ProcessElement must return stop(), but returned resume()");
            if (residual == null) {
                if (processContext.numClaimedBlocks > 0) {
                    residual = (KV)Preconditions.checkNotNull(processContext.takeCheckpointNow());
                    processContext.tracker.checkDone();
                } else {
                    residual = KV.of((Object)tracker.currentRestriction(), (Object)processContext.getLastReportedWatermark());
                }
            } else {
                processContext.tracker.checkDone();
            }
        } else {
            processContext.tracker.checkDone();
        }
        if (residual == null) {
            Preconditions.checkState((!cont.shouldResume() ? 1 : 0) != 0);
            return new SplittableProcessElementInvoker.Result(null, cont, BoundedWindow.TIMESTAMP_MAX_VALUE);
        }
        return new SplittableProcessElementInvoker.Result(residual.getKey(), cont, (Instant)residual.getValue());
    }

    private class ProcessContext
    extends DoFn.ProcessContext
    implements RestrictionTrackers.ClaimObserver<PositionT> {
        private final WindowedValue<InputT> element;
        private final RestrictionTracker<RestrictionT, PositionT> tracker;
        private int numClaimedBlocks;
        private boolean hasClaimFailed;
        private int numOutputs;
        @Nullable
        private RestrictionT checkpoint;
        @Nullable
        private Instant residualWatermark;
        @Nullable
        private Future<?> scheduledCheckpoint;
        @Nullable
        private Instant lastReportedWatermark;

        public ProcessContext(WindowedValue<InputT> element, RestrictionTracker<RestrictionT, PositionT> tracker) {
            super(OutputAndTimeBoundedSplittableProcessElementInvoker.this.fn);
            this.element = element;
            this.tracker = RestrictionTrackers.observe(tracker, this);
        }

        @Override
        public void onClaimed(PositionT position) {
            Preconditions.checkState((!this.hasClaimFailed ? 1 : 0) != 0, (Object)"Must not call tryClaim() after it has previously returned false");
            if (this.numClaimedBlocks == 0) {
                this.scheduledCheckpoint = OutputAndTimeBoundedSplittableProcessElementInvoker.this.executor.schedule(this::takeCheckpointNow, OutputAndTimeBoundedSplittableProcessElementInvoker.this.maxDuration.getMillis(), TimeUnit.MILLISECONDS);
            }
            ++this.numClaimedBlocks;
        }

        @Override
        public void onClaimFailed(PositionT position) {
            Preconditions.checkState((!this.hasClaimFailed ? 1 : 0) != 0, (Object)"Must not call tryClaim() after it has previously returned false");
            this.hasClaimFailed = true;
        }

        void cancelScheduledCheckpoint() {
            if (this.scheduledCheckpoint == null) {
                return;
            }
            this.scheduledCheckpoint.cancel(true);
            try {
                Futures.getUnchecked(this.scheduledCheckpoint);
            }
            catch (CancellationException cancellationException) {
                // empty catch block
            }
        }

        synchronized KV<RestrictionT, Instant> takeCheckpointNow() {
            if (this.checkpoint == null) {
                this.residualWatermark = this.lastReportedWatermark;
                SplitResult split = this.tracker.trySplit(0.0);
                if (split != null) {
                    this.checkpoint = Preconditions.checkNotNull((Object)split.getResidual());
                }
            }
            return this.getTakenCheckpoint();
        }

        @Nullable
        synchronized KV<RestrictionT, Instant> getTakenCheckpoint() {
            return this.checkpoint == null ? null : KV.of(this.checkpoint, (Object)this.residualWatermark);
        }

        public InputT element() {
            return this.element.getValue();
        }

        public <T> T sideInput(PCollectionView<T> view) {
            return OutputAndTimeBoundedSplittableProcessElementInvoker.this.sideInputReader.get(view, view.getWindowMappingFn().getSideInputWindow((BoundedWindow)Iterables.getOnlyElement((Iterable)this.element.getWindows())));
        }

        public Instant timestamp() {
            return this.element.getTimestamp();
        }

        public PaneInfo pane() {
            return this.element.getPane();
        }

        public synchronized void updateWatermark(Instant watermark) {
            this.lastReportedWatermark = watermark;
        }

        synchronized Instant getLastReportedWatermark() {
            return this.lastReportedWatermark;
        }

        public PipelineOptions getPipelineOptions() {
            return OutputAndTimeBoundedSplittableProcessElementInvoker.this.pipelineOptions;
        }

        public void output(OutputT output) {
            this.outputWithTimestamp(output, this.element.getTimestamp());
        }

        public void outputWithTimestamp(OutputT value, Instant timestamp) {
            this.noteOutput();
            OutputAndTimeBoundedSplittableProcessElementInvoker.this.output.outputWindowedValue(value, timestamp, this.element.getWindows(), this.element.getPane());
        }

        public <T> void output(TupleTag<T> tag, T value) {
            this.outputWithTimestamp(tag, value, this.element.getTimestamp());
        }

        public <T> void outputWithTimestamp(TupleTag<T> tag, T value, Instant timestamp) {
            this.noteOutput();
            OutputAndTimeBoundedSplittableProcessElementInvoker.this.output.outputWindowedValue(tag, value, timestamp, this.element.getWindows(), this.element.getPane());
        }

        private void noteOutput() {
            Preconditions.checkState((!this.hasClaimFailed ? 1 : 0) != 0, (Object)"Output is not allowed after a failed tryClaim()");
            Preconditions.checkState((this.numClaimedBlocks > 0 ? 1 : 0) != 0, (Object)"Output is not allowed before tryClaim()");
            ++this.numOutputs;
            if (this.numOutputs >= OutputAndTimeBoundedSplittableProcessElementInvoker.this.maxNumOutputs) {
                this.takeCheckpointNow();
            }
        }
    }
}

