/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness;

import com.google.auto.service.AutoService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.beam.fn.harness.MapFnRunners;
import org.apache.beam.fn.harness.PTransformRunnerFactory;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
import org.apache.beam.sdk.function.ThrowingFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;

/**
 * Applies the merging logic of a {@link WindowFn} to a set of windows.
 *
 * <p>The map function produced by {@link #createMapFunctionForPTransform} consumes a
 * {@code KV<key, Iterable<window>>} and produces
 * {@code KV<key, KV<Iterable<unmergedWindow>, Iterable<KV<mergedWindow, Iterable<consumedWindow>>>>>}:
 *
 * <ul>
 *   <li>the key is passed through unchanged;
 *   <li>the first inner value holds the input windows that were not consumed by any merge;
 *   <li>the second inner value holds one entry per merge reported by the {@link WindowFn},
 *       pairing the merge result with the windows that were merged into it.
 * </ul>
 *
 * @param <T> type of the key that is passed through unchanged
 * @param <W> type of the windows being merged
 */
public abstract class WindowMergingFnRunner<T, W extends BoundedWindow> {
    /** URN under which {@link Registrar} registers the window-merging map function. */
    static final String URN = "beam:transform:merge_windows:v1";

    /**
     * Builds the window-merging map function for a PTransform whose spec payload is a serialized
     * {@link RunnerApi.FunctionSpec} describing the {@link WindowFn}.
     *
     * @param ptransformId not used here; kept so that the method can be handed to
     *     {@code MapFnRunners.forValueMapFnFactory} as a method reference
     * @param ptransform the PTransform whose spec payload encodes the window function
     * @return a function that merges the windows of each input element
     * @throws IOException if the payload cannot be parsed or translated into a {@link WindowFn}
     */
    static <T, W extends BoundedWindow>
            ThrowingFunction<KV<T, Iterable<W>>, KV<T, KV<Iterable<W>, Iterable<KV<W, Iterable<W>>>>>>
                    createMapFunctionForPTransform(String ptransformId, RunnerApi.PTransform ptransform)
                            throws IOException {
        RunnerApi.FunctionSpec payload =
                RunnerApi.FunctionSpec.parseFrom(ptransform.getSpec().getPayload());

        // The window type is erased in the proto; the caller picks W, so this cast cannot be checked.
        @SuppressWarnings("unchecked")
        WindowFn<?, W> windowFn =
                (WindowFn<?, W>) WindowingStrategyTranslation.windowFnFromProto(payload);
        return WindowMergingFnRunner.<T, W>create(windowFn)::mergeWindows;
    }

    /**
     * Picks the runner implementation for the given window function: a pass-through runner when
     * {@link WindowFn#isNonMerging()} is true, otherwise a runner that delegates to
     * {@link WindowFn#mergeWindows}.
     *
     * @param windowFn the window function that decides how windows merge
     * @return a runner bound to {@code windowFn}
     */
    static <T, W extends BoundedWindow> WindowMergingFnRunner<T, W> create(WindowFn<?, W> windowFn) {
        if (windowFn.isNonMerging()) {
            return new NonMergingWindowFnRunner<>();
        }
        // The merging runner only calls mergeWindows, and the MergeContext it supplies deals purely
        // in W, so the first type argument of the WindowFn is never observed.
        @SuppressWarnings("unchecked")
        WindowFn<T, W> typedWindowFn = (WindowFn<T, W>) windowFn;
        return new MergingViaWindowFnRunner<>(typedWindowFn);
    }

    /**
     * Merges the given windows.
     *
     * @param windowsToMerge the key to pass through and the windows to consider for merging
     * @return the same key, paired with the unmerged windows and the list of performed merges
     * @throws Exception if the underlying window function fails
     */
    abstract KV<T, KV<Iterable<W>, Iterable<KV<W, Iterable<W>>>>> mergeWindows(
            KV<T, Iterable<W>> windowsToMerge) throws Exception;

