/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.state;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.CompletableFuture;
import org.apache.beam.fn.harness.Cache;
import org.apache.beam.fn.harness.Caches;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.fn.harness.state.StateFetchingIterators;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.fn.stream.PrefetchableIterable;
import org.apache.beam.sdk.fn.stream.PrefetchableIterables;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeParameter;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BoundType;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Range;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.TreeRangeSet;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.KeyForBottom;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Instant;

public class OrderedListUserState<@UnknownKeyFor T> {
    private final @UnknownKeyFor @NonNull @Initialized BeamFnStateClient beamFnStateClient;
    private final // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized BeamFnApi.StateRequest requestTemplate;
    private final @UnknownKeyFor @NonNull @Initialized TimestampedValueCoder<T> timestampedValueCoder;
    private @UnknownKeyFor @NonNull @Initialized NavigableMap<@UnknownKeyFor @NonNull @Initialized Instant, @UnknownKeyFor @NonNull @Initialized Collection<T>> pendingAdds = Maps.newTreeMap();
    private @UnknownKeyFor @NonNull @Initialized TreeRangeSet<@UnknownKeyFor @NonNull @Initialized Instant> pendingRemoves = TreeRangeSet.create();
    private @UnknownKeyFor @NonNull @Initialized boolean isCleared = false;
    private @UnknownKeyFor @NonNull @Initialized boolean isClosed = false;

