/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.core;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import org.apache.beam.runners.core.MergingStateAccessor;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.GroupingState;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;

/**
 * Static helpers that operate on the per-window states exposed by a
 * {@link MergingStateAccessor}: clearing them, issuing {@code readLater()} on them, and folding
 * the contents of several source states into a single result state.
 *
 * <p>All methods are static and keep no state of their own.
 */
public class StateMerging {
    /**
     * Clears every state returned by {@code context.accessInEachMergingWindow(address)}.
     *
     * @param context accessor that supplies the state for each merging window
     * @param address tag identifying which state to clear
     */
    public static <K, StateT extends State, W extends BoundedWindow> void clear(MergingStateAccessor<K, W> context, StateTag<StateT> address) {
        for (StateT state : context.accessInEachMergingWindow(address).values()) {
            state.clear();
        }
    }

    /**
     * Issues {@code readLater()} on the bag of every merging window, except bags equal to the
     * one returned by {@code context.access(address)}.
     *
     * <p>Does nothing, and does not call {@code context.access}, when there are no merging
     * windows.
     *
     * @param context accessor that supplies the per-window bags and the result bag
     * @param address tag identifying the bag state
     */
    @SuppressFBWarnings(value={"RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT"})
    public static <K, T, W extends BoundedWindow> void prefetchBags(MergingStateAccessor<K, W> context, StateTag<BagState<T>> address) {
        Map<W, BagState<T>> map = context.accessInEachMergingWindow(address);
        if (map.isEmpty()) {
            // Nothing to prefetch.
            return;
        }
        BagState<T> result = context.access(address);
        for (BagState<T> source : map.values()) {
            if (!source.equals(result)) {
                StateMerging.prefetchRead(source);
            }
        }
    }

    /**
     * Merges the bags of all merging windows into the bag returned by
     * {@code context.access(address)}; see {@link #mergeBags(Collection, BagState)}.
     *
     * @param context accessor that supplies the per-window bags and the result bag
     * @param address tag identifying the bag state
     */
    public static <K, T, W extends BoundedWindow> void mergeBags(MergingStateAccessor<K, W> context, StateTag<BagState<T>> address) {
        StateMerging.mergeBags(context.accessInEachMergingWindow(address).values(), context.access(address));
    }

    /**
     * Adds every element of each source bag to {@code result}, then clears those source bags.
     *
     * <p>Sources equal to {@code result} are skipped: they are neither read nor cleared. If no
     * other sources remain, the method returns without touching {@code result}.
     *
     * @param sources bags to drain; may contain {@code result} itself
     * @param result bag that receives the elements
     */
    public static <T, W extends BoundedWindow> void mergeBags(Collection<BagState<T>> sources, BagState<T> result) {
        if (sources.isEmpty()) {
            // Nothing to merge.
            return;
        }
        // Issue readLater() on every bag first, and only then start the actual reads.
        Collection<BagState<T>> pending = new ArrayList<BagState<T>>(sources.size());
        for (BagState<T> source : sources) {
            if (!source.equals(result)) {
                StateMerging.prefetchRead(source);
                pending.add(source);
            }
        }
        if (pending.isEmpty()) {
            // The only source was the result itself.
            return;
        }
        for (BagState<T> source : pending) {
            for (T element : source.read()) {
                result.add(element);
            }
        }
        // 'pending' already holds exactly the sources that differ from the result.
        for (BagState<T> source : pending) {
            source.clear();
        }
    }

    /**
     * Merges the sets of all merging windows into the set returned by
     * {@code context.access(address)}; see {@link #mergeSets(Collection, SetState)}.
     *
     * @param context accessor that supplies the per-window sets and the result set
     * @param address tag identifying the set state
     */
    public static <K, T, W extends BoundedWindow> void mergeSets(MergingStateAccessor<K, W> context, StateTag<SetState<T>> address) {
        StateMerging.mergeSets(context.accessInEachMergingWindow(address).values(), context.access(address));
    }