    /**
     * Runner that delegates to {@link WindowFn#mergeWindows}.
     *
     * <p>The windows under consideration and the merges reported by the window function are kept
     * in instance fields for the duration of a call, so calls on one instance must not overlap.
     */
    private static class MergingViaWindowFnRunner<T, W extends BoundedWindow>
            extends WindowMergingFnRunner<T, W> {
        private final WindowFn<T, W> windowFn;
        private final WindowFn<T, W>.MergeContext mergeContext;
        /** Windows exposed to the window function during the current call. */
        private Collection<W> currentWindows;
        /** Merges reported by the window function during the current call. */
        private final List<KV<W, Collection<W>>> mergedWindows;

        private MergingViaWindowFnRunner(WindowFn<T, W> windowFn) {
            this.windowFn = windowFn;
            this.mergedWindows = new ArrayList<>();
            this.currentWindows = new ArrayList<>();
            // MergeContext is an inner class of WindowFn, so it must be created through the
            // enclosing WindowFn instance.
            this.mergeContext =
                    windowFn.new MergeContext() {
                        @Override
                        public Collection<W> windows() {
                            return currentWindows;
                        }

                        @Override
                        public void merge(Collection<W> toBeMerged, W mergeResult) throws Exception {
                            // Only record the merge; the window set is adjusted after the window
                            // function has finished.
                            mergedWindows.add(KV.of(mergeResult, toBeMerged));
                        }
                    };
        }

        @Override
        KV<T, KV<Iterable<W>, Iterable<KV<W, Iterable<W>>>>> mergeWindows(
                KV<T, Iterable<W>> windowsToMerge) throws Exception {
            currentWindows = Sets.newHashSet(windowsToMerge.getValue());
            try {
                windowFn.mergeWindows(mergeContext);

                // Drop every consumed window from the working set and take a typed snapshot of
                // the recorded merges, since the backing list is cleared before returning.
                List<KV<W, Iterable<W>>> merges = new ArrayList<>(mergedWindows.size());
                for (KV<W, Collection<W>> merged : mergedWindows) {
                    currentWindows.removeAll(merged.getValue());
                    merges.add(KV.<W, Iterable<W>>of(merged.getKey(), merged.getValue()));
                }

                // Copy the survivors for the same reason: the working set is cleared below.
                Iterable<W> unmerged = Sets.newHashSet(currentWindows);
                KV<Iterable<W>, Iterable<KV<W, Iterable<W>>>> outcome = KV.of(unmerged, merges);
                return KV.of(windowsToMerge.getKey(), outcome);
            } finally {
                // Reset the per-call state even when the window function throws, so that merges
                // recorded by a failed call cannot leak into the next one.
                currentWindows.clear();
                mergedWindows.clear();
            }
        }
    }

    /** Runner for window functions that never merge: every input window is reported as unmerged. */
    private static class NonMergingWindowFnRunner<T, W extends BoundedWindow>
            extends WindowMergingFnRunner<T, W> {
        private NonMergingWindowFnRunner() {
        }

        @Override
        KV<T, KV<Iterable<W>, Iterable<KV<W, Iterable<W>>>>> mergeWindows(
                KV<T, Iterable<W>> windowsToMerge) {
            Iterable<KV<W, Iterable<W>>> noMerges = Collections.emptyList();
            return KV.of(windowsToMerge.getKey(), KV.of(windowsToMerge.getValue(), noMerges));
        }
    }

    /** Registers the window-merging map function under {@link #URN}. */
    @AutoService(PTransformRunnerFactory.Registrar.class)
    public static class Registrar implements PTransformRunnerFactory.Registrar {
        @Override
        public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
            return ImmutableMap.of(
                    URN,
                    MapFnRunners.forValueMapFnFactory(
                            WindowMergingFnRunner::createMapFunctionForPTransform));
        }
    }
}

