/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.twister2.translators.functions;

import edu.iu.dsc.tws.api.tset.TSetContext;
import edu.iu.dsc.tws.api.tset.fn.FlatMapFunc;
import edu.iu.dsc.tws.api.tset.fn.RecordCollector;
import java.io.IOException;
import java.io.ObjectStreamException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.logging.Logger;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.InMemoryStateInternals;
import org.apache.beam.runners.core.InMemoryTimerInternals;
import org.apache.beam.runners.core.OutputWindowedValue;
import org.apache.beam.runners.core.ReduceFnRunner;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.UnsupportedSideInputReader;
import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.runners.core.construction.TriggerTranslation;
import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.InvalidProtocolBufferException;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.KeyForBottom;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Instant;

public class GroupByWindowFunction<@UnknownKeyFor K, @UnknownKeyFor V, @UnknownKeyFor W extends @UnknownKeyFor @NonNull @Initialized BoundedWindow>
implements FlatMapFunc<WindowedValue<KV<K, Iterable<V>>>, KV<K, Iterable<WindowedValue<V>>>> {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = Logger.getLogger(GroupByWindowFunction.class.getName());
    private transient /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized WindowingStrategy<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, W> windowingStrategy;
    private @UnknownKeyFor @NonNull @Initialized SystemReduceFn<K, V, @UnknownKeyFor @NonNull @Initialized Iterable<V>, @UnknownKeyFor @NonNull @Initialized Iterable<V>, W> reduceFn;
    private transient @UnknownKeyFor @NonNull @Initialized boolean isInitialized = false;
    private transient // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized RunnerApi.MessageWithComponents windowStrategyProto;
    private @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [] windowBytes;

    public GroupByWindowFunction() {
        this.isInitialized = false;
    }

    public GroupByWindowFunction(/*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized WindowingStrategy<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, W> windowingStrategy, @UnknownKeyFor @NonNull @Initialized SystemReduceFn<K, V, @UnknownKeyFor @NonNull @Initialized Iterable<V>, @UnknownKeyFor @NonNull @Initialized Iterable<V>, W> reduceFn, @UnknownKeyFor @NonNull @Initialized PipelineOptions options) {
        this.windowingStrategy = windowingStrategy;
        SdkComponents components = SdkComponents.create();
        components.registerEnvironment(Environments.createOrGetDefaultEnvironment((PortablePipelineOptions)((PortablePipelineOptions)options.as(PortablePipelineOptions.class))));
        try {
            this.windowStrategyProto = WindowingStrategyTranslation.toMessageProto(windowingStrategy, (SdkComponents)components);
            this.windowBytes = this.windowStrategyProto.toByteArray();
        }
        catch (IOException e) {
            LOG.info(e.getMessage());
        }
        this.reduceFn = reduceFn;
    }

    public void flatMap(@UnknownKeyFor @NonNull @Initialized KV<K, @UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @NonNull @Initialized WindowedValue<V>>> kIteratorKV, @UnknownKeyFor @NonNull @Initialized RecordCollector<@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized KV<K, @UnknownKeyFor @NonNull @Initialized Iterable<V>>>> collector) {
        try {
            Object key = kIteratorKV.getKey();
            Iterable values = (Iterable)kIteratorKV.getValue();
            InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
            timerInternals.advanceProcessingTime(Instant.now());
            timerInternals.advanceSynchronizedProcessingTime(Instant.now());
            InMemoryStateInternals stateInternals = InMemoryStateInternals.forKey((Object)key);
            GABWOutputWindowedValue outputter = new GABWOutputWindowedValue();
            ReduceFnRunner reduceFnRunner = new ReduceFnRunner(key, this.windowingStrategy, ExecutableTriggerStateMachine.create((TriggerStateMachine)TriggerStateMachines.stateMachineForTrigger((RunnerApi.Trigger)TriggerTranslation.toProto((Trigger)this.windowingStrategy.getTrigger()))), (StateInternals)stateInternals, (TimerInternals)timerInternals, outputter, (SideInputReader)new UnsupportedSideInputReader("GroupAlsoByWindow"), this.reduceFn, null);
            reduceFnRunner.processElements(values);
            timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
            timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
            timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
            this.fireEligibleTimers(timerInternals, reduceFnRunner);
            reduceFnRunner.persist();
            Iterator outputs = outputter.getOutputs().iterator();
            while (outputs.hasNext()) {
                collector.collect(outputs.next());
            }
        }
        catch (Exception e) {
            LOG.info(e.getMessage());
        }
    }

    private void fireEligibleTimers(@UnknownKeyFor @NonNull @Initialized InMemoryTimerInternals timerInternals, @UnknownKeyFor @NonNull @Initialized ReduceFnRunner<K, V, @UnknownKeyFor @NonNull @Initialized Iterable<V>, W> reduceFnRunner) throws @UnknownKeyFor @NonNull @Initialized Exception {
        ArrayList<TimerInternals.TimerData> timers = new ArrayList<TimerInternals.TimerData>();
        while (true) {
            TimerInternals.TimerData timer;
            if ((timer = timerInternals.removeNextEventTimer()) != null) {
                timers.add(timer);
                continue;
            }
            while ((timer = timerInternals.removeNextProcessingTimer()) != null) {
                timers.add(timer);
            }
            while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) {
                timers.add(timer);
            }
            if (timers.isEmpty()) break;
            reduceFnRunner.onTimers(timers);
            timers.clear();
        }
    }

    public void prepare(@UnknownKeyFor @NonNull @Initialized TSetContext context) {
        this.initTransient();
    }

    private void initTransient() {
        if (this.isInitialized) {
            return;
        }
        SdkComponents components = SdkComponents.create();
        try {
            this.windowStrategyProto = RunnerApi.MessageWithComponents.parseFrom((byte[])this.windowBytes);
            this.windowingStrategy = WindowingStrategyTranslation.fromProto((RunnerApi.WindowingStrategy)this.windowStrategyProto.getWindowingStrategy(), (RehydratedComponents)RehydratedComponents.forComponents((RunnerApi.Components)components.toComponents()));
        }
        catch (InvalidProtocolBufferException e) {
            LOG.info(e.getMessage());
        }
        this.isInitialized = true;
    }

    protected @UnknownKeyFor @NonNull @Initialized Object readResolve() throws @UnknownKeyFor @NonNull @Initialized ObjectStreamException {
        return this;
    }

    private static class GABWOutputWindowedValue<@UnknownKeyFor K, @UnknownKeyFor V>
    implements OutputWindowedValue<KV<K, Iterable<V>>> {
        private final @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized KV<K, @UnknownKeyFor @NonNull @Initialized Iterable<V>>>> outputs = new ArrayList<WindowedValue<KV<K, Iterable<V>>>>();

        private GABWOutputWindowedValue() {
        }

        public void outputWindowedValue(@UnknownKeyFor @NonNull @Initialized KV<K, @UnknownKeyFor @NonNull @Initialized Iterable<V>> output, @UnknownKeyFor @NonNull @Initialized Instant timestamp, @UnknownKeyFor @NonNull @Initialized Collection<@KeyForBottom @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized BoundedWindow> windows, @UnknownKeyFor @NonNull @Initialized PaneInfo pane) {
            this.outputs.add(WindowedValue.of(output, (Instant)timestamp, windows, (PaneInfo)pane));
        }

        public <AdditionalOutputT> void outputWindowedValue(@UnknownKeyFor @NonNull @Initialized TupleTag<AdditionalOutputT> tag, AdditionalOutputT output, @UnknownKeyFor @NonNull @Initialized Instant timestamp, @UnknownKeyFor @NonNull @Initialized Collection<@KeyForBottom @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized BoundedWindow> windows, @UnknownKeyFor @NonNull @Initialized PaneInfo pane) {
            throw new UnsupportedOperationException("GroupAlsoByWindow should not use tagged outputs.");
        }

        @UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized KV<K, @UnknownKeyFor @NonNull @Initialized Iterable<V>>>> getOutputs() {
            return this.outputs;
        }
    }
}