    public OrderedListUserState(/*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Cache<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> cache, @UnknownKeyFor @NonNull @Initialized BeamFnStateClient beamFnStateClient, @UnknownKeyFor @NonNull @Initialized String instructionId, // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized BeamFnApi.StateKey stateKey, @UnknownKeyFor @NonNull @Initialized Coder<T> valueCoder) {
        Preconditions.checkArgument((boolean)stateKey.hasOrderedListUserState(), (String)"Expected OrderedListUserState StateKey but received %s.", (Object)stateKey);
        this.beamFnStateClient = beamFnStateClient;
        this.timestampedValueCoder = TimestampedValueCoder.of(valueCoder);
        this.requestTemplate = BeamFnApi.StateRequest.newBuilder().setInstructionId(instructionId).setStateKey(stateKey).build();
    }

    public void add(@UnknownKeyFor @NonNull @Initialized TimestampedValue<T> value) {
        Preconditions.checkState((!this.isClosed ? 1 : 0) != 0, (String)"OrderedList user state is no longer usable because it is closed for %s", (Object)this.requestTemplate.getStateKey());
        Instant timestamp = value.getTimestamp();
        this.pendingAdds.putIfAbsent(timestamp, new ArrayList());
        ((Collection)this.pendingAdds.get(timestamp)).add(value.getValue());
    }

    public @UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @NonNull @Initialized TimestampedValue<T>> readRange(@UnknownKeyFor @NonNull @Initialized Instant minTimestamp, @UnknownKeyFor @NonNull @Initialized Instant limitTimestamp) {
        Preconditions.checkState((!this.isClosed ? 1 : 0) != 0, (String)"OrderedList user state is no longer usable because it is closed for %s", (Object)this.requestTemplate.getStateKey());
        ArrayList<PrefetchableIterable> pendingAddsInRange = new ArrayList<PrefetchableIterable>();
        for (Map.Entry kv : this.pendingAdds.subMap(minTimestamp, limitTimestamp).entrySet()) {
            pendingAddsInRange.add(PrefetchableIterables.limit((Iterable)Iterables.transform((Iterable)kv.getValue(), v -> TimestampedValue.of((Object)v, (Instant)((Instant)kv.getKey()))), (int)kv.getValue().size()));
        }
        Iterable valuesInRange = Iterables.concat(pendingAddsInRange);
        if (!this.isCleared) {
            BeamFnApi.StateRequest.Builder getRequestBuilder = this.requestTemplate.toBuilder();
            getRequestBuilder.getStateKeyBuilder().getOrderedListUserStateBuilder().getRangeBuilder().setStart(minTimestamp.getMillis()).setEnd(limitTimestamp.getMillis());
            StateFetchingIterators.CachingStateIterable<T> persistentValues = StateFetchingIterators.readAllAndDecodeStartingFrom(Caches.noop(), this.beamFnStateClient, getRequestBuilder.build(), this.timestampedValueCoder);
            TreeRangeSet pendingRemovesSnapshot = TreeRangeSet.create(this.pendingRemoves);
            Iterable persistentValuesAfterRemoval = Iterables.filter(persistentValues, v -> !pendingRemovesSnapshot.contains((Comparable)v.getTimestamp()));
            return Iterables.mergeSorted((Iterable)ImmutableList.of((Object)persistentValuesAfterRemoval, (Object)valuesInRange), Comparator.comparing(TimestampedValue::getTimestamp));
        }
        return valuesInRange;
    }

    public @UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @NonNull @Initialized TimestampedValue<T>> read() {
        Preconditions.checkState((!this.isClosed ? 1 : 0) != 0, (String)"OrderedList user state is no longer usable because it is closed for %s", (Object)this.requestTemplate.getStateKey());
        return this.readRange(Instant.ofEpochMilli((long)Long.MIN_VALUE), Instant.ofEpochMilli((long)Long.MAX_VALUE));
    }

    public void clearRange(@UnknownKeyFor @NonNull @Initialized Instant minTimestamp, @UnknownKeyFor @NonNull @Initialized Instant limitTimestamp) {
        Preconditions.checkState((!this.isClosed ? 1 : 0) != 0, (String)"OrderedList user state is no longer usable because it is closed for %s", (Object)this.requestTemplate.getStateKey());
        this.pendingAdds.subMap(minTimestamp, limitTimestamp).clear();
        if (!this.isCleared) {
            this.pendingRemoves.add(Range.range((Comparable)minTimestamp, (BoundType)BoundType.CLOSED, (Comparable)limitTimestamp, (BoundType)BoundType.OPEN));
        }
    }

    public void clear() {
        Preconditions.checkState((!this.isClosed ? 1 : 0) != 0, (String)"OrderedList user state is no longer usable because it is closed for %s", (Object)this.requestTemplate.getStateKey());
        this.isCleared = true;
        this.pendingRemoves = TreeRangeSet.create();
        this.pendingRemoves.add(Range.range((Comparable)Instant.ofEpochMilli((long)Long.MIN_VALUE), (BoundType)BoundType.CLOSED, (Comparable)Instant.ofEpochMilli((long)Long.MAX_VALUE), (BoundType)BoundType.OPEN));
        this.pendingAdds.clear();
    }

    public void asyncClose() throws @UnknownKeyFor @NonNull @Initialized Exception {
        this.isClosed = true;
        if (!this.pendingRemoves.isEmpty()) {
            for (Object r : this.pendingRemoves.asRanges()) {
                BeamFnApi.StateRequest.Builder builder = this.requestTemplate.toBuilder();
                builder.setClear(BeamFnApi.StateClearRequest.newBuilder().build());
                builder.getStateKeyBuilder().getOrderedListUserStateBuilder().getRangeBuilder().setStart(((Instant)r.lowerEndpoint()).getMillis()).setEnd(((Instant)r.upperEndpoint()).getMillis());
                CompletableFuture<BeamFnApi.StateResponse> response = this.beamFnStateClient.handle(builder);
                if (response.get().getError().isEmpty()) continue;
                throw new IllegalStateException(response.get().getError());
            }
            this.pendingRemoves.clear();
        }
        if (!this.pendingAdds.isEmpty()) {
            ByteStringOutputStream outStream = new ByteStringOutputStream();
            for (Map.Entry entry : this.pendingAdds.entrySet()) {
                for (Object v : (Collection)entry.getValue()) {
                    TimestampedValue tv = TimestampedValue.of(v, (Instant)((Instant)entry.getKey()));
                    try {
                        this.timestampedValueCoder.encode(tv, (OutputStream)outStream);
                    }
                    catch (IOException ex) {
                        throw new RuntimeException(ex);
                    }
                }
            }
            BeamFnApi.StateRequest.Builder stateRequest = this.requestTemplate.toBuilder();
            stateRequest.getAppendBuilder().setData(outStream.toByteString());
            CompletableFuture<BeamFnApi.StateResponse> completableFuture = this.beamFnStateClient.handle(stateRequest);
            if (!completableFuture.get().getError().isEmpty()) {
                throw new IllegalStateException(completableFuture.get().getError());
            }
            this.pendingAdds.clear();
        }
    }

    public static class TimestampedValueCoder<@UnknownKeyFor T>
    extends StructuredCoder<TimestampedValue<T>> {
        private final @UnknownKeyFor @NonNull @Initialized Coder<T> valueCoder;
        private final @UnknownKeyFor @NonNull @Initialized KvCoder<@UnknownKeyFor @NonNull @Initialized Long, T> internalKvCoder;

        public static <T> @UnknownKeyFor @NonNull @Initialized TimestampedValueCoder<T> of(@UnknownKeyFor @NonNull @Initialized Coder<T> valueCoder) {
            return new TimestampedValueCoder<T>(valueCoder);
        }

        public @UnknownKeyFor @NonNull @Initialized Object structuralValue(@UnknownKeyFor @NonNull @Initialized TimestampedValue<T> value) {
            Object structuralValue = this.valueCoder.structuralValue(value.getValue());
            return TimestampedValue.of((Object)structuralValue, (Instant)value.getTimestamp());
        }

        TimestampedValueCoder(@UnknownKeyFor @NonNull @Initialized Coder<T> valueCoder) {
            this.valueCoder = (Coder)Preconditions.checkNotNull(valueCoder);
            this.internalKvCoder = KvCoder.of((Coder)VarLongCoder.of(), (Coder)LengthPrefixCoder.of(valueCoder));
        }

        public void encode(@UnknownKeyFor @NonNull @Initialized TimestampedValue<T> timestampedValue, @UnknownKeyFor @NonNull @Initialized OutputStream outStream) throws @UnknownKeyFor @NonNull @Initialized IOException {
            this.internalKvCoder.encode(KV.of((Object)timestampedValue.getTimestamp().getMillis(), (Object)timestampedValue.getValue()), outStream);
        }

        public @UnknownKeyFor @NonNull @Initialized TimestampedValue<T> decode(@UnknownKeyFor @NonNull @Initialized InputStream inStream) throws @UnknownKeyFor @NonNull @Initialized IOException {
            KV kv = this.internalKvCoder.decode(inStream);
            return TimestampedValue.of((Object)kv.getValue(), (Instant)Instant.ofEpochMilli((long)((Long)kv.getKey())));
        }

        public void verifyDeterministic() throws // Could not load outer class - annotation placement on inner may be incorrect
        @UnknownKeyFor @NonNull @Initialized Coder.NonDeterministicException {
            TimestampedValueCoder.verifyDeterministic((Coder)this, (String)"TimestampedValueCoder requires a deterministic valueCoder", (Coder[])new Coder[]{this.valueCoder});
        }

        public /*
         * Issues handling annotations - annotations may be inaccurate
         */
        @UnknownKeyFor @NonNull @Initialized List<@KeyForBottom @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> getCoderArguments() {
            return Arrays.asList(this.valueCoder);
        }

        public @UnknownKeyFor @NonNull @Initialized Coder<T> getValueCoder() {
            return this.valueCoder;
        }

        public @UnknownKeyFor @NonNull @Initialized TypeDescriptor<@UnknownKeyFor @NonNull @Initialized TimestampedValue<T>> getEncodedTypeDescriptor() {
            return new TypeDescriptor<TimestampedValue<T>>(){}.where(new TypeParameter<T>(){}, this.valueCoder.getEncodedTypeDescriptor());
        }

        public /*
         * Issues handling annotations - annotations may be inaccurate
         */
        @UnknownKeyFor @NonNull @Initialized List<@KeyForBottom @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> getComponents() {
            return Collections.singletonList(this.valueCoder);
        }
    }
}

