/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.controller.server;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.netty.impl.ClientConnection;
import io.pravega.client.netty.impl.ConnectionFactory;
import io.pravega.common.Exceptions;
import io.pravega.common.util.ResourcePool;
import io.pravega.controller.server.WireCommandFailedException;
import io.pravega.controller.util.Config;
import io.pravega.shared.protocol.netty.ConnectionFailedException;
import io.pravega.shared.protocol.netty.PravegaNodeUri;
import io.pravega.shared.protocol.netty.ReplyProcessor;
import io.pravega.shared.protocol.netty.WireCommand;
import io.pravega.shared.protocol.netty.WireCommands;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import javax.annotation.ParametersAreNonnullByDefault;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SegmentStoreConnectionManager
implements AutoCloseable {
    // Class-wide SLF4J logger.
    // NOTE(review): the "generated code" FindBugs suppression suggests this field was emitted by an
    // annotation processor before decompilation — confirm against the original source.
    @SuppressFBWarnings(justification="generated code")
    private static final Logger log = LoggerFactory.getLogger(SegmentStoreConnectionManager.class);
    // Same value that SegmentStoreConnectionPool's two-argument constructor supplies as maxConcurrent.
    private static final int MAX_CONCURRENT_CONNECTIONS = 500;
    // Same value that SegmentStoreConnectionPool's two-argument constructor supplies as maxIdle.
    private static final int MAX_IDLE_CONNECTIONS = 100;
    // One connection pool per segment store node; entries are created on demand by the CacheLoader
    // installed in the constructor.
    private final LoadingCache<PravegaNodeUri, SegmentStoreConnectionPool> cache;

    /**
     * Builds the manager and its cache of per-node connection pools.
     *
     * <p>The cache holds at most {@code Config.HOST_STORE_CONTAINER_COUNT} pools, drops a pool
     * five minutes after it was last accessed, and calls {@code shutdown()} on every pool that
     * is removed from the cache.
     *
     * @param clientCF connection factory handed to every pool created by the cache loader
     */
    SegmentStoreConnectionManager(final ConnectionFactory clientCF) {
        // Invoked on a cache miss: creates the pool for the requested node.
        CacheLoader<PravegaNodeUri, SegmentStoreConnectionPool> poolLoader =
                new CacheLoader<PravegaNodeUri, SegmentStoreConnectionPool>() {
                    @Override
                    @ParametersAreNonnullByDefault
                    public SegmentStoreConnectionPool load(PravegaNodeUri nodeUri) {
                        return new SegmentStoreConnectionPool(nodeUri, clientCF);
                    }
                };

        this.cache = CacheBuilder.newBuilder()
                .maximumSize((long) Config.HOST_STORE_CONTAINER_COUNT)
                .expireAfterAccess(5L, TimeUnit.MINUTES)
                // Whenever a pool leaves the cache, shut it down.
                .removalListener(evicted -> ((SegmentStoreConnectionPool) evicted.getValue()).shutdown())
                .build(poolLoader);
    }

    /**
     * Obtains a connection to the given segment store node.
     *
     * <p>The node's pool is fetched from the cache (and created by the cache loader on a miss);
     * the connection request is then delegated to that pool.
     *
     * @param uri            segment store node to connect to
     * @param replyProcessor processor passed on to the pool's {@code getConnection}
     * @return future yielding the wrapped connection
     */
    CompletableFuture<ConnectionWrapper> getConnection(PravegaNodeUri uri, ReplyProcessor replyProcessor) {
        // The cache is typed, so neither the key nor the value needs a cast.
        SegmentStoreConnectionPool pool = this.cache.getUnchecked(uri);
        return pool.getConnection(replyProcessor);
    }

    /**
     * Invalidates every cached pool and then runs the cache's pending maintenance.
     * Each invalidated pool is passed to the removal listener registered in the constructor.
     */
    @Override
    public void close() {
        cache.invalidateAll();
        cache.cleanUp();
    }

    @VisibleForTesting
    static class ReusableReplyProcessor
    implements ReplyProcessor {
        private final AtomicReference<ReplyProcessor> replyProcessor = new AtomicReference();

        ReusableReplyProcessor() {
        }

        void initialize(ReplyProcessor replyProcessor) {
            this.replyProcessor.set(replyProcessor);
        }

        void uninitialize() {
            this.replyProcessor.set(null);
        }

        private <T> void execute(BiConsumer<ReplyProcessor, T> toInvoke, T arg) {
            ReplyProcessor rp = this.replyProcessor.get();
            if (rp != null) {
                toInvoke.accept(rp, (ReplyProcessor)arg);
            }
        }

        private void execute(Consumer<ReplyProcessor> toInvoke) {
            ReplyProcessor rp = this.replyProcessor.get();
            if (rp != null) {
                toInvoke.accept(rp);
            }
        }

        public void hello(WireCommands.Hello hello) {
            this.execute(ReplyProcessor::hello, hello);
        }

        public void wrongHost(WireCommands.WrongHost wrongHost) {
            this.execute(ReplyProcessor::wrongHost, wrongHost);
        }

        public void segmentAlreadyExists(WireCommands.SegmentAlreadyExists segmentAlreadyExists) {
            this.execute(ReplyProcessor::segmentAlreadyExists, segmentAlreadyExists);
        }

        public void segmentIsSealed(WireCommands.SegmentIsSealed segmentIsSealed) {
            this.execute(ReplyProcessor::segmentIsSealed, segmentIsSealed);
        }

        public void segmentIsTruncated(WireCommands.SegmentIsTruncated segmentIsTruncated) {
            this.execute(ReplyProcessor::segmentIsTruncated, segmentIsTruncated);
        }

        public void noSuchSegment(WireCommands.NoSuchSegment noSuchSegment) {
            this.execute(ReplyProcessor::noSuchSegment, noSuchSegment);
        }

        public void tableSegmentNotEmpty(WireCommands.TableSegmentNotEmpty tableSegmentNotEmpty) {
            this.execute(ReplyProcessor::tableSegmentNotEmpty, tableSegmentNotEmpty);
        }

        public void invalidEventNumber(WireCommands.InvalidEventNumber invalidEventNumber) {
            this.execute(ReplyProcessor::invalidEventNumber, invalidEventNumber);
        }

        public void appendSetup(WireCommands.AppendSetup appendSetup) {
            this.execute(ReplyProcessor::appendSetup, appendSetup);
        }

        public void dataAppended(WireCommands.DataAppended dataAppended) {
            this.execute(ReplyProcessor::dataAppended, dataAppended);
        }

        public void conditionalCheckFailed(WireCommands.ConditionalCheckFailed dataNotAppended) {
            this.execute(ReplyProcessor::conditionalCheckFailed, dataNotAppended);
        }

        public void segmentRead(WireCommands.SegmentRead segmentRead) {
            this.execute(ReplyProcessor::segmentRead, segmentRead);
        }

        public void segmentAttributeUpdated(WireCommands.SegmentAttributeUpdated segmentAttributeUpdated) {
            this.execute(ReplyProcessor::segmentAttributeUpdated, segmentAttributeUpdated);
        }

        public void segmentAttribute(WireCommands.SegmentAttribute segmentAttribute) {
            this.execute(ReplyProcessor::segmentAttribute, segmentAttribute);
        }

        public void streamSegmentInfo(WireCommands.StreamSegmentInfo streamInfo) {
            this.execute(ReplyProcessor::streamSegmentInfo, streamInfo);
        }

        public void segmentCreated(WireCommands.SegmentCreated segmentCreated) {
            this.execute(ReplyProcessor::segmentCreated, segmentCreated);
        }

        public void segmentsMerged(WireCommands.SegmentsMerged segmentsMerged) {
            this.execute(ReplyProcessor::segmentsMerged, segmentsMerged);
        }

        public void segmentSealed(WireCommands.SegmentSealed segmentSealed) {
            this.execute(ReplyProcessor::segmentSealed, segmentSealed);
        }

        public void segmentTruncated(WireCommands.SegmentTruncated segmentTruncated) {
            this.execute(ReplyProcessor::segmentTruncated, segmentTruncated);
        }

        public void segmentDeleted(WireCommands.SegmentDeleted segmentDeleted) {
            this.execute(ReplyProcessor::segmentDeleted, segmentDeleted);
        }

        public void operationUnsupported(WireCommands.OperationUnsupported operationUnsupported) {
            this.execute(ReplyProcessor::operationUnsupported, operationUnsupported);
        }

        public void keepAlive(WireCommands.KeepAlive keepAlive) {
            this.execute(ReplyProcessor::keepAlive, keepAlive);
        }

        public void connectionDropped() {
            this.execute(ReplyProcessor::connectionDropped);
        }

        public void segmentPolicyUpdated(WireCommands.SegmentPolicyUpdated segmentPolicyUpdated) {
            this.execute(ReplyProcessor::segmentPolicyUpdated, segmentPolicyUpdated);
        }

        public void processingFailure(Exception error) {
            this.execute(ReplyProcessor::processingFailure, error);
        }

        public void authTokenCheckFailed(WireCommands.AuthTokenCheckFailed authTokenCheckFailed) {
            this.execute(ReplyProcessor::authTokenCheckFailed, authTokenCheckFailed);
        }

        public void tableEntriesUpdated(WireCommands.TableEntriesUpdated tableEntriesUpdated) {
            this.execute(ReplyProcessor::tableEntriesUpdated, tableEntriesUpdated);
        }

        public void tableKeysRemoved(WireCommands.TableKeysRemoved tableKeysRemoved) {
            this.execute(ReplyProcessor::tableKeysRemoved, tableKeysRemoved);
        }

        public void tableRead(WireCommands.TableRead tableRead) {
            this.execute(ReplyProcessor::tableRead, tableRead);
        }

        public void tableKeyDoesNotExist(WireCommands.TableKeyDoesNotExist tableKeyDoesNotExist) {
            this.execute(ReplyProcessor::tableKeyDoesNotExist, tableKeyDoesNotExist);
        }

        public void tableKeyBadVersion(WireCommands.TableKeyBadVersion tableKeyBadVersion) {
            this.execute(ReplyProcessor::tableKeyBadVersion, tableKeyBadVersion);
        }

        public void tableKeysRead(WireCommands.TableKeysRead tableKeysRead) {
            this.execute(ReplyProcessor::tableKeysRead, tableKeysRead);
        }

        public void tableEntriesRead(WireCommands.TableEntriesRead tableEntriesRead) {
            this.execute(ReplyProcessor::tableEntriesRead, tableEntriesRead);
        }
    }

    /**
     * Pooled resource: a client connection, the reusable reply processor it was established
     * with, and a flag recording whether the connection is still considered usable.
     */
    private static class ConnectionObject {
        private final ClientConnection connection;
        private final ReusableReplyProcessor reusableReplyProcessor;
        /** Starts as CONNECTED; switched to DISCONNECTED by {@link #failConnection()} or a failed send. */
        private final AtomicReference<ConnectionState> state;

        ConnectionObject(ClientConnection connection, ReusableReplyProcessor processor) {
            this.connection = connection;
            this.reusableReplyProcessor = processor;
            this.state = new AtomicReference<ConnectionState>(ConnectionState.CONNECTED);
        }

        /** Marks this connection as DISCONNECTED. */
        private void failConnection() {
            this.state.set(ConnectionState.DISCONNECTED);
        }

        /**
         * Sends {@code request} on the underlying connection and reports send failures
         * through {@code resultFuture}.
         *
         * <p>If the send callback reports a {@link ConnectionFailedException} (after unwrapping),
         * the connection is marked DISCONNECTED and the future fails with a
         * {@link WireCommandFailedException} carrying reason {@code ConnectionFailed}. Any other
         * failure is passed to the future as-is. When the callback reports no error the future
         * is left untouched here (presumably it is completed by the reply processor — confirm
         * against callers).
         *
         * @param request      wire command to send
         * @param resultFuture future that is failed when the send fails
         * @param <T>          result type of the future
         */
        private <T> void sendAsync(WireCommand request, CompletableFuture<T> resultFuture) {
            this.connection.sendAsync(request, cfe -> {
                if (cfe == null) {
                    return;
                }
                Throwable cause = Exceptions.unwrap((Throwable) cfe);
                if (cause instanceof ConnectionFailedException) {
                    // Flip the state BEFORE completing the future: completeExceptionally can run
                    // dependent actions synchronously, and one of them may close the
                    // ConnectionWrapper, whose close() reads this state to decide whether the
                    // resource has to be invalidated.
                    failConnection();
                    resultFuture.completeExceptionally(new WireCommandFailedException(cause, request.getType(), WireCommandFailedException.Reason.ConnectionFailed));
                } else {
                    log.debug("connection.sendAsync failed with {}", cause.getClass());
                    resultFuture.completeExceptionally(cause);
                }
            });
        }

        /** Usability of the underlying connection as tracked by this object. */
        private static enum ConnectionState {
            CONNECTED,
            DISCONNECTED;

        }
    }

    /**
     * Handle to a connection borrowed from a {@link SegmentStoreConnectionPool}.
     *
     * <p>Closing the handle clears the reply-processor delegate, invalidates the pooled resource
     * if the connection is no longer CONNECTED, and then closes the resource. Only the first
     * call to {@link #close()} has any effect.
     */
    static class ConnectionWrapper implements AutoCloseable {
        private final ResourcePool.CloseableResource<ConnectionObject> resource;
        /** Guard that makes {@link #close()} run its body at most once. */
        private final AtomicBoolean isClosed;

        private ConnectionWrapper(ResourcePool.CloseableResource<ConnectionObject> resource) {
            this.resource = resource;
            this.isClosed = new AtomicBoolean(false);
        }

        /** Marks the underlying connection as DISCONNECTED. */
        void failConnection() {
            this.resource.getResource().failConnection();
        }

        /**
         * Sends {@code request} over the underlying connection; send failures are reported
         * through {@code resultFuture}.
         *
         * @param request      wire command to send
         * @param resultFuture future that is failed when the send fails
         * @param <T>          result type of the future
         */
        <T> void sendAsync(WireCommand request, CompletableFuture<T> resultFuture) {
            this.resource.getResource().sendAsync(request, resultFuture);
        }

        /** @return current state of the underlying connection object */
        @VisibleForTesting
        ConnectionObject.ConnectionState getState() {
            return this.resource.getResource().state.get();
        }

        /** @return the underlying client connection */
        @VisibleForTesting
        ClientConnection getConnection() {
            return this.resource.getResource().connection;
        }

        /** @return the delegate currently installed in the reusable reply processor, or {@code null} */
        @VisibleForTesting
        ReplyProcessor getReplyProcessor() {
            return this.resource.getResource().reusableReplyProcessor.replyProcessor.get();
        }

        @Override
        public void close() {
            // Only the first caller proceeds; repeated closes are no-ops.
            if (!this.isClosed.compareAndSet(false, true)) {
                return;
            }
            ConnectionObject connectionObject = this.resource.getResource();
            // Detach the caller's processor so later replies are not delivered to it.
            connectionObject.reusableReplyProcessor.uninitialize();
            if (connectionObject.state.get() != ConnectionObject.ConnectionState.CONNECTED) {
                // Presumably tells the pool not to reuse this resource — confirm in ResourcePool.
                this.resource.invalidate();
            }
            this.resource.close();
        }
    }

    /**
     * {@link ResourcePool} of connections to one segment store node.
     *
     * <p>Each pooled connection is established with its own {@link ReusableReplyProcessor};
     * {@link #getConnection(ReplyProcessor)} installs the caller's processor into it before
     * handing the connection out.
     */
    static class SegmentStoreConnectionPool extends ResourcePool<ConnectionObject> {
        /**
         * Creates a pool using the manager-level limits
         * {@code MAX_CONCURRENT_CONNECTIONS} and {@code MAX_IDLE_CONNECTIONS}.
         *
         * @param pravegaNodeUri node the pool connects to
         * @param clientCF       factory used to establish connections
         */
        @VisibleForTesting
        SegmentStoreConnectionPool(PravegaNodeUri pravegaNodeUri, ConnectionFactory clientCF) {
            this(pravegaNodeUri, clientCF, MAX_CONCURRENT_CONNECTIONS, MAX_IDLE_CONNECTIONS);
        }

        /**
         * Creates a pool with explicit limits.
         *
         * @param pravegaNodeUri node the pool connects to
         * @param clientCF       factory used to establish connections
         * @param maxConcurrent  forwarded to the {@link ResourcePool} constructor
         * @param maxIdle        forwarded to the {@link ResourcePool} constructor
         */
        @VisibleForTesting
        SegmentStoreConnectionPool(PravegaNodeUri pravegaNodeUri, ConnectionFactory clientCF, int maxConcurrent, int maxIdle) {
            super(() -> {
                // Every connection gets its own reusable processor so the delegate can be
                // swapped each time the connection is handed out.
                ReusableReplyProcessor rp = new ReusableReplyProcessor();
                return clientCF.establishConnection(pravegaNodeUri, rp)
                        .thenApply(connection -> new ConnectionObject(connection, rp));
            }, connectionObj -> connectionObj.connection.close(), maxConcurrent, maxIdle);
        }

        /**
         * Takes a resource from the pool and wraps it for the caller.
         *
         * @param replyProcessor processor installed as the delegate of the connection's
         *                       reusable reply processor
         * @return future yielding the wrapped connection
         */
        CompletableFuture<ConnectionWrapper> getConnection(ReplyProcessor replyProcessor) {
            return this.getResource().thenApply(closeableResource -> {
                ConnectionObject connectionObject = closeableResource.getResource();
                connectionObject.reusableReplyProcessor.initialize(replyProcessor);
                return new ConnectionWrapper(closeableResource);
            });
        }
    }
}

