/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.bolt.connection.pooled;

import java.net.URI;
import java.time.Clock;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.neo4j.bolt.connection.AuthInfo;
import org.neo4j.bolt.connection.AuthToken;
import org.neo4j.bolt.connection.BasicResponseHandler;
import org.neo4j.bolt.connection.BoltAgent;
import org.neo4j.bolt.connection.BoltConnection;
import org.neo4j.bolt.connection.BoltConnectionParameters;
import org.neo4j.bolt.connection.BoltConnectionProvider;
import org.neo4j.bolt.connection.BoltConnectionSource;
import org.neo4j.bolt.connection.BoltConnectionState;
import org.neo4j.bolt.connection.BoltProtocolVersion;
import org.neo4j.bolt.connection.BoltServerAddress;
import org.neo4j.bolt.connection.LoggingProvider;
import org.neo4j.bolt.connection.NotificationConfig;
import org.neo4j.bolt.connection.ResponseHandler;
import org.neo4j.bolt.connection.SecurityPlan;
import org.neo4j.bolt.connection.exception.BoltFailureException;
import org.neo4j.bolt.connection.exception.BoltTransientException;
import org.neo4j.bolt.connection.exception.MinVersionAcquisitionException;
import org.neo4j.bolt.connection.message.Message;
import org.neo4j.bolt.connection.message.Messages;
import org.neo4j.bolt.connection.observation.ImmutableObservation;
import org.neo4j.bolt.connection.observation.Observation;
import org.neo4j.bolt.connection.pooled.AuthTokenManager;
import org.neo4j.bolt.connection.pooled.SecurityPlanSupplier;
import org.neo4j.bolt.connection.pooled.impl.PooledBoltConnection;
import org.neo4j.bolt.connection.pooled.impl.util.FutureUtil;
import org.neo4j.bolt.connection.pooled.observation.PoolObservationProvider;

