/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.stateful;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.repackaged.com.google.common.collect.HashBasedTable;
import org.apache.beam.runners.spark.repackaged.com.google.common.collect.Table;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.MapState;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.StateContext;
import org.apache.beam.sdk.state.StateContexts;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.joda.time.Instant;

class SparkStateInternals<K>
implements StateInternals {
    private final K key;
    private final Table<String, String, byte[]> stateTable;

    private SparkStateInternals(K key) {
        this.key = key;
        this.stateTable = HashBasedTable.create();
    }

    private SparkStateInternals(K key, Table<String, String, byte[]> stateTable) {
        this.key = key;
        this.stateTable = stateTable;
    }

    static <K> SparkStateInternals<K> forKey(K key) {
        return new SparkStateInternals<K>(key);
    }

    static <K> SparkStateInternals<K> forKeyAndState(K key, Table<String, String, byte[]> stateTable) {
        return new SparkStateInternals<K>(key, stateTable);
    }

    public Table<String, String, byte[]> getState() {
        return this.stateTable;
    }

    public K getKey() {
        return this.key;
    }

    public <T extends State> T state(StateNamespace namespace, StateTag<T> address) {
        return this.state(namespace, address, StateContexts.nullContext());
    }

    public <T extends State> T state(StateNamespace namespace, StateTag<T> address, StateContext<?> c) {
        return (T)address.bind((StateTag.StateBinder)new SparkStateBinder(namespace, c));
    }

    private final class SparkBagState<T>
    extends AbstractState<List<T>>
    implements BagState<T> {
        private SparkBagState(StateNamespace namespace, StateTag<BagState<T>> address, Coder<T> coder) {
            super(namespace, address, (Coder)ListCoder.of(coder));
        }

        public SparkBagState<T> readLater() {
            return this;
        }

        public List<T> read() {
            ArrayList value = (ArrayList)super.readValue();
            if (value == null) {
                value = new ArrayList();
            }
            return value;
        }

        public void add(T input) {
            Iterable value = this.read();
            value.add(input);
            this.writeValue(value);
        }

        public ReadableState<Boolean> isEmpty() {
            return new ReadableState<Boolean>(){

                public ReadableState<Boolean> readLater() {
                    return this;
                }

                public Boolean read() {
                    return SparkStateInternals.this.stateTable.get(SparkBagState.this.namespace.stringKey(), SparkBagState.this.address.getId()) == null;
                }
            };
        }
    }

    private class SparkCombiningState<K, InputT, AccumT, OutputT>
    extends AbstractState<AccumT>
    implements CombiningState<InputT, AccumT, OutputT> {
        private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;

        private SparkCombiningState(StateNamespace namespace, StateTag<CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> coder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
            super(namespace, address, coder);
            this.combineFn = combineFn;
        }

        public SparkCombiningState<K, InputT, AccumT, OutputT> readLater() {
            return this;
        }

        public OutputT read() {
            return (OutputT)this.combineFn.extractOutput(this.getAccum());
        }

        public void add(InputT input) {
            AccumT accum = this.getAccum();
            this.combineFn.addInput(accum, input);
            this.writeValue(accum);
        }

        public AccumT getAccum() {
            Object accum = this.readValue();
            if (accum == null) {
                accum = this.combineFn.createAccumulator();
            }
            return (AccumT)accum;
        }

        public ReadableState<Boolean> isEmpty() {
            return new ReadableState<Boolean>(){

                public ReadableState<Boolean> readLater() {
                    return this;
                }

                public Boolean read() {
                    return SparkStateInternals.this.stateTable.get(SparkCombiningState.this.namespace.stringKey(), SparkCombiningState.this.address.getId()) == null;
                }
            };
        }

        public void addAccum(AccumT accum) {
            accum = this.combineFn.mergeAccumulators(Arrays.asList(this.getAccum(), accum));
            this.writeValue(accum);
        }

        public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
            return (AccumT)this.combineFn.mergeAccumulators(accumulators);
        }
    }

    private class SparkWatermarkHoldState
    extends AbstractState<Instant>
    implements WatermarkHoldState {
        private final TimestampCombiner timestampCombiner;

        public SparkWatermarkHoldState(StateNamespace namespace, StateTag<WatermarkHoldState> address, TimestampCombiner timestampCombiner) {
            super(namespace, address, (Coder)InstantCoder.of());
            this.timestampCombiner = timestampCombiner;
        }

        public SparkWatermarkHoldState readLater() {
            return this;
        }

        public Instant read() {
            return (Instant)this.readValue();
        }

        public void add(Instant outputTime) {
            Instant combined = this.read();
            combined = combined == null ? outputTime : this.getTimestampCombiner().combine(new Instant[]{combined, outputTime});
            this.writeValue(combined);
        }

        public ReadableState<Boolean> isEmpty() {
            return new ReadableState<Boolean>(){

                public ReadableState<Boolean> readLater() {
                    return this;
                }

                public Boolean read() {
                    return SparkStateInternals.this.stateTable.get(SparkWatermarkHoldState.this.namespace.stringKey(), SparkWatermarkHoldState.this.address.getId()) == null;
                }
            };
        }

        public TimestampCombiner getTimestampCombiner() {
            return this.timestampCombiner;
        }
    }

    private class SparkValueState<T>
    extends AbstractState<T>
    implements ValueState<T> {
        private SparkValueState(StateNamespace namespace, StateTag<ValueState<T>> address, Coder<T> coder) {
            super(namespace, address, coder);
        }

        public SparkValueState<T> readLater() {
            return this;
        }

        public T read() {
            return this.readValue();
        }

        public void write(T input) {
            this.writeValue(input);
        }
    }

    private class AbstractState<T> {
        final StateNamespace namespace;
        final StateTag<? extends State> address;
        final Coder<T> coder;

        private AbstractState(StateNamespace namespace, StateTag<? extends State> address, Coder<T> coder) {
            this.namespace = namespace;
            this.address = address;
            this.coder = coder;
        }

        T readValue() {
            byte[] buf = (byte[])SparkStateInternals.this.stateTable.get(this.namespace.stringKey(), this.address.getId());
            if (buf != null) {
                return CoderHelpers.fromByteArray(buf, this.coder);
            }
            return null;
        }

        void writeValue(T input) {
            SparkStateInternals.this.stateTable.put(this.namespace.stringKey(), this.address.getId(), CoderHelpers.toByteArray(input, this.coder));
        }

        public void clear() {
            SparkStateInternals.this.stateTable.remove(this.namespace.stringKey(), this.address.getId());
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            AbstractState that = (AbstractState)o;
            return this.namespace.equals(that.namespace) && this.address.equals(that.address);
        }

        public int hashCode() {
            int result = this.namespace.hashCode();
            result = 31 * result + this.address.hashCode();
            return result;
        }
    }

    private class SparkStateBinder
    implements StateTag.StateBinder {
        private final StateNamespace namespace;
        private final StateContext<?> c;

        private SparkStateBinder(StateNamespace namespace, StateContext<?> c) {
            this.namespace = namespace;
            this.c = c;
        }

        public <T> ValueState<T> bindValue(StateTag<ValueState<T>> address, Coder<T> coder) {
            return new SparkValueState(this.namespace, address, coder);
        }

        public <T> BagState<T> bindBag(StateTag<BagState<T>> address, Coder<T> elemCoder) {
            return new SparkBagState(this.namespace, address, elemCoder);
        }

        public <T> SetState<T> bindSet(StateTag<SetState<T>> spec, Coder<T> elemCoder) {
            throw new UnsupportedOperationException(String.format("%s is not supported", SetState.class.getSimpleName()));
        }

        public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(StateTag<MapState<KeyT, ValueT>> spec, Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
            throw new UnsupportedOperationException(String.format("%s is not supported", MapState.class.getSimpleName()));
        }

        public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue(StateTag<CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
            return new SparkCombiningState(this.namespace, address, accumCoder, combineFn);
        }

        public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(StateTag<CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
            return new SparkCombiningState(this.namespace, address, accumCoder, CombineFnUtil.bindContext(combineFn, this.c));
        }

        public WatermarkHoldState bindWatermark(StateTag<WatermarkHoldState> address, TimestampCombiner timestampCombiner) {
            return new SparkWatermarkHoldState(this.namespace, address, timestampCombiner);
        }
    }
}

