/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink.translation.wrappers.streaming.state;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.MapState;
import org.apache.beam.sdk.state.OrderedListState;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.StateContext;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CombineContextFactory;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.checkerframework.checker.nullness.qual.Nullable;

public class FlinkBroadcastStateInternals<K>
implements StateInternals {
    private int indexInSubtaskGroup;
    private final OperatorStateBackend stateBackend;
    private Map<String, Map<String, ?>> stateForNonZeroOperator;
    private final SerializablePipelineOptions pipelineOptions;

    public FlinkBroadcastStateInternals(int indexInSubtaskGroup, OperatorStateBackend stateBackend, SerializablePipelineOptions pipelineOptions) {
        this.stateBackend = stateBackend;
        this.indexInSubtaskGroup = indexInSubtaskGroup;
        this.pipelineOptions = pipelineOptions;
        if (indexInSubtaskGroup != 0) {
            this.stateForNonZeroOperator = new HashMap();
        }
    }

    public @Nullable K getKey() {
        return null;
    }

    public <T extends State> T state(final StateNamespace namespace, StateTag<T> address, final StateContext<?> context) {
        return (T)address.bind(new StateTag.StateBinder(){

            public <T2> ValueState<T2> bindValue(StateTag<ValueState<T2>> address, Coder<T2> coder) {
                return new FlinkBroadcastValueState<T2>(FlinkBroadcastStateInternals.this.stateBackend, address, namespace, coder, FlinkBroadcastStateInternals.this.pipelineOptions.get());
            }

            public <T2> BagState<T2> bindBag(StateTag<BagState<T2>> address, Coder<T2> elemCoder) {
                return new FlinkBroadcastBagState<T2>(FlinkBroadcastStateInternals.this.stateBackend, address, namespace, elemCoder, FlinkBroadcastStateInternals.this.pipelineOptions.get());
            }

            public <T2> SetState<T2> bindSet(StateTag<SetState<T2>> address, Coder<T2> elemCoder) {
                throw new UnsupportedOperationException(String.format("%s is not supported", SetState.class.getSimpleName()));
            }

            public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(StateTag<MapState<KeyT, ValueT>> spec, Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
                throw new UnsupportedOperationException(String.format("%s is not supported", MapState.class.getSimpleName()));
            }

            public <ElemT> OrderedListState<ElemT> bindOrderedList(StateTag<OrderedListState<ElemT>> spec, Coder<ElemT> elemCoder) {
                throw new UnsupportedOperationException(String.format("%s is not supported", OrderedListState.class.getSimpleName()));
            }

            public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue(StateTag<CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
                return new FlinkCombiningState<InputT, AccumT, OutputT>(FlinkBroadcastStateInternals.this.stateBackend, address, combineFn, namespace, accumCoder, FlinkBroadcastStateInternals.this.pipelineOptions.get());
            }

            public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(StateTag<CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
                return new FlinkCombiningStateWithContext(FlinkBroadcastStateInternals.this.stateBackend, address, combineFn, namespace, accumCoder, FlinkBroadcastStateInternals.this, CombineContextFactory.createFromStateContext((StateContext)context));
            }

            public WatermarkHoldState bindWatermark(StateTag<WatermarkHoldState> address, TimestampCombiner timestampCombiner) {
                throw new UnsupportedOperationException(String.format("%s is not supported", WatermarkHoldState.class.getSimpleName()));
            }
        });
    }

    private class FlinkCombiningStateWithContext<K2, InputT, AccumT, OutputT>
    extends AbstractBroadcastState<AccumT>
    implements CombiningState<InputT, AccumT, OutputT> {
        private final StateNamespace namespace;
        private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
        private final CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn;
        private final FlinkBroadcastStateInternals<K2> flinkStateInternals;
        private final CombineWithContext.Context context;

        FlinkCombiningStateWithContext(OperatorStateBackend flinkStateBackend, StateTag<CombiningState<InputT, AccumT, OutputT>> address, CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn, StateNamespace namespace, Coder<AccumT> accumCoder, FlinkBroadcastStateInternals<K2> flinkStateInternals, CombineWithContext.Context context) {
            super(flinkStateBackend, address.getId(), namespace, accumCoder, FlinkBroadcastStateInternals.this.pipelineOptions.get());
            this.namespace = namespace;
            this.address = address;
            this.combineFn = combineFn;
            this.flinkStateInternals = flinkStateInternals;
            this.context = context;
        }

        public CombiningState<InputT, AccumT, OutputT> readLater() {
            return this;
        }

        public void add(InputT value) {
            try {
                Object current = this.readInternal();
                if (current == null) {
                    current = this.combineFn.createAccumulator(this.context);
                }
                current = this.combineFn.addInput(current, value, this.context);
                this.writeInternal(current);
            }
            catch (Exception e) {
                throw new RuntimeException("Error adding to state.", e);
            }
        }

        public void addAccum(AccumT accum) {
            try {
                Object current = this.readInternal();
                if (current == null) {
                    this.writeInternal(accum);
                } else {
                    current = this.combineFn.mergeAccumulators(Arrays.asList(current, accum), this.context);
                    this.writeInternal(current);
                }
            }
            catch (Exception e) {
                throw new RuntimeException("Error adding to state.", e);
            }
        }

        public AccumT getAccum() {
            try {
                Object accum = this.readInternal();
                return (AccumT)(accum != null ? accum : this.combineFn.createAccumulator(this.context));
            }
            catch (Exception e) {
                throw new RuntimeException("Error reading state.", e);
            }
        }

        public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
            return (AccumT)this.combineFn.mergeAccumulators(accumulators, this.context);
        }

        public OutputT read() {
            try {
                Object accum = this.readInternal();
                if (accum == null) {
                    accum = this.combineFn.createAccumulator(this.context);
                }
                return (OutputT)this.combineFn.extractOutput(accum, this.context);
            }
            catch (Exception e) {
                throw new RuntimeException("Error reading state.", e);
            }
        }

        public ReadableState<Boolean> isEmpty() {
            return new ReadableState<Boolean>(){

                public Boolean read() {
                    try {
                        return FlinkCombiningStateWithContext.this.readInternal() == null;
                    }
                    catch (Exception e) {
                        throw new RuntimeException("Error reading state.", e);
                    }
                }

                public ReadableState<Boolean> readLater() {
                    return this;
                }
            };
        }

        public void clear() {
            this.clearInternal();
        }

        public boolean equals(@Nullable Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            FlinkCombiningStateWithContext that = (FlinkCombiningStateWithContext)o;
            return this.namespace.equals(that.namespace) && this.address.equals(that.address);
        }

        public int hashCode() {
            int result = this.namespace.hashCode();
            result = 31 * result + this.address.hashCode();
            return result;
        }
    }

    private class FlinkKeyedCombiningState<K2, InputT, AccumT, OutputT>
    extends AbstractBroadcastState<AccumT>
    implements CombiningState<InputT, AccumT, OutputT> {
        private final StateNamespace namespace;
        private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
        private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
        private final FlinkBroadcastStateInternals<K2> flinkStateInternals;

        FlinkKeyedCombiningState(OperatorStateBackend flinkStateBackend, StateTag<CombiningState<InputT, AccumT, OutputT>> address, Combine.CombineFn<InputT, AccumT, OutputT> combineFn, StateNamespace namespace, Coder<AccumT> accumCoder, FlinkBroadcastStateInternals<K2> flinkStateInternals, PipelineOptions pipelineOptions) {
            super(flinkStateBackend, address.getId(), namespace, accumCoder, pipelineOptions);
            this.namespace = namespace;
            this.address = address;
            this.combineFn = combineFn;
            this.flinkStateInternals = flinkStateInternals;
        }

        public CombiningState<InputT, AccumT, OutputT> readLater() {
            return this;
        }

        public void add(InputT value) {
            try {
                Object current = this.readInternal();
                if (current == null) {
                    current = this.combineFn.createAccumulator();
                }
                current = this.combineFn.addInput(current, value);
                this.writeInternal(current);
            }
            catch (Exception e) {
                throw new RuntimeException("Error adding to state.", e);
            }
        }

        public void addAccum(AccumT accum) {
            try {
                Object current = this.readInternal();
                if (current == null) {
                    this.writeInternal(accum);
                } else {
                    current = this.combineFn.mergeAccumulators(Arrays.asList(current, accum));
                    this.writeInternal(current);
                }
            }
            catch (Exception e) {
                throw new RuntimeException("Error adding to state.", e);
            }
        }

        public AccumT getAccum() {
            try {
                Object accum = this.readInternal();
                return (AccumT)(accum != null ? accum : this.combineFn.createAccumulator());
            }
            catch (Exception e) {
                throw new RuntimeException("Error reading state.", e);
            }
        }

        public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
            return (AccumT)this.combineFn.mergeAccumulators(accumulators);
        }

        public OutputT read() {
            try {
                Object accum = this.readInternal();
                if (accum != null) {
                    return (OutputT)this.combineFn.extractOutput(accum);
                }
                return (OutputT)this.combineFn.extractOutput(this.combineFn.createAccumulator());
            }
            catch (Exception e) {
                throw new RuntimeException("Error reading state.", e);
            }
        }

        public ReadableState<Boolean> isEmpty() {
            return new ReadableState<Boolean>(){

                public Boolean read() {
                    try {
                        return FlinkKeyedCombiningState.this.readInternal() == null;
                    }
                    catch (Exception e) {
                        throw new RuntimeException("Error reading state.", e);
                    }
                }

                public ReadableState<Boolean> readLater() {
                    return this;
                }
            };
        }

        public void clear() {
            this.clearInternal();
        }

        public boolean equals(@Nullable Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            FlinkKeyedCombiningState that = (FlinkKeyedCombiningState)o;
            return this.namespace.equals(that.namespace) && this.address.equals(that.address);
        }

        public int hashCode() {
            int result = this.namespace.hashCode();
            result = 31 * result + this.address.hashCode();
            return result;
        }
    }

    private class FlinkCombiningState<InputT, AccumT, OutputT>
    extends AbstractBroadcastState<AccumT>
    implements CombiningState<InputT, AccumT, OutputT> {
        private final StateNamespace namespace;
        private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
        private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;

        FlinkCombiningState(OperatorStateBackend flinkStateBackend, StateTag<CombiningState<InputT, AccumT, OutputT>> address, Combine.CombineFn<InputT, AccumT, OutputT> combineFn, StateNamespace namespace, Coder<AccumT> accumCoder, PipelineOptions pipelineOptions) {
            super(flinkStateBackend, address.getId(), namespace, accumCoder, pipelineOptions);
            this.namespace = namespace;
            this.address = address;
            this.combineFn = combineFn;
        }

        public CombiningState<InputT, AccumT, OutputT> readLater() {
            return this;
        }

        public void add(InputT value) {
            Object current = this.readInternal();
            if (current == null) {
                current = this.combineFn.createAccumulator();
            }
            current = this.combineFn.addInput(current, value);
            this.writeInternal(current);
        }

        public void addAccum(AccumT accum) {
            Object current = this.readInternal();
            if (current == null) {
                this.writeInternal(accum);
            } else {
                current = this.combineFn.mergeAccumulators(Arrays.asList(current, accum));
                this.writeInternal(current);
            }
        }

        public AccumT getAccum() {
            Object accum = this.readInternal();
            return (AccumT)(accum != null ? accum : this.combineFn.createAccumulator());
        }

        public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
            return (AccumT)this.combineFn.mergeAccumulators(accumulators);
        }

        public OutputT read() {
            Object accum = this.readInternal();
            if (accum != null) {
                return (OutputT)this.combineFn.extractOutput(accum);
            }
            return (OutputT)this.combineFn.extractOutput(this.combineFn.createAccumulator());
        }

        public ReadableState<Boolean> isEmpty() {
            return new ReadableState<Boolean>(){

                public Boolean read() {
                    try {
                        return FlinkCombiningState.this.readInternal() == null;
                    }
                    catch (Exception e) {
                        throw new RuntimeException("Error reading state.", e);
                    }
                }

                public ReadableState<Boolean> readLater() {
                    return this;
                }
            };
        }

        public void clear() {
            this.clearInternal();
        }

        public boolean equals(@Nullable Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            FlinkCombiningState that = (FlinkCombiningState)o;
            return this.namespace.equals(that.namespace) && this.address.equals(that.address);
        }

        public int hashCode() {
            int result = this.namespace.hashCode();
            result = 31 * result + this.address.hashCode();
            return result;
        }
    }

    private class FlinkBroadcastBagState<T>
    extends AbstractBroadcastState<List<T>>
    implements BagState<T> {
        private final StateNamespace namespace;
        private final StateTag<BagState<T>> address;

        FlinkBroadcastBagState(OperatorStateBackend flinkStateBackend, StateTag<BagState<T>> address, StateNamespace namespace, Coder<T> coder, PipelineOptions pipelineOptions) {
            super(flinkStateBackend, address.getId(), namespace, ListCoder.of(coder), pipelineOptions);
            this.namespace = namespace;
            this.address = address;
        }

        public void add(T input) {
            ArrayList<T> list = (ArrayList<T>)this.readInternal();
            if (list == null) {
                list = new ArrayList<T>();
            }
            list.add(input);
            this.writeInternal(list);
        }

        public BagState<T> readLater() {
            return this;
        }

        public Iterable<T> read() {
            List result = (List)this.readInternal();
            return result != null ? result : Collections.emptyList();
        }

        public ReadableState<Boolean> isEmpty() {
            return new ReadableState<Boolean>(){

                public Boolean read() {
                    try {
                        List result = (List)FlinkBroadcastBagState.this.readInternal();
                        return result == null;
                    }
                    catch (Exception e) {
                        throw new RuntimeException("Error reading state.", e);
                    }
                }

                public ReadableState<Boolean> readLater() {
                    return this;
                }
            };
        }

        public void clear() {
            this.clearInternal();
        }

        public boolean equals(@Nullable Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            FlinkBroadcastBagState that = (FlinkBroadcastBagState)o;
            return this.namespace.equals(that.namespace) && this.address.equals(that.address);
        }

        public int hashCode() {
            int result = this.namespace.hashCode();
            result = 31 * result + this.address.hashCode();
            return result;
        }
    }

    private class FlinkBroadcastValueState<T>
    extends AbstractBroadcastState<T>
    implements ValueState<T> {
        private final StateNamespace namespace;
        private final StateTag<ValueState<T>> address;

        FlinkBroadcastValueState(OperatorStateBackend flinkStateBackend, StateTag<ValueState<T>> address, StateNamespace namespace, Coder<T> coder, PipelineOptions pipelineOptions) {
            super(flinkStateBackend, address.getId(), namespace, coder, pipelineOptions);
            this.namespace = namespace;
            this.address = address;
        }

        public void write(T input) {
            this.writeInternal(input);
        }

        public ValueState<T> readLater() {
            return this;
        }

        public T read() {
            return this.readInternal();
        }

        public boolean equals(@Nullable Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            FlinkBroadcastValueState that = (FlinkBroadcastValueState)o;
            return this.namespace.equals(that.namespace) && this.address.equals(that.address);
        }

        public int hashCode() {
            int result = this.namespace.hashCode();
            result = 31 * result + this.address.hashCode();
            return result;
        }

        public void clear() {
            this.clearInternal();
        }
    }

    private abstract class AbstractBroadcastState<T> {
        private String name;
        private final StateNamespace namespace;
        private final ListStateDescriptor<Map<String, T>> flinkStateDescriptor;
        private final OperatorStateStore flinkStateBackend;

        AbstractBroadcastState(OperatorStateBackend flinkStateBackend, String name, StateNamespace namespace, Coder<T> coder, PipelineOptions pipelineOptions) {
            this.name = name;
            this.namespace = namespace;
            this.flinkStateBackend = flinkStateBackend;
            CoderTypeInformation typeInfo = new CoderTypeInformation(MapCoder.of((Coder)StringUtf8Coder.of(), coder), pipelineOptions);
            this.flinkStateDescriptor = new ListStateDescriptor(name, typeInfo.createSerializer(new ExecutionConfig()));
        }

        Map<String, T> getMap() throws Exception {
            if (FlinkBroadcastStateInternals.this.indexInSubtaskGroup == 0) {
                return this.getMapFromBroadcastState();
            }
            Map<String, T> result = (Map<String, T>)FlinkBroadcastStateInternals.this.stateForNonZeroOperator.get(this.name);
            if (result == null && (result = this.getMapFromBroadcastState()) != null) {
                FlinkBroadcastStateInternals.this.stateForNonZeroOperator.put(this.name, result);
                this.flinkStateBackend.getUnionListState(this.flinkStateDescriptor).clear();
            }
            return result;
        }

        Map<String, T> getMapFromBroadcastState() throws Exception {
            Iterator iterator;
            ListState state = this.flinkStateBackend.getUnionListState(this.flinkStateDescriptor);
            Iterable iterable = (Iterable)state.get();
            Map ret = null;
            if (iterable != null && (iterator = iterable.iterator()).hasNext()) {
                ret = (Map)iterator.next();
            }
            return ret;
        }

        void updateMap(Map<String, T> map) throws Exception {
            if (FlinkBroadcastStateInternals.this.indexInSubtaskGroup == 0) {
                ListState state = this.flinkStateBackend.getUnionListState(this.flinkStateDescriptor);
                state.clear();
                if (map.size() > 0) {
                    state.add(map);
                }
            } else if (map.isEmpty()) {
                FlinkBroadcastStateInternals.this.stateForNonZeroOperator.remove(this.name);
            } else {
                FlinkBroadcastStateInternals.this.stateForNonZeroOperator.put(this.name, map);
            }
        }

        void writeInternal(T input) {
            try {
                Map<String, T> map = this.getMap();
                if (map == null) {
                    map = new HashMap<String, T>();
                }
                map.put(this.namespace.stringKey(), input);
                this.updateMap(map);
            }
            catch (Exception e) {
                throw new RuntimeException("Error updating state.", e);
            }
        }

        T readInternal() {
            try {
                Map<String, T> map = this.getMap();
                if (map == null) {
                    return null;
                }
                return map.get(this.namespace.stringKey());
            }
            catch (Exception e) {
                throw new RuntimeException("Error reading state.", e);
            }
        }

        void clearInternal() {
            try {
                Map<String, T> map = this.getMap();
                if (map != null) {
                    map.remove(this.namespace.stringKey());
                    this.updateMap(map);
                }
            }
            catch (Exception e) {
                throw new RuntimeException("Error clearing state.", e);
            }
        }
    }
}