public class PooledBoltConnectionSource
implements BoltConnectionSource<BoltConnectionParameters> {
    private final System.Logger log;
    private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
    private final BoltConnectionProvider boltConnectionProvider;
    private final List<ConnectionEntry> pooledConnectionEntries;
    private final Queue<CompletableFuture<PooledBoltConnection>> pendingAcquisitions;
    private final int maxSize;
    private final long acquisitionTimeout;
    private final long maxLifetime;
    private final long idleBeforeTest;
    private final Clock clock;
    private final PoolObservationProvider observationProvider;
    private final URI uri;
    private final BoltServerAddress address;
    private final String routingContextAddress;
    private final BoltAgent boltAgent;
    private final String userAgent;
    private final int connectTimeoutMillis;
    private final String poolId;
    private final AuthTokenManager authTokenManager;
    private final SecurityPlanSupplier securityPlanSupplier;
    private final NotificationConfig notificationConfig;
    private CompletionStage<Void> closeStage;
    private long minAuthTimestamp;

    public PooledBoltConnectionSource(LoggingProvider loggingProvider, Clock clock, URI uri, BoltConnectionProvider boltConnectionProvider, AuthTokenManager authTokenManager, SecurityPlanSupplier securityPlanSupplier, int maxSize, long acquisitionTimeout, long maxLifetime, long idleBeforeTest, PoolObservationProvider observationProvider, String routingContextAddress, BoltAgent boltAgent, String userAgent, int connectTimeoutMillis, NotificationConfig notificationConfig) {
        this.uri = Objects.requireNonNull(uri);
        this.address = switch (uri.getScheme()) {
            case "bolt", "bolt+s", "bolt+ssc", "neo4j", "neo4j+s", "neo4j+ssc" -> new BoltServerAddress(uri);
            default -> new BoltServerAddress(uri.getHost(), 0);
        };
        this.poolId = this.poolId(this.address);
        this.observationProvider = Objects.requireNonNull(observationProvider);
        this.maxSize = maxSize;
        Observation createObservation = observationProvider.connectionPoolCreate(this.poolId, uri, maxSize);
        try {
            this.boltConnectionProvider = Objects.requireNonNull(boltConnectionProvider);
            this.pooledConnectionEntries = new ArrayList<ConnectionEntry>();
            this.pendingAcquisitions = new ArrayDeque<CompletableFuture<PooledBoltConnection>>(100);
            this.acquisitionTimeout = acquisitionTimeout;
            this.maxLifetime = maxLifetime;
            this.idleBeforeTest = idleBeforeTest;
            this.clock = Objects.requireNonNull(clock);
            this.log = loggingProvider.getLog(this.getClass());
            this.routingContextAddress = routingContextAddress;
            this.boltAgent = Objects.requireNonNull(boltAgent);
            this.userAgent = Objects.requireNonNull(userAgent);
            this.connectTimeoutMillis = connectTimeoutMillis;
            this.authTokenManager = Objects.requireNonNull(authTokenManager);
            this.securityPlanSupplier = Objects.requireNonNull(securityPlanSupplier);
            this.notificationConfig = Objects.requireNonNull(notificationConfig);
        }
        catch (RuntimeException ex) {
            createObservation.error((Throwable)ex);
            throw ex;
        }
        finally {
            createObservation.stop();
        }
    }

    public CompletionStage<BoltConnection> getConnection() {
        return this.getConnection(BoltConnectionParameters.defaultParameters());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public CompletionStage<BoltConnection> getConnection(BoltConnectionParameters parameters) {
        boolean overrideAuthToken;
        CompletionStage<AuthToken> authTokenSupplier;
        PooledBoltConnectionSource pooledBoltConnectionSource = this;
        synchronized (pooledBoltConnectionSource) {
            if (this.closeStage != null) {
                return CompletableFuture.failedFuture(new IllegalStateException("Connection source is closed."));
            }
        }
        ImmutableObservation parentObservation = this.observationProvider.scopedObservation();
        Observation acquireObservation = this.observationProvider.pooledConnectionAcquire(this.poolId, this.uri);
        CompletableFuture acquisitionFuture = new CompletableFuture();
        if (parameters.authToken() != null) {
            authTokenSupplier = CompletableFuture.completedStage(parameters.authToken());
            overrideAuthToken = true;
        } else {
            authTokenSupplier = this.authTokenManager.getToken();
            overrideAuthToken = false;
        }
        authTokenSupplier.whenComplete((authToken, authThrowable) -> {
            if (authThrowable != null) {
                acquisitionFuture.completeExceptionally((Throwable)authThrowable);
                return;
            }
            acquisitionFuture.whenComplete((connection, throwable) -> {
                if ((throwable = FutureUtil.completionExceptionCause(throwable)) != null) {
                    acquireObservation.error(throwable);
                }
                acquireObservation.stop();
            });
            this.connect(acquisitionFuture, (AuthToken)authToken, overrideAuthToken, parameters.minVersion(), this.notificationConfig, (ImmutableObservation)acquireObservation);
        });
        return acquisitionFuture.thenApply(boltConnection -> {
            boltConnection.onUsageStart(parentObservation);
            return boltConnection;
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void connect(CompletableFuture<PooledBoltConnection> acquisitionFuture, AuthToken authToken, boolean overrideAuthToken, BoltProtocolVersion minVersion, NotificationConfig notificationConfig, ImmutableObservation parentObservation) {
        ConnectionEntryWithMetadata connectionEntryWithMetadata = null;
        BoltTransientException pendingAcquisitionsFull = null;
        AtomicBoolean empty = new AtomicBoolean();
        PooledBoltConnectionSource pooledBoltConnectionSource = this;
        synchronized (pooledBoltConnectionSource) {
            try {
                empty.set(this.pooledConnectionEntries.isEmpty());
                try {
                    connectionEntryWithMetadata = this.acquireExistingEntry(authToken, minVersion);
                }
                catch (MinVersionAcquisitionException e) {
                    acquisitionFuture.completeExceptionally(e);
                    return;
                }
                if (connectionEntryWithMetadata == null) {
                    if (this.pooledConnectionEntries.size() < this.maxSize) {
                        ConnectionEntry acquiredEntry = new ConnectionEntry();
                        this.pooledConnectionEntries.add(acquiredEntry);
                        connectionEntryWithMetadata = new ConnectionEntryWithMetadata(acquiredEntry, false);
                    } else if (this.pendingAcquisitions.size() < 100 && !acquisitionFuture.isDone()) {
                        if (this.acquisitionTimeout > 0L) {
                            this.pendingAcquisitions.add(acquisitionFuture);
                        }
                        this.executorService.schedule(() -> {
                            PooledBoltConnectionSource pooledBoltConnectionSource = this;
                            synchronized (pooledBoltConnectionSource) {
                                this.pendingAcquisitions.remove(acquisitionFuture);
                            }
                            try {
                                acquisitionFuture.completeExceptionally(new TimeoutException("Unable to acquire connection from the pool within configured maximum time of " + this.acquisitionTimeout + "ms"));
                            }
                            catch (Throwable throwable) {
                                this.log.log(System.Logger.Level.WARNING, "Unexpected error occurred.", throwable);
                            }
                        }, this.acquisitionTimeout, TimeUnit.MILLISECONDS);
                    } else {
                        pendingAcquisitionsFull = new BoltTransientException("Connection pool pending acquisition queue is full.");
                    }
                }
            }
            catch (Throwable throwable2) {
                if (connectionEntryWithMetadata != null) {
                    if (connectionEntryWithMetadata.connectionEntry.connection != null) {
                        connectionEntryWithMetadata.connectionEntry.available = true;
                    } else {
                        this.pooledConnectionEntries.remove(connectionEntryWithMetadata.connectionEntry);
                    }
                }
                this.pendingAcquisitions.remove(acquisitionFuture);
                acquisitionFuture.completeExceptionally(throwable2);
            }
        }
        if (pendingAcquisitionsFull != null) {
            acquisitionFuture.completeExceptionally((Throwable)pendingAcquisitionsFull);
        } else if (connectionEntryWithMetadata != null) {
            if (connectionEntryWithMetadata.connectionEntry.connection != null) {
                ConnectionEntryWithMetadata entryWithMetadata = connectionEntryWithMetadata;
                entry = entryWithMetadata.connectionEntry;
                this.livenessCheckStage(entry, parentObservation).whenComplete((ignored, throwable) -> {
                    if (throwable != null) {
                        this.purge(entry);
                        this.connect(acquisitionFuture, authToken, overrideAuthToken, minVersion, notificationConfig, parentObservation);
                    } else {
                        PooledBoltConnection pooledConnection = new PooledBoltConnection(entry.connection, this, () -> this.release(entry), () -> this.purge(entry), observationParent -> this.observationProvider.pooledConnectionInUse((ImmutableObservation)observationParent, this.poolId, this.uri));
                        this.reauthStage(entryWithMetadata, authToken).whenComplete((ignored2, throwable2) -> {
                            if (!acquisitionFuture.complete(pooledConnection)) {
                                CompletableFuture<PooledBoltConnection> pendingAcquisition;
                                PooledBoltConnectionSource pooledBoltConnectionSource = this;
                                synchronized (pooledBoltConnectionSource) {
                                    pendingAcquisition = this.pendingAcquisitions.poll();
                                    if (pendingAcquisition == null) {
                                        entry.available = true;
                                    }
                                }
                                if (pendingAcquisition != null) {
                                    pendingAcquisition.complete(pooledConnection);
                                }
                            }
                        });
                    }
                });
            } else {
                Observation createObservation = this.observationProvider.pooledConnectionCreate(this.poolId, this.uri);
                entry = connectionEntryWithMetadata.connectionEntry;
                CompletionStage authStage = this.securityPlanSupplier.getPlan().thenCompose(securityPlan -> {
                    if (overrideAuthToken || empty.get()) {
                        return CompletableFuture.completedStage(new SecurityPlanAndAuthToken((SecurityPlan)securityPlan, authToken));
                    }
                    return this.authTokenManager.getToken().thenApply(token -> new SecurityPlanAndAuthToken((SecurityPlan)securityPlan, (AuthToken)token));
                });
                authStage.thenCompose(auth -> this.boltConnectionProvider.connect(this.uri, this.routingContextAddress, this.boltAgent, this.userAgent, this.connectTimeoutMillis, auth.securityPlan(), auth.authToken(), minVersion, notificationConfig, (ImmutableObservation)createObservation)).whenComplete((boltConnection, throwable) -> {
                    Throwable error = FutureUtil.completionExceptionCause(throwable);
                    if (error != null) {
                        PooledBoltConnectionSource pooledBoltConnectionSource = this;
                        synchronized (pooledBoltConnectionSource) {
                            this.pooledConnectionEntries.remove(entry);
                        }
                        if (error instanceof BoltFailureException) {
                            BoltFailureException boltFailureException = (BoltFailureException)error;
                            SecurityPlanAndAuthToken usedAuth = authStage.toCompletableFuture().getNow(null);
                            if (usedAuth != null) {
                                error = this.authTokenManager.handleBoltFailureException(usedAuth.authToken(), boltFailureException);
                            }
                        }
                        createObservation.error(error);
                        createObservation.stop();
                        acquisitionFuture.completeExceptionally(error);
                    } else {
                        PooledBoltConnectionSource boltFailureException = this;
                        synchronized (boltFailureException) {
                            entry.connection = boltConnection;
                            entry.createdTimestamp = this.clock.millis();
                        }
                        createObservation.stop();
                        PooledBoltConnection pooledConnection = new PooledBoltConnection((BoltConnection)boltConnection, this, () -> this.release(entry), () -> this.purge(entry), observationParent -> this.observationProvider.pooledConnectionInUse((ImmutableObservation)observationParent, this.poolId, this.uri));
                        if (!acquisitionFuture.complete(pooledConnection)) {
                            CompletableFuture<PooledBoltConnection> pendingAcquisition;
                            PooledBoltConnectionSource pooledBoltConnectionSource = this;
                            synchronized (pooledBoltConnectionSource) {
                                pendingAcquisition = this.pendingAcquisitions.poll();
                                if (pendingAcquisition == null) {
                                    entry.available = true;
                                }
                            }
                            if (pendingAcquisition != null) {
                                pendingAcquisition.complete(pooledConnection);
                            }
                        }
                    }
                });
            }
        }
    }

    private synchronized ConnectionEntryWithMetadata acquireExistingEntry(AuthToken authToken, BoltProtocolVersion minVersion) {
        ConnectionEntryWithMetadata connectionEntryWithMetadata = null;
        Iterator<ConnectionEntry> iterator = this.pooledConnectionEntries.iterator();
        while (iterator.hasNext()) {
            boolean reauthNeeded;
            long currentTime;
            ConnectionEntry connectionEntry = iterator.next();
            if (!connectionEntry.available) continue;
            BoltConnection connection = connectionEntry.connection;
            if (connection.state() != BoltConnectionState.OPEN) {
                connection.close();
                iterator.remove();
                continue;
            }
            if (minVersion != null && minVersion.compareTo(connection.protocolVersion()) > 0) {
                throw new MinVersionAcquisitionException("lower version", connection.protocolVersion());
            }
            if (this.maxLifetime > 0L && (currentTime = this.clock.millis()) - connectionEntry.createdTimestamp > this.maxLifetime) {
                iterator.remove();
                Observation closeObservation = this.observationProvider.pooledConnectionClose(this.poolId, this.uri);
                connection.close().whenComplete((ignored, throwable) -> closeObservation.stop());
                continue;
            }
            AuthInfo authInfo = connection.authInfo().toCompletableFuture().getNow(null);
            boolean expiredByError = this.minAuthTimestamp > 0L && authInfo.authAckMillis() <= this.minAuthTimestamp;
            boolean authMatches = authToken.equals((Object)authInfo.authToken());
            boolean bl = reauthNeeded = expiredByError || !authMatches;
            if (reauthNeeded && new BoltProtocolVersion(5, 1).compareTo(connectionEntry.connection.protocolVersion()) > 0) {
                this.log.log(System.Logger.Level.DEBUG, "reauth is not supported, the connection is voided");
                iterator.remove();
                Observation observation = this.observationProvider.pooledConnectionClose(this.poolId, this.uri);
                connectionEntry.connection.close().whenComplete((ignored, throwable) -> {
                    if (throwable != null) {
                        this.log.log(System.Logger.Level.WARNING, "Connection close has failed with %s.", throwable.getClass().getCanonicalName());
                    }
                    observation.stop();
                });
                continue;
            }
            this.log.log(System.Logger.Level.DEBUG, "Connection acquired from the pool. " + String.valueOf(this.address));
            connectionEntry.available = false;
            connectionEntryWithMetadata = new ConnectionEntryWithMetadata(connectionEntry, reauthNeeded);
            break;
        }
        return connectionEntryWithMetadata;
    }

    private CompletionStage<Void> reauthStage(ConnectionEntryWithMetadata connectionEntryWithMetadata, AuthToken authToken) {
        CompletionStage<Object> stage = connectionEntryWithMetadata.reauthNeeded ? connectionEntryWithMetadata.connectionEntry.connection.write(List.of(Messages.logoff(), Messages.logon((AuthToken)authToken))).handle((ignored, throwable) -> {
            if (throwable != null) {
                connectionEntryWithMetadata.connectionEntry.connection.close();
                PooledBoltConnectionSource pooledBoltConnectionSource = this;
                synchronized (pooledBoltConnectionSource) {
                    this.pooledConnectionEntries.remove(connectionEntryWithMetadata.connectionEntry);
                }
            }
            return null;
        }) : CompletableFuture.completedStage(null);
        return stage;
    }

    private CompletionStage<Void> livenessCheckStage(ConnectionEntry entry, ImmutableObservation parentObservation) {
        CompletionStage<Object> stage;
        if (this.idleBeforeTest >= 0L && entry.lastUsedTimestamp + this.idleBeforeTest < this.clock.millis()) {
            BasicResponseHandler resetHandler = new BasicResponseHandler();
            stage = entry.connection.writeAndFlush((ResponseHandler)resetHandler, (Message)Messages.reset(), parentObservation).thenCompose(ignored -> resetHandler.summaries()).thenApply(ignored -> null);
        } else {
            stage = CompletableFuture.completedStage(null);
        }
        return stage;
    }

    public CompletionStage<Void> verifyConnectivity() {
        return this.getConnection().thenCompose(BoltConnection::close);
    }

    public CompletionStage<Boolean> supportsMultiDb() {
        return this.getConnection().thenCompose(boltConnection -> {
            boolean supports = boltConnection.protocolVersion().compareTo(new BoltProtocolVersion(4, 0)) >= 0;
            return boltConnection.close().thenApply(ignored -> supports);
        });
    }

    public CompletionStage<Boolean> supportsSessionAuth() {
        return this.getConnection().thenCompose(boltConnection -> {
            boolean supports = new BoltProtocolVersion(5, 1).compareTo(boltConnection.protocolVersion()) <= 0;
            return boltConnection.close().thenApply(ignored -> supports);
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public CompletionStage<Void> close() {
        CompletionStage<Void> closeStage;
        PooledBoltConnectionSource pooledBoltConnectionSource = this;
        synchronized (pooledBoltConnectionSource) {
            if (this.closeStage == null) {
                Observation closeObservation = this.observationProvider.connectionPoolClose(this.poolId, this.uri);
                this.closeStage = CompletableFuture.completedStage(null);
                Iterator<ConnectionEntry> iterator = this.pooledConnectionEntries.iterator();
                while (iterator.hasNext()) {
                    ConnectionEntry entry = iterator.next();
                    if (entry.connection != null && entry.connection.state() == BoltConnectionState.OPEN) {
                        this.closeStage = this.closeStage.thenCompose(ignored -> entry.connection.close().exceptionally(throwable -> null));
                    }
                    iterator.remove();
                }
                this.closeStage = this.closeStage.thenCompose(ignored -> this.boltConnectionProvider.close()).exceptionally(throwable -> null).whenComplete((ignored, throwable) -> {
                    this.executorService.shutdown();
                    closeObservation.stop();
                });
            }
            closeStage = this.closeStage;
        }
        return closeStage;
    }

    synchronized int size() {
        return this.pooledConnectionEntries.size();
    }

    synchronized int inUse() {
        return this.pooledConnectionEntries.stream().filter(entry -> !entry.available).toList().size();
    }

    private String poolId(BoltServerAddress serverAddress) {
        return serverAddress.port() == 0 ? String.format("%s-%d", serverAddress.host(), this.hashCode()) : String.format("%s:%d-%d", serverAddress.host(), serverAddress.port(), this.hashCode());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void release(ConnectionEntry entry) {
        CompletableFuture<PooledBoltConnection> pendingAcquisition;
        PooledBoltConnectionSource pooledBoltConnectionSource = this;
        synchronized (pooledBoltConnectionSource) {
            entry.lastUsedTimestamp = this.clock.millis();
            pendingAcquisition = this.pendingAcquisitions.poll();
            if (pendingAcquisition == null) {
                entry.available = true;
            }
        }
        if (pendingAcquisition != null) {
            pendingAcquisition.complete(new PooledBoltConnection(entry.connection, this, () -> this.release(entry), () -> this.purge(entry), observationParent -> this.observationProvider.pooledConnectionInUse((ImmutableObservation)observationParent, this.poolId, this.uri)));
        }
        this.log.log(System.Logger.Level.DEBUG, "Connection released to the pool.");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void purge(ConnectionEntry entry) {
        PooledBoltConnectionSource pooledBoltConnectionSource = this;
        synchronized (pooledBoltConnectionSource) {
            this.pooledConnectionEntries.remove(entry);
        }
        Observation closeObservation = this.observationProvider.pooledConnectionClose(this.poolId, this.uri);
        entry.connection.close().whenComplete((ignored, throwable) -> closeObservation.stop());
        this.log.log(System.Logger.Level.DEBUG, "Connection purged from the pool.");
    }

    public synchronized void onExpired() {
        long now = this.clock.millis();
        this.minAuthTimestamp = Math.max(this.minAuthTimestamp, now);
    }

    private record ConnectionEntryWithMetadata(ConnectionEntry connectionEntry, boolean reauthNeeded) {
    }

    private static class ConnectionEntry {
        private BoltConnection connection;
        private boolean available;
        private long createdTimestamp;
        private long lastUsedTimestamp;

        private ConnectionEntry() {
        }
    }

    private record SecurityPlanAndAuthToken(SecurityPlan securityPlan, AuthToken authToken) {
    }
}

