/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.client.connection.impl;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.ClientConfig;
import io.pravega.client.connection.impl.ClientConnection;
import io.pravega.client.connection.impl.ConnectionFactory;
import io.pravega.client.connection.impl.ConnectionPool;
import io.pravega.client.connection.impl.Flow;
import io.pravega.client.connection.impl.FlowHandler;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.shaded.com.google.common.annotations.VisibleForTesting;
import io.pravega.shaded.com.google.common.base.Preconditions;
import io.pravega.shared.metrics.ClientMetricUpdater;
import io.pravega.shared.metrics.MetricListener;
import io.pravega.shared.metrics.MetricNotifier;
import io.pravega.shared.protocol.netty.PravegaNodeUri;
import io.pravega.shared.protocol.netty.ReplyProcessor;
import java.beans.ConstructorProperties;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.annotation.concurrent.GuardedBy;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConnectionPoolImpl
implements ConnectionPool {
    @SuppressFBWarnings(justification="generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ConnectionPoolImpl.class);
    private final Object lock = new Object();
    private final ClientConfig clientConfig;
    private final MetricNotifier metricNotifier;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    @GuardedBy(value="lock")
    private final Map<PravegaNodeUri, List<Connection>> connectionMap = new HashMap<PravegaNodeUri, List<Connection>>();
    private final ConnectionFactory connectionFactory;

    public ConnectionPoolImpl(ClientConfig clientConfig, ConnectionFactory connectionFactory) {
        this.clientConfig = clientConfig;
        this.connectionFactory = connectionFactory;
        MetricListener metricListener = clientConfig.getMetricListener();
        this.metricNotifier = metricListener == null ? MetricNotifier.NO_OP_METRIC_NOTIFIER : new ClientMetricUpdater(metricListener);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public CompletableFuture<ClientConnection> getClientConnection(Flow flow, PravegaNodeUri location, ReplyProcessor rp) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkNotNull(flow, "Flow");
        Preconditions.checkNotNull(location, "Location");
        Preconditions.checkNotNull(rp, "ReplyProcessor");
        Object object = this.lock;
        synchronized (object) {
            Connection connection2;
            Exceptions.checkNotClosed(this.closed.get(), this);
            List connectionList = this.connectionMap.getOrDefault(location, new ArrayList());
            List prunedConnectionList = connectionList.stream().filter(connection -> !connection.getFlowHandler().isDone() || connection.isConnected()).collect(Collectors.toList());
            log.debug("List of connections to {} that can be used: {}", (Object)location, prunedConnectionList);
            Optional suggestedConnection = prunedConnectionList.stream().min(Comparator.naturalOrder());
            if (suggestedConnection.isPresent() && (prunedConnectionList.size() >= this.clientConfig.getMaxConnectionsPerSegmentStore() || ConnectionPoolImpl.isUnused((Connection)suggestedConnection.get()))) {
                log.info("Reusing connection: {}", suggestedConnection.get());
                connection2 = (Connection)suggestedConnection.get();
            } else {
                log.info("Creating a new connection to {}", (Object)location);
                CompletableFuture<FlowHandler> establishedFuture = this.establishConnection(location);
                connection2 = new Connection(location, establishedFuture);
                prunedConnectionList.add(connection2);
            }
            this.connectionMap.put(location, prunedConnectionList);
            return connection2.getFlowHandler().thenApply(flowHandler -> flowHandler.createFlow(flow, rp));
        }
    }

    @Override
    public CompletableFuture<ClientConnection> getClientConnection(PravegaNodeUri location, ReplyProcessor rp) {
        Preconditions.checkNotNull(location, "Location");
        Preconditions.checkNotNull(rp, "ReplyProcessor");
        Exceptions.checkNotClosed(this.closed.get(), this);
        CompletableFuture<FlowHandler> handler = this.establishConnection(location);
        Connection connection = new Connection(location, handler);
        return connection.getFlowHandler().thenApply(h2 -> h2.createConnectionWithFlowDisabled(rp));
    }

    private static boolean isUnused(Connection connection) {
        return Futures.isSuccessful(connection.getFlowHandler()) && connection.getFlowCount() == 0;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    public void pruneUnusedConnections() {
        Object object = this.lock;
        synchronized (object) {
            for (List<Connection> connections : this.connectionMap.values()) {
                Iterator<Connection> iterator = connections.iterator();
                while (iterator.hasNext()) {
                    Connection connection = iterator.next();
                    if (!ConnectionPoolImpl.isUnused(connection)) continue;
                    connection.getFlowHandler().join().close();
                    iterator.remove();
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    public List<Connection> getActiveChannels() {
        Object object = this.lock;
        synchronized (object) {
            ArrayList<Connection> result = new ArrayList<Connection>();
            for (List<Connection> connection : this.connectionMap.values()) {
                result.addAll(connection);
            }
            return result;
        }
    }

    private CompletableFuture<FlowHandler> establishConnection(PravegaNodeUri location) {
        return FlowHandler.openConnection(location, this.metricNotifier, this.connectionFactory);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() {
        log.info("Shutting down connection pool");
        if (this.closed.compareAndSet(false, true)) {
            this.metricNotifier.close();
            this.connectionFactory.close();
            Object object = this.lock;
            synchronized (object) {
                for (List<Connection> connections : this.connectionMap.values()) {
                    for (Connection connection : connections) {
                        connection.close();
                    }
                }
                this.connectionMap.clear();
            }
        }
    }

    @Override
    public ScheduledExecutorService getInternalExecutor() {
        return this.connectionFactory.getInternalExecutor();
    }

    private class Connection
    implements Comparable<Connection>,
    AutoCloseable {
        private final PravegaNodeUri uri;
        private final CompletableFuture<FlowHandler> flowHandler;

        int getFlowCount() {
            return Futures.isSuccessful(this.flowHandler) ? this.flowHandler.join().getOpenFlowCount() : 0;
        }

        boolean isConnected() {
            if (!Futures.isSuccessful(this.flowHandler)) {
                return false;
            }
            return !this.flowHandler.join().isClosed();
        }

        @Override
        public int compareTo(Connection o) {
            int v1 = Futures.isSuccessful(this.getFlowHandler()) ? this.getFlowCount() : Integer.MAX_VALUE;
            int v2 = Futures.isSuccessful(o.getFlowHandler()) ? o.getFlowCount() : Integer.MAX_VALUE;
            return Integer.compare(v1, v2);
        }

        @Override
        public void close() {
            if (Futures.isSuccessful(this.flowHandler)) {
                this.flowHandler.join().close();
            }
        }

        @ConstructorProperties(value={"uri", "flowHandler"})
        @SuppressFBWarnings(justification="generated code")
        @Generated
        public Connection(PravegaNodeUri uri, CompletableFuture<FlowHandler> flowHandler) {
            this.uri = uri;
            this.flowHandler = flowHandler;
        }

        @SuppressFBWarnings(justification="generated code")
        @Generated
        public PravegaNodeUri getUri() {
            return this.uri;
        }

        @SuppressFBWarnings(justification="generated code")
        @Generated
        public CompletableFuture<FlowHandler> getFlowHandler() {
            return this.flowHandler;
        }

        @SuppressFBWarnings(justification="generated code")
        @Generated
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof Connection)) {
                return false;
            }
            Connection other = (Connection)o;
            if (!other.canEqual(this)) {
                return false;
            }
            PravegaNodeUri this$uri = this.getUri();
            PravegaNodeUri other$uri = other.getUri();
            if (this$uri == null ? other$uri != null : !((Object)this$uri).equals(other$uri)) {
                return false;
            }
            CompletableFuture<FlowHandler> this$flowHandler = this.getFlowHandler();
            CompletableFuture<FlowHandler> other$flowHandler = other.getFlowHandler();
            return !(this$flowHandler == null ? other$flowHandler != null : !this$flowHandler.equals(other$flowHandler));
        }

        @SuppressFBWarnings(justification="generated code")
        @Generated
        protected boolean canEqual(Object other) {
            return other instanceof Connection;
        }

        @SuppressFBWarnings(justification="generated code")
        @Generated
        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            PravegaNodeUri $uri = this.getUri();
            result = result * 59 + ($uri == null ? 43 : ((Object)$uri).hashCode());
            CompletableFuture<FlowHandler> $flowHandler = this.getFlowHandler();
            result = result * 59 + ($flowHandler == null ? 43 : $flowHandler.hashCode());
            return result;
        }

        @SuppressFBWarnings(justification="generated code")
        @Generated
        public String toString() {
            return "ConnectionPoolImpl.Connection(uri=" + this.getUri() + ", flowHandler=" + this.getFlowHandler() + ")";
        }
    }
}

