/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.controller.store.stream;

import com.google.common.annotations.VisibleForTesting;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.netty.buffer.ByteBuf;
import io.pravega.client.tables.impl.IteratorState;
import io.pravega.client.tables.impl.KeyVersion;
import io.pravega.client.tables.impl.KeyVersionImpl;
import io.pravega.client.tables.impl.TableEntry;
import io.pravega.client.tables.impl.TableEntryImpl;
import io.pravega.client.tables.impl.TableKey;
import io.pravega.client.tables.impl.TableKeyImpl;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.util.AsyncIterator;
import io.pravega.common.util.ContinuationTokenAsyncIterator;
import io.pravega.common.util.RetriesExhaustedException;
import io.pravega.controller.server.SegmentHelper;
import io.pravega.controller.server.WireCommandFailedException;
import io.pravega.controller.server.rpc.auth.GrpcAuthHelper;
import io.pravega.controller.store.host.HostStoreException;
import io.pravega.controller.store.stream.AbstractStreamMetadataStore;
import io.pravega.controller.store.stream.Cache;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.store.stream.Version;
import io.pravega.controller.store.stream.VersionedMetadata;
import io.pravega.controller.util.RetryHelper;
import java.beans.ConstructorProperties;
import java.util.AbstractMap;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.apache.curator.shaded.com.google.common.base.Charsets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PravegaTablesStoreHelper {
    // Class-wide SLF4J logger.
    @SuppressFBWarnings(justification="generated code")
    private static final Logger log = LoggerFactory.getLogger(PravegaTablesStoreHelper.class);
    // Default retry count; same value the public constructor hands to the test-visible constructor.
    private static final int NUM_OF_RETRIES = 15;
    // Client used for every table-segment call issued by this helper.
    private final SegmentHelper segmentHelper;
    // Executor on which asynchronous continuations and retries are scheduled.
    private final ScheduledExecutorService executor;
    // Cache whose loader reads the entry through getEntry (see the constructor).
    private final Cache cache;
    // Token passed to every SegmentHelper call; replaced when a call fails with AuthFailed.
    private final AtomicReference<String> authToken;
    // Source of the token stored in authToken.
    private final GrpcAuthHelper authHelper;
    // Retry count passed to RetryHelper.withRetriesAsync by withRetries.
    private final int numOfRetries;

    /**
     * Creates a helper that uses the default retry count ({@link #NUM_OF_RETRIES}).
     *
     * @param segmentHelper client used to issue table-segment calls
     * @param authHelper    source of the token sent with every call
     * @param executor      executor used for asynchronous continuations and retries
     */
    public PravegaTablesStoreHelper(SegmentHelper segmentHelper, GrpcAuthHelper authHelper, ScheduledExecutorService executor) {
        // Use the named constant instead of repeating its literal value.
        this(segmentHelper, authHelper, executor, NUM_OF_RETRIES);
    }

    /**
     * Creates a helper with an explicit retry count.
     *
     * <p>The cache loader reads the entry from the table through {@link #getEntry} and wraps the
     * result in a fresh {@link VersionedMetadata}. The initial token is fetched from
     * {@code authHelper} during construction.
     *
     * @param segmentHelper client used to issue table-segment calls
     * @param authHelper    source of the token sent with every call
     * @param executor      executor used for asynchronous continuations and retries
     * @param numOfRetries  retry count handed to the retry helper
     */
    @VisibleForTesting
    PravegaTablesStoreHelper(SegmentHelper segmentHelper, GrpcAuthHelper authHelper, ScheduledExecutorService executor, int numOfRetries) {
        this.segmentHelper = segmentHelper;
        this.executor = executor;
        this.cache = new Cache(x -> {
            // A wildcard (not raw) cast keeps getEntry's result typed, so the
            // continuation below sees a VersionedMetadata rather than Object.
            TableCacheKey<?> entryKey = (TableCacheKey<?>) x;
            return this.getEntry(entryKey.getTable(), entryKey.getKey(), entryKey.getFromBytesFunc())
                    .thenApply(v -> new VersionedMetadata<>(v.getObject(), v.getVersion()));
        });
        this.authHelper = authHelper;
        this.authToken = new AtomicReference<String>(authHelper.retrieveMasterToken());
        this.numOfRetries = numOfRetries;
    }

    /**
     * Looks up the value stored under {@code key} in {@code table} through the cache.
     *
     * @param table     table name
     * @param key       entry key
     * @param fromBytes deserializer carried in the cache key
     * @param <T>       deserialized value type
     * @return future with a copy of the cached object and version
     */
    public <T> CompletableFuture<VersionedMetadata<T>> getCachedData(String table, String key, Function<byte[], T> fromBytes) {
        TableCacheKey<T> cacheKey = new TableCacheKey<T>(table, key, fromBytes);
        return this.cache.getCachedData(cacheKey).thenApply(this::getVersionedMetadata);
    }

    /**
     * Copies an untyped {@link VersionedMetadata} into a typed instance with the same object and version.
     *
     * @param v   metadata to copy
     * @param <T> type the caller expects the wrapped object to have (not checked at runtime)
     * @return a new metadata instance
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private <T> VersionedMetadata<T> getVersionedMetadata(VersionedMetadata v) {
        T payload = (T) v.getObject();
        return new VersionedMetadata<T>(payload, v.getVersion());
    }

    /**
     * Drops the cached value for {@code key} in {@code table}.
     *
     * @param table table name
     * @param key   entry key
     */
    public void invalidateCache(String table, String key) {
        // The deserializer is a placeholder; cache-key equality only looks at table and key.
        TableCacheKey<Object> lookupKey = new TableCacheKey<Object>(table, key, ignored -> null);
        this.cache.invalidateCache(lookupKey);
    }

    /**
     * Creates the table segment {@code tableName}, retrying through {@code withRetries}.
     *
     * @param tableName name of the table to create
     * @return future that completes when the create call finishes; the outcome is logged
     */
    public CompletableFuture<Void> createTable(String tableName) {
        log.debug("create table called for table: {}", (Object)tableName);
        Supplier<String> failureContext = () -> String.format("create table: %s", tableName);
        CompletableFuture<Void> creation = Futures.toVoid(this.withRetries(() -> this.segmentHelper.createTableSegment(tableName, this.authToken.get(), 0L), failureContext));
        return creation.whenCompleteAsync((ignored, failure) -> {
            if (failure == null) {
                log.debug("table {} created successfully", (Object)tableName);
                return;
            }
            log.warn("create table {} threw exception", (Object)tableName, failure);
        }, (Executor)this.executor);
    }

    /**
     * Deletes the table segment {@code tableName}; a data-not-found failure is treated as success.
     *
     * @param tableName   name of the table to delete
     * @param mustBeEmpty forwarded unchanged to the segment helper's delete call
     * @return future that completes once the delete has been processed
     */
    public CompletableFuture<Void> deleteTable(String tableName, boolean mustBeEmpty) {
        log.debug("delete table called for table: {}", (Object)tableName);
        Supplier<String> failureContext = () -> String.format("delete table: %s", tableName);
        return this.expectingDataNotFound(
                this.withRetries(() -> this.segmentHelper.deleteTableSegment(tableName, mustBeEmpty, this.authToken.get(), 0L), failureContext),
                null)
                .thenAcceptAsync(ignored -> {
                    log.debug("table {} deleted successfully", (Object)tableName);
                }, (Executor)this.executor);
    }

    /**
     * Adds {@code key} to {@code tableName}, conditioned on the key not existing yet.
     *
     * <p>A write conflict from the store is reported as a {@code DATA_EXISTS} store exception;
     * any other failure is logged at debug level and rethrown.
     *
     * @param tableName table to write to
     * @param key       entry key (encoded as UTF-8)
     * @param value     entry value, must not be null
     * @return future with the version assigned to the new entry
     * @throws NullPointerException if {@code value} is null
     */
    public CompletableFuture<Version> addNewEntry(String tableName, String key, @NonNull byte[] value) {
        if (value == null) {
            throw new NullPointerException("value is marked @NonNull but is null");
        }
        log.trace("addNewEntry called for : {} key : {}", (Object)tableName, (Object)key);
        Supplier<String> errorMessage = () -> String.format("addNewEntry: key: %s table: %s", key, tableName);
        List<TableEntryImpl> entries = Collections.singletonList(new TableEntryImpl((TableKey)new TableKeyImpl((Object)key.getBytes(Charsets.UTF_8), KeyVersion.NOT_EXISTS), (Object)value));
        return ((CompletableFuture)this.withRetries(() -> this.segmentHelper.updateTableEntries(tableName, entries, this.authToken.get(), 0L), errorMessage).exceptionally(failure -> {
            Throwable root = Exceptions.unwrap((Throwable)failure);
            if (!(root instanceof StoreException.WriteConflictException)) {
                log.debug("add new entry {} to {} threw exception {} {}", new Object[]{key, tableName, root.getClass(), root.getMessage()});
                throw new CompletionException((Throwable)failure);
            }
            // The conditional insert lost: the key is already present.
            throw StoreException.create(StoreException.Type.DATA_EXISTS, (String)errorMessage.get());
        })).thenApplyAsync(versions -> {
            KeyVersion created = (KeyVersion)versions.get(0);
            log.trace("entry for key {} added to table {} with version {}", new Object[]{key, tableName, created.getSegmentVersion()});
            return new Version.LongVersion(created.getSegmentVersion());
        }, (Executor)this.executor);
    }

    /**
     * Same as {@link #addNewEntry}, except that a data-exists failure completes the future with {@code null}.
     *
     * @param tableName table to write to
     * @param key       entry key
     * @param value     entry value, must not be null
     * @return future with the new entry's version, or {@code null} when the entry already existed
     * @throws NullPointerException if {@code value} is null
     */
    public CompletableFuture<Version> addNewEntryIfAbsent(String tableName, String key, @NonNull byte[] value) {
        if (value == null) {
            throw new NullPointerException("value is marked @NonNull but is null");
        }
        CompletableFuture<Version> insertion = this.addNewEntry(tableName, key, value);
        return this.expectingDataExists(insertion, null);
    }

    /**
     * Adds every entry of {@code toAdd} to {@code tableName} in one update call, each conditioned
     * on its key not existing.
     *
     * <p>A write conflict is mapped to a data-exists failure, which is then swallowed, so the
     * returned future completes normally in that case.
     *
     * @param tableName table to write to
     * @param toAdd     keys (encoded as UTF-8) and values to insert
     * @return future that completes when the update has been processed
     */
    public CompletableFuture<Void> addNewEntriesIfAbsent(String tableName, Map<String, byte[]> toAdd) {
        Supplier<String> errorMessage = () -> String.format("addNewEntriesIfAbsent: table: %s", tableName);
        List entries = toAdd.entrySet().stream().map(x -> new TableEntryImpl((TableKey)new TableKeyImpl((Object)((String)x.getKey()).getBytes(Charsets.UTF_8), KeyVersion.NOT_EXISTS), x.getValue())).collect(Collectors.toList());
        return this.expectingDataExists((CompletableFuture)this.withRetries(() -> this.segmentHelper.updateTableEntries(tableName, entries, this.authToken.get(), 0L), errorMessage).handle((ignored, failure) -> {
            if (failure == null) {
                log.trace("entries added to table {}", (Object)tableName);
                return null;
            }
            Throwable root = Exceptions.unwrap((Throwable)failure);
            if (root instanceof StoreException.WriteConflictException) {
                throw StoreException.create(StoreException.Type.DATA_EXISTS, (String)errorMessage.get());
            }
            log.debug("add new entries to {} threw exception {} {}", new Object[]{tableName, root.getClass(), root.getMessage()});
            throw new CompletionException((Throwable)failure);
        }), null);
    }

    /**
     * Overwrites {@code key} in {@code tableName}, conditioned on the entry still being at version {@code ver}.
     *
     * @param tableName table to write to
     * @param key       entry key (encoded as UTF-8)
     * @param value     new value
     * @param ver       version the update is conditioned on; dereferenced immediately, so it must not be null
     * @return future with the version assigned by the update
     */
    public CompletableFuture<Version> updateEntry(String tableName, String key, byte[] value, Version ver) {
        log.trace("updateEntry entry called for : {} key : {} version {}", new Object[]{tableName, key, ver.asLongVersion().getLongValue()});
        Supplier<String> failureContext = () -> String.format("updateEntry: key: %s table: %s", key, tableName);
        KeyVersionImpl expected = new KeyVersionImpl(ver.asLongVersion().getLongValue());
        List<TableEntryImpl> entries = Collections.singletonList(new TableEntryImpl((TableKey)new TableKeyImpl((Object)key.getBytes(Charsets.UTF_8), (KeyVersion)expected), (Object)value));
        return this.withRetries(() -> this.segmentHelper.updateTableEntries(tableName, entries, this.authToken.get(), 0L), failureContext).thenApplyAsync(versions -> {
            KeyVersion updated = (KeyVersion)versions.get(0);
            log.trace("entry for key {} updated to table {} with new version {}", new Object[]{key, tableName, updated.getSegmentVersion()});
            return new Version.LongVersion(updated.getSegmentVersion());
        }, (Executor)this.executor);
    }

    /**
     * Reads {@code key} from {@code tableName} and deserializes its value.
     *
     * <p>If the returned entry carries {@code KeyVersion.NOT_EXISTS}, the future fails with a
     * {@code DATA_NOT_FOUND} store exception.
     *
     * @param tableName table to read from
     * @param key       entry key (encoded as UTF-8)
     * @param fromBytes deserializer applied to the stored bytes
     * @param <T>       deserialized value type
     * @return future with the deserialized value and its version
     */
    public <T> CompletableFuture<VersionedMetadata<T>> getEntry(String tableName, String key, Function<byte[], T> fromBytes) {
        log.trace("get entry called for : {} key : {}", (Object)tableName, (Object)key);
        String message = "get entry: key: %s table: %s";
        Supplier<String> failureContext = () -> String.format(message, key, tableName);
        List<TableKeyImpl> keys = Collections.singletonList(new TableKeyImpl((Object)key.getBytes(Charsets.UTF_8), null));
        CompletableFuture result = new CompletableFuture();
        ((CompletableFuture)this.withRetries(() -> this.segmentHelper.readTable(tableName, keys, this.authToken.get(), 0L), failureContext).thenApplyAsync(entries -> {
            TableEntry found = (TableEntry)entries.get(0);
            KeyVersion foundVersion = found.getKey().getVersion();
            if (foundVersion.equals(KeyVersion.NOT_EXISTS)) {
                throw StoreException.create(StoreException.Type.DATA_NOT_FOUND, String.format(message, key, tableName));
            }
            log.trace("returning entry for : {} key : {} with version {}", new Object[]{tableName, key, foundVersion.getSegmentVersion()});
            Object deserialized = fromBytes.apply((byte[])found.getValue());
            return new VersionedMetadata(deserialized, new Version.LongVersion(foundVersion.getSegmentVersion()));
        }, (Executor)this.executor)).whenCompleteAsync((value, failure) -> {
            // Relay the outcome to the future handed back to the caller.
            if (failure == null) {
                result.complete((VersionedMetadata)value);
                return;
            }
            result.completeExceptionally((Throwable)failure);
        }, (Executor)this.executor);
        return result;
    }

    /**
     * Removes {@code key} from {@code tableName} without a version condition.
     *
     * @param tableName table to remove from
     * @param key       entry key
     * @return future that completes when the removal has been processed
     */
    public CompletableFuture<Void> removeEntry(String tableName, String key) {
        Version noVersion = null;
        return this.removeEntry(tableName, key, noVersion);
    }

    /**
     * Removes {@code key} from {@code tableName}; a data-not-found failure is treated as success.
     *
     * @param tableName table to remove from
     * @param key       entry key (encoded as UTF-8)
     * @param ver       version to attach to the removal key, or {@code null} to attach none
     * @return future that completes when the removal has been processed
     */
    public CompletableFuture<Void> removeEntry(String tableName, String key, Version ver) {
        log.trace("remove entry called for : {} key : {}", (Object)tableName, (Object)key);
        KeyVersionImpl expected = null;
        if (ver != null) {
            expected = new KeyVersionImpl(ver.asLongVersion().getLongValue());
        }
        List<TableKeyImpl> keys = Collections.singletonList(new TableKeyImpl((Object)key.getBytes(Charsets.UTF_8), (KeyVersion)expected));
        Supplier<String> failureContext = () -> String.format("remove entry: key: %s table: %s", key, tableName);
        return this.expectingDataNotFound(
                this.withRetries(() -> this.segmentHelper.removeTableKeys(tableName, keys, this.authToken.get(), 0L), failureContext),
                null)
                .thenAcceptAsync(ignored -> {
                    log.trace("entry for key {} removed from table {}", (Object)key, (Object)tableName);
                }, (Executor)this.executor);
    }

    /**
     * Removes several keys from {@code tableName} in one call, without version conditions;
     * a data-not-found failure is treated as success.
     *
     * @param tableName table to remove from
     * @param keys      keys to remove (each encoded as UTF-8)
     * @return future that completes when the removal has been processed
     */
    public CompletableFuture<Void> removeEntries(String tableName, Collection<String> keys) {
        log.trace("remove entry called for : {} keys : {}", (Object)tableName, keys);
        List listOfKeys = keys.stream().map(name -> new TableKeyImpl((Object)name.getBytes(Charsets.UTF_8), null)).collect(Collectors.toList());
        Supplier<String> failureContext = () -> String.format("remove entries: keys: %s table: %s", keys.toString(), tableName);
        return this.expectingDataNotFound(
                this.withRetries(() -> this.segmentHelper.removeTableKeys(tableName, listOfKeys, this.authToken.get(), 0L), failureContext),
                null)
                .thenAcceptAsync(ignored -> {
                    log.trace("entry for keys {} removed from table {}", (Object)keys, (Object)tableName);
                }, (Executor)this.executor);
    }

    /**
     * Reads one page of keys from {@code tableName}.
     *
     * @param tableName         table to iterate
     * @param continuationToken serialized iterator state to resume from
     * @param limit             page size forwarded to the segment helper
     * @return future with the next continuation token paired with the keys decoded as UTF-8 strings
     */
    public CompletableFuture<Map.Entry<ByteBuf, List<String>>> getKeysPaginated(String tableName, ByteBuf continuationToken, int limit) {
        log.trace("get keys paginated called for : {}", (Object)tableName);
        Supplier<String> failureContext = () -> String.format("get keys paginated for table: %s", tableName);
        return this.withRetries(() -> this.segmentHelper.readTableKeys(tableName, limit, IteratorState.fromBytes((ByteBuf)continuationToken), this.authToken.get(), 0L), failureContext).thenApplyAsync(page -> {
            List keyNames = page.getItems().stream().map(item -> new String((byte[])item.getKey(), Charsets.UTF_8)).collect(Collectors.toList());
            log.trace("get keys paginated on table {} returned items {}", (Object)tableName, keyNames);
            return new AbstractMap.SimpleEntry(page.getState().toBytes(), keyNames);
        }, (Executor)this.executor);
    }

    /**
     * Reads one page of entries from {@code tableName} and deserializes each value.
     *
     * @param tableName         table to iterate
     * @param continuationToken serialized iterator state to resume from
     * @param limit             page size forwarded to the segment helper
     * @param fromBytes         deserializer applied to every value
     * @param <T>               deserialized value type
     * @return future with the next continuation token paired with the page of
     *         (key, versioned value) entries; keys are decoded as UTF-8 strings
     */
    public <T> CompletableFuture<Map.Entry<ByteBuf, List<Map.Entry<String, VersionedMetadata<T>>>>> getEntriesPaginated(String tableName, ByteBuf continuationToken, int limit, Function<byte[], T> fromBytes) {
        log.trace("get entries paginated called for : {}", (Object)tableName);
        return this.withRetries(() -> this.segmentHelper.readTableEntries(tableName, limit, IteratorState.fromBytes((ByteBuf)continuationToken), this.authToken.get(), 0L), () -> String.format("get entries paginated for table: %s", tableName)).thenApplyAsync(result -> {
            List items = result.getItems().stream().map(x -> {
                String key = new String((byte[])x.getKey().getKey(), Charsets.UTF_8);
                Object deserialized = fromBytes.apply((byte[])x.getValue());
                VersionedMetadata value = new VersionedMetadata(deserialized, new Version.LongVersion(x.getKey().getVersion().getSegmentVersion()));
                return new AbstractMap.SimpleEntry(key, value);
            }).collect(Collectors.toList());
            // The message names this method; it previously read "get keys paginated", copied from getKeysPaginated.
            log.trace("get entries paginated on table {} returned number of items {}", (Object)tableName, (Object)items.size());
            return new AbstractMap.SimpleEntry(result.getState().toBytes(), items);
        }, (Executor)this.executor);
    }

    /**
     * Pages through {@code table} and collects entries accepted by {@code filter}.
     *
     * <p>Paging stops when a page holds fewer than {@code limit} entries or when {@code limit}
     * matches have been collected. A failure matching
     * {@code AbstractStreamMetadataStore.DATA_NOT_FOUND_PREDICATE} yields an empty map.
     *
     * @param table          table to scan
     * @param fromStringKey  converts the string key into the result key type
     * @param fromBytesValue deserializer for entry values
     * @param filter         returns {@code true} for entries that should be kept
     * @param limit          used both as the page size and as the maximum number of matches
     * @param <K>            result key type
     * @param <V>            result value type
     * @return future with the collected matches
     */
    public <K, V> CompletableFuture<Map<K, V>> getEntriesWithFilter(String table, Function<String, K> fromStringKey, Function<byte[], V> fromBytesValue, BiFunction<K, V, Boolean> filter, int limit) {
        Map<K, V> result = new ConcurrentHashMap<>();
        AtomicBoolean canContinue = new AtomicBoolean(true);
        AtomicReference<ByteBuf> token = new AtomicReference<>(IteratorState.EMPTY.toBytes());
        return Futures.exceptionallyExpecting(Futures.loop(canContinue::get, () -> this.getEntriesPaginated(table, token.get(), limit, fromBytesValue).thenAccept(page -> {
            List<Map.Entry<String, VersionedMetadata<V>>> entries = page.getValue();
            for (Map.Entry<String, VersionedMetadata<V>> entry : entries) {
                K key = fromStringKey.apply(entry.getKey());
                V value = entry.getValue().getObject();
                if (filter.apply(key, value)) {
                    result.put(key, value);
                    if (result.size() == limit) {
                        break;
                    }
                }
            }
            // Continue only after a full page, and only while more matches are still wanted.
            boolean more = entries.size() >= limit && result.size() < limit;
            canContinue.set(more);
            // The token that produced this page is no longer needed.
            token.get().release();
            if (more) {
                token.set(page.getKey());
            }
            // NOTE(review): when the loop stops, the token returned with the final page is never
            // released here - confirm whether that buffer needs an explicit release.
        }), (Executor)this.executor).thenApply(x -> result), AbstractStreamMetadataStore.DATA_NOT_FOUND_PREDICATE, Collections.emptyMap());
    }

    /**
     * Returns an iterator over all keys of {@code tableName}, fetched 1000 per page.
     *
     * <p>The token used for a page is released after that page has been fetched successfully.
     *
     * @param tableName table to iterate
     * @return asynchronous iterator over the keys
     */
    public AsyncIterator<String> getAllKeys(String tableName) {
        return new ContinuationTokenAsyncIterator(token -> {
            CompletableFuture<Map.Entry<ByteBuf, List<String>>> page = this.getKeysPaginated(tableName, (ByteBuf)token, 1000);
            return page.thenApplyAsync(fetched -> {
                ((ByteBuf)token).release();
                return new AbstractMap.SimpleEntry(fetched.getKey(), fetched.getValue());
            }, (Executor)this.executor);
        }, (Object)IteratorState.EMPTY.toBytes());
    }

    /**
     * Returns an iterator over all entries of {@code tableName}, fetched 1000 per page.
     *
     * <p>The token used for a page is released after that page has been fetched successfully.
     *
     * @param tableName table to iterate
     * @param fromBytes deserializer applied to every value
     * @param <T>       deserialized value type
     * @return asynchronous iterator over (key, versioned value) entries
     */
    public <T> AsyncIterator<Map.Entry<String, VersionedMetadata<T>>> getAllEntries(String tableName, Function<byte[], T> fromBytes) {
        return new ContinuationTokenAsyncIterator(token -> {
            CompletableFuture<Map.Entry<ByteBuf, List<Map.Entry<String, VersionedMetadata<T>>>>> page = this.getEntriesPaginated(tableName, (ByteBuf)token, 1000, fromBytes);
            return page.thenApplyAsync(fetched -> {
                ((ByteBuf)token).release();
                return new AbstractMap.SimpleEntry(fetched.getKey(), fetched.getValue());
            }, (Executor)this.executor);
        }, (Object)IteratorState.EMPTY.toBytes());
    }

    /**
     * Wraps {@code future} so that a failure whose unwrapped cause is a
     * {@code StoreException.DataNotFoundException} yields {@code toReturn} instead.
     *
     * @param future   future to wrap
     * @param toReturn value substituted for the expected failure
     * @param <T>      result type
     * @return the wrapped future
     */
    <T> CompletableFuture<T> expectingDataNotFound(CompletableFuture<T> future, T toReturn) {
        return Futures.exceptionallyExpecting(future, failure -> {
            Throwable root = Exceptions.unwrap((Throwable)failure);
            return root instanceof StoreException.DataNotFoundException;
        }, toReturn);
    }

    /**
     * Wraps {@code future} so that a failure whose unwrapped cause is a
     * {@code StoreException.DataExistsException} yields {@code toReturn} instead.
     *
     * @param future   future to wrap
     * @param toReturn value substituted for the expected failure
     * @param <T>      result type
     * @return the wrapped future
     */
    <T> CompletableFuture<T> expectingDataExists(CompletableFuture<T> future, T toReturn) {
        return Futures.exceptionallyExpecting(future, failure -> {
            Throwable root = Exceptions.unwrap((Throwable)failure);
            return root instanceof StoreException.DataExistsException;
        }, toReturn);
    }

    /**
     * Wraps a future supplier so that the call is started on this helper's executor and any
     * failure is passed through {@code lambda$null$49}, which rethrows it as a
     * {@code CompletionException} around a {@code StoreException}.
     *
     * @param future               supplier of the operation to run
     * @param errorMessageSupplier supplies the message attached to the translated exception
     * @param <T>                  result type of the operation
     * @return a supplier that produces the wrapped future each time it is invoked
     */
    private <T> Supplier<CompletableFuture<T>> exceptionalCallback(Supplier<CompletableFuture<T>> future, Supplier<String> errorMessageSupplier) {
        // The two lambda$null$NN methods are decompiler-emitted lambda bodies declared further down in this class.
        return () -> ((CompletableFuture)CompletableFuture.completedFuture(null).thenComposeAsync(arg_0 -> PravegaTablesStoreHelper.lambda$null$48((Supplier)future, arg_0), (Executor)this.executor)).exceptionally(arg_0 -> this.lambda$null$49((Supplier)errorMessageSupplier, arg_0));
    }

    /**
     * Runs {@code futureSupplier} through the retry helper, retrying only while the unwrapped
     * failure is a {@code StoreException.StoreConnectionException}.
     *
     * <p>On final failure a {@code RetriesExhaustedException} is replaced by its cause; the
     * resulting throwable is rethrown inside a {@code CompletionException}.
     *
     * @param futureSupplier supplier of the operation to run
     * @param errorMessage   supplies the message used when translating failures
     * @param <T>            result type of the operation
     * @return future with the operation's result
     */
    private <T> CompletableFuture<T> withRetries(Supplier<CompletableFuture<T>> futureSupplier, Supplier<String> errorMessage) {
        Supplier<CompletableFuture<T>> translated = this.exceptionalCallback(futureSupplier, errorMessage);
        return RetryHelper.withRetriesAsync(translated, failure -> Exceptions.unwrap((Throwable)failure) instanceof StoreException.StoreConnectionException, this.numOfRetries, this.executor).exceptionally(failure -> {
            Throwable root = Exceptions.unwrap((Throwable)failure);
            Throwable surfaced = root instanceof RetriesExhaustedException ? root.getCause() : root;
            throw new CompletionException(surfaced);
        });
    }

    /**
     * Decompiler-emitted lambda body: translates a failure into a {@code StoreException} and
     * rethrows it wrapped in a {@code CompletionException}. It never returns normally.
     *
     * <p>For a {@code WireCommandFailedException} the store exception type follows the failure
     * reason; {@code AuthFailed} also refreshes the stored token before being reported as a
     * connection error. A {@code HostStoreException} becomes a connection error, and anything
     * else becomes {@code UNKNOWN}.
     *
     * @param errorMessageSupplier supplies the message attached to the store exception
     * @param t                    failure to translate
     * @return never returns
     */
    private /* synthetic */ Object lambda$null$49(Supplier errorMessageSupplier, Throwable t) {
        String errorMessage = (String)errorMessageSupplier.get();
        Throwable cause = Exceptions.unwrap((Throwable)t);
        StoreException.Type type;
        if (!(cause instanceof WireCommandFailedException)) {
            if (cause instanceof HostStoreException) {
                log.warn("Host Store exception {}", (Object)cause.getMessage());
                type = StoreException.Type.CONNECTION_ERROR;
            } else {
                log.warn("exception of unknown type thrown {} ", (Object)errorMessage, (Object)cause);
                type = StoreException.Type.UNKNOWN;
            }
        } else {
            switch (((WireCommandFailedException)cause).getReason()) {
                case ConnectionDropped:
                case ConnectionFailed:
                case UnknownHost: {
                    type = StoreException.Type.CONNECTION_ERROR;
                    break;
                }
                case PreconditionFailed: {
                    type = StoreException.Type.ILLEGAL_STATE;
                    break;
                }
                case AuthFailed: {
                    // Fetch a fresh token, then report a connection error.
                    this.authToken.set(this.authHelper.retrieveMasterToken());
                    type = StoreException.Type.CONNECTION_ERROR;
                    break;
                }
                case SegmentDoesNotExist:
                case TableKeyDoesNotExist: {
                    type = StoreException.Type.DATA_NOT_FOUND;
                    break;
                }
                case TableSegmentNotEmpty: {
                    type = StoreException.Type.DATA_CONTAINS_ELEMENTS;
                    break;
                }
                case TableKeyBadVersion: {
                    type = StoreException.Type.WRITE_CONFLICT;
                    break;
                }
                default: {
                    type = StoreException.Type.UNKNOWN;
                    break;
                }
            }
        }
        throw new CompletionException(StoreException.create(type, cause, errorMessage));
    }

    /**
     * Decompiler-emitted lambda body: ignores its argument and starts the supplied operation.
     *
     * @param future supplier of the operation
     * @param v      unused
     * @return the future produced by {@code future}
     */
    private static /* synthetic */ CompletionStage lambda$null$48(Supplier future, Object v) {
        CompletableFuture started = (CompletableFuture)future.get();
        return started;
    }

    private class TableCacheKey<T>
    implements Cache.CacheKey {
        private final String table;
        private final String key;
        private final Function<byte[], T> fromBytesFunc;

        @ConstructorProperties(value={"table", "key", "fromBytesFunc"})
        @SuppressFBWarnings(justification="generated code")
        public TableCacheKey(String table, String key, Function<byte[], T> fromBytesFunc) {
            this.table = table;
            this.key = key;
            this.fromBytesFunc = fromBytesFunc;
        }

        @SuppressFBWarnings(justification="generated code")
        public String getTable() {
            return this.table;
        }

        @SuppressFBWarnings(justification="generated code")
        public String getKey() {
            return this.key;
        }

        @SuppressFBWarnings(justification="generated code")
        public Function<byte[], T> getFromBytesFunc() {
            return this.fromBytesFunc;
        }

        @SuppressFBWarnings(justification="generated code")
        public String toString() {
            return "PravegaTablesStoreHelper.TableCacheKey(table=" + this.getTable() + ", key=" + this.getKey() + ", fromBytesFunc=" + this.getFromBytesFunc() + ")";
        }

        @SuppressFBWarnings(justification="generated code")
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof TableCacheKey)) {
                return false;
            }
            TableCacheKey other = (TableCacheKey)o;
            if (!other.canEqual(this)) {
                return false;
            }
            String this$table = this.getTable();
            String other$table = other.getTable();
            if (this$table == null ? other$table != null : !this$table.equals(other$table)) {
                return false;
            }
            String this$key = this.getKey();
            String other$key = other.getKey();
            return !(this$key == null ? other$key != null : !this$key.equals(other$key));
        }

        @SuppressFBWarnings(justification="generated code")
        protected boolean canEqual(Object other) {
            return other instanceof TableCacheKey;
        }

        @SuppressFBWarnings(justification="generated code")
        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            String $table = this.getTable();
            result = result * 59 + ($table == null ? 43 : $table.hashCode());
            String $key = this.getKey();
            result = result * 59 + ($key == null ? 43 : $key.hashCode());
            return result;
        }
    }
}