    /**
     * Adds every element of each source set to {@code result}, then clears those source sets.
     *
     * <p>Sources equal to {@code result} are skipped: they are neither read nor cleared. If no
     * other sources remain, the method returns without touching {@code result}.
     *
     * @param sources sets to drain; may contain {@code result} itself
     * @param result set that receives the elements
     */
    public static <T, W extends BoundedWindow> void mergeSets(Collection<SetState<T>> sources, SetState<T> result) {
        if (sources.isEmpty()) {
            // Nothing to merge.
            return;
        }
        // Issue readLater() on every set first, and only then start the actual reads.
        Collection<SetState<T>> pending = new ArrayList<SetState<T>>(sources.size());
        for (SetState<T> source : sources) {
            if (!source.equals(result)) {
                StateMerging.prefetchRead(source);
                pending.add(source);
            }
        }
        if (pending.isEmpty()) {
            // The only source was the result itself.
            return;
        }
        for (SetState<T> source : pending) {
            for (T element : source.read()) {
                result.add(element);
            }
        }
        // 'pending' already holds exactly the sources that differ from the result.
        for (SetState<T> source : pending) {
            source.clear();
        }
    }

    /**
     * Issues {@code readLater()} on every state returned by
     * {@code context.accessInEachMergingWindow(address)}.
     *
     * @param context accessor that supplies the state for each merging window
     * @param address tag identifying the grouping state
     */
    public static <K, StateT extends GroupingState<?, ?>, W extends BoundedWindow> void prefetchCombiningValues(MergingStateAccessor<K, W> context, StateTag<StateT> address) {
        for (StateT state : context.accessInEachMergingWindow(address).values()) {
            StateMerging.prefetchRead(state);
        }
    }

    /**
     * Merges the combining states of all merging windows into the state returned by
     * {@code context.access(address)}; see
     * {@link #mergeCombiningValues(Collection, CombiningState)}.
     *
     * @param context accessor that supplies the per-window states and the result state
     * @param address tag identifying the combining state
     */
    public static <K, InputT, AccumT, OutputT, W extends BoundedWindow> void mergeCombiningValues(MergingStateAccessor<K, W> context, StateTag<CombiningState<InputT, AccumT, OutputT>> address) {
        StateMerging.mergeCombiningValues(context.accessInEachMergingWindow(address).values(), context.access(address));
    }

    /**
     * Merges the accumulators of all {@code sources} with {@code result.mergeAccumulators},
     * clears every source, and adds the merged accumulator to {@code result}.
     *
     * <p>Returns without doing anything when {@code sources} is empty or consists solely of
     * {@code result}.
     *
     * @param sources states whose accumulators are merged; may contain {@code result} itself
     * @param result state that receives the merged accumulator
     */
    public static <InputT, AccumT, OutputT, W extends BoundedWindow> void mergeCombiningValues(Collection<CombiningState<InputT, AccumT, OutputT>> sources, CombiningState<InputT, AccumT, OutputT> result) {
        if (sources.isEmpty()) {
            // Nothing to merge.
            return;
        }
        if (sources.size() == 1 && sources.contains(result)) {
            // Merging a single source into itself; nothing to do.
            return;
        }
        // Issue readLater() on every source first, and only then read the accumulators.
        for (CombiningState<InputT, AccumT, OutputT> source : sources) {
            StateMerging.prefetchRead(source);
        }
        Collection<AccumT> accumulators = new ArrayList<AccumT>(sources.size());
        for (CombiningState<InputT, AccumT, OutputT> source : sources) {
            accumulators.add(source.getAccum());
        }
        AccumT merged = result.mergeAccumulators(accumulators);
        // Clear before writing the result: 'result' may itself be one of the sources, and
        // clearing it after addAccum would discard the merged value.
        for (CombiningState<InputT, AccumT, OutputT> source : sources) {
            source.clear();
        }
        result.addAccum(merged);
    }

    /**
     * Calls {@code readLater()} on {@code source} and discards the returned value.
     * NOTE(review): presumably this lets the backend start fetching before the blocking read;
     * confirm against the ReadableState contract.
     */
    private static void prefetchRead(ReadableState<?> source) {
        source.readLater();
    }
}

