/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.structuredstreaming.translation.batch;

import java.util.Collection;
import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
import org.apache.beam.runners.spark.structuredstreaming.translation.batch.Aggregators;
import org.apache.beam.runners.spark.structuredstreaming.translation.batch.GroupByKeyHelpers;
import org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.expressions.Aggregator;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import scala.Function1;
import scala.collection.Iterator;

/**
 * Batch translator for {@link Combine.Globally} that evaluates the combine with a Spark
 * {@link Aggregator}.
 *
 * <p>The input dataset is first folded per partition with {@link Aggregator#reduce}, the partial
 * buffers are then brought together with {@code coalesce(1)}, and finally merged with
 * {@link Aggregator#merge} and finished with {@link Aggregator#finish} (see {@code aggregate}).
 *
 * @param <InT> element type of the input {@link PCollection}
 * @param <AccT> accumulator type of the combine function
 * @param <OutT> element type of the output {@link PCollection}
 */
class CombineGloballyTranslatorBatch<@UnknownKeyFor InT, @UnknownKeyFor AccT, @UnknownKeyFor OutT>
        extends TransformTranslator<PCollection<InT>, PCollection<OutT>, Combine.Globally<InT, OutT>> {

    CombineGloballyTranslatorBatch() {
    }

    /**
     * Translates the given global combine into a dataset aggregation and registers the resulting
     * dataset for the transform's output.
     *
     * <p>Two paths exist, selected by {@code GroupByKeyHelpers.eligibleForGlobalGroupBy(windowing, true)}:
     * <ul>
     *   <li>eligible: values are unwrapped with {@code GroupByKeyHelpers.value()}, aggregated with
     *       {@code Aggregators.value(...)}, and the single result is re-wrapped via
     *       {@link WindowedValue#valueInGlobalWindow};
     *   <li>otherwise: the windowed values are passed unchanged to the aggregator returned by
     *       {@code Aggregators.windowedValue(...)}, and its resulting collection is emitted as is.
     * </ul>
     *
     * @param transform the combine transform; its function is cast to {@link Combine.CombineFn}
     * @param cxt translation context providing input/output, encoders and datasets
     */
    @Override
    protected void translate(
            Combine.@UnknownKeyFor @NonNull @Initialized Globally<InT, OutT> transform,
            @UnknownKeyFor @NonNull @Initialized Context cxt) {
        WindowingStrategy<?, ?> windowing = cxt.getInput().getWindowingStrategy();

        // The cast is unchecked: getFn() is not declared to return a CombineFn<InT, AccT, OutT>.
        // NOTE(review): presumably fails with ClassCastException for context-dependent combine
        // functions - confirm whether those are filtered out before translation.
        @SuppressWarnings("unchecked")
        Combine.CombineFn<InT, AccT, OutT> combineFn =
                (Combine.CombineFn<InT, AccT, OutT>) transform.getFn();

        Coder<InT> inputCoder = cxt.getInput().getCoder();
        Coder<OutT> outputCoder = cxt.getOutput().getCoder();
        Coder<AccT> accumCoder = this.accumulatorCoder(combineFn, inputCoder, cxt);

        Encoder<OutT> outEnc = cxt.encoderOf(outputCoder);
        Encoder<AccT> accEnc = cxt.encoderOf(accumCoder);
        Encoder<WindowedValue<OutT>> wvOutEnc = cxt.windowedEncoder(outEnc);

        Dataset<WindowedValue<InT>> dataset = cxt.getDataset(cxt.getInput());

        final Dataset<WindowedValue<OutT>> result;
        if (GroupByKeyHelpers.eligibleForGlobalGroupBy(windowing, true)) {
            // Aggregate the plain values; the window is dropped on the way in and the result is
            // placed into the global window on the way out.
            Aggregator<InT, ?, OutT> agg = Aggregators.value(combineFn, v -> v, accEnc, outEnc);
            result = aggregate(dataset, agg, GroupByKeyHelpers.value(), windowedValue(), wvOutEnc);
        } else {
            // Aggregate the windowed values themselves; the aggregator yields a collection of
            // windowed results which is emitted unchanged.
            Aggregator<WindowedValue<InT>, ?, Collection<WindowedValue<OutT>>> agg =
                    Aggregators.windowedValue(
                            combineFn,
                            GroupByKeyHelpers.value(),
                            windowing,
                            cxt.windowEncoder(),
                            accEnc,
                            wvOutEnc);
            result = aggregate(
                    dataset,
                    agg,
                    v -> v,
                    ScalaInterop.fun1(out -> ScalaInterop.scalaIterator(out)),
                    wvOutEnc);
        }
        cxt.putDataset(cxt.getOutput(), result);
    }

    /**
     * Aggregates a dataset without a key: every partition is reduced to a single buffer, the
     * buffers are coalesced into one partition and merged into the final result.
     *
     * @param ds the input dataset
     * @param agg aggregator supplying {@code zero}, {@code reduce}, {@code merge}, {@code finish}
     *     and the buffer encoder
     * @param valueFn maps each input element to the aggregator's input type
     * @param finishFn converts the finished aggregation result into the output elements
     * @param enc encoder of the output elements
     * @return dataset holding the elements produced by {@code finishFn}
     */
    private static <InT, OutT, AggInT, BuffT, AggOutT>
            @UnknownKeyFor @NonNull @Initialized Dataset<@UnknownKeyFor @NonNull @Initialized WindowedValue<OutT>> aggregate(
                    @UnknownKeyFor @NonNull @Initialized Dataset<@UnknownKeyFor @NonNull @Initialized WindowedValue<InT>> ds,
                    @UnknownKeyFor @NonNull @Initialized Aggregator<AggInT, BuffT, AggOutT> agg,
                    ScalaInterop.@UnknownKeyFor @NonNull @Initialized Fun1<@UnknownKeyFor @NonNull @Initialized WindowedValue<InT>, AggInT> valueFn,
                    ScalaInterop.@UnknownKeyFor @NonNull @Initialized Fun1<AggOutT, @UnknownKeyFor @NonNull @Initialized Iterator<@UnknownKeyFor @NonNull @Initialized WindowedValue<OutT>>> finishFn,
                    @UnknownKeyFor @NonNull @Initialized Encoder<@UnknownKeyFor @NonNull @Initialized WindowedValue<OutT>> enc) {
        // Step 1: fold each partition into exactly one buffer, starting from agg.zero().
        ScalaInterop.Fun1<Iterator<WindowedValue<InT>>, Iterator<BuffT>> reduce =
                ScalaInterop.fun1(
                        it -> Iterator.single(it.map(valueFn).foldLeft(agg.zero(), agg::reduce)));

        // Step 2: merge the per-partition buffers; an empty iterator falls back to agg.zero().
        ScalaInterop.Fun1<Iterator<BuffT>, Iterator<WindowedValue<OutT>>> merge =
                ScalaInterop.fun1(
                        it -> finishFn.apply(
                                agg.finish(it.hasNext() ? it.reduce(agg::merge) : agg.zero())));

        // coalesce(1) gathers all partial buffers into a single partition before merging.
        return ds.mapPartitions(reduce, agg.bufferEncoder()).coalesce(1).mapPartitions(merge, enc);
    }

    /**
     * Looks up the accumulator coder of the combine function using the coder registry of the
     * input's pipeline.
     *
     * @param fn the combine function
     * @param valueCoder coder of the input values
     * @param cxt translation context used to reach the pipeline's coder registry
     * @return the accumulator coder
     * @throws RuntimeException wrapping the {@link CannotProvideCoderException} if no coder can be
     *     provided
     */
    private @UnknownKeyFor @NonNull @Initialized Coder<AccT> accumulatorCoder(
            Combine.@UnknownKeyFor @NonNull @Initialized CombineFn<InT, AccT, OutT> fn,
            @UnknownKeyFor @NonNull @Initialized Coder<InT> valueCoder,
            @UnknownKeyFor @NonNull @Initialized Context cxt) {
        try {
            return fn.getAccumulatorCoder(
                    cxt.getInput().getPipeline().getCoderRegistry(), valueCoder);
        } catch (CannotProvideCoderException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns a function that wraps a value into the global window and exposes it as a
     * single-element Scala iterator.
     */
    private static <T>
            ScalaInterop.@UnknownKeyFor @NonNull @Initialized Fun1<T, @UnknownKeyFor @NonNull @Initialized Iterator<@UnknownKeyFor @NonNull @Initialized WindowedValue<T>>> windowedValue() {
        return v -> Iterator.single(WindowedValue.valueInGlobalWindow(v));
    }
}

