/*
 * Decompiled with CFR 0.152.
 */
package com.yahoo.vespa.http.client.core.communication;

import com.yahoo.vespa.http.client.FeedConnectException;
import com.yahoo.vespa.http.client.FeedProtocolException;
import com.yahoo.vespa.http.client.Result;
import com.yahoo.vespa.http.client.config.Endpoint;
import com.yahoo.vespa.http.client.core.Document;
import com.yahoo.vespa.http.client.core.EndpointResult;
import com.yahoo.vespa.http.client.core.Exceptions;
import com.yahoo.vespa.http.client.core.ServerResponseException;
import com.yahoo.vespa.http.client.core.communication.DocumentQueue;
import com.yahoo.vespa.http.client.core.communication.EndpointResultQueue;
import com.yahoo.vespa.http.client.core.communication.GatewayConnection;
import com.yahoo.vespa.http.client.core.communication.GatewayConnectionFactory;
import com.yahoo.vespa.http.client.core.communication.GatewayThrottler;
import com.yahoo.vespa.http.client.core.operationProcessor.EndPointResultFactory;
import java.io.IOException;
import java.io.InputStream;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

public class IOThread
implements Runnable,
AutoCloseable {
    private static final Logger log = Logger.getLogger(IOThread.class.getName());
    private final Endpoint endpoint;
    private final GatewayConnectionFactory connectionFactory;
    private final DocumentQueue documentQueue;
    private final EndpointResultQueue resultQueue;
    private final Thread thread;
    private final int clusterId;
    private final CountDownLatch running = new CountDownLatch(1);
    private final CountDownLatch stopSignal = new CountDownLatch(1);
    private final int maxChunkSizeBytes;
    private final int maxInFlightRequests;
    private final Duration localQueueTimeOut;
    private final GatewayThrottler gatewayThrottler;
    private final Duration connectionTimeToLive;
    private final long pollIntervalUS;
    private final Clock clock;
    private final Random random = new Random();
    private final OldConnectionsDrainer oldConnectionsDrainer;
    private volatile GatewayConnection currentConnection;
    private volatile ConnectionState connectionState = ConnectionState.DISCONNECTED;
    private final AtomicInteger wrongSessionDetectedCounter = new AtomicInteger(0);
    private final AtomicInteger wrongVersionDetectedCounter = new AtomicInteger(0);
    private final AtomicInteger problemStatusCodeFromServerCounter = new AtomicInteger(0);
    private final AtomicInteger executeProblemsCounter = new AtomicInteger(0);
    private final AtomicInteger docsReceivedCounter = new AtomicInteger(0);
    private final AtomicInteger statusReceivedCounter = new AtomicInteger(0);
    private final AtomicInteger pendingDocumentStatusCount = new AtomicInteger(0);
    private final AtomicInteger successfulHandshakes = new AtomicInteger(0);
    private final AtomicInteger lastGatewayProcessTimeMillis = new AtomicInteger(0);

    IOThread(ThreadGroup ioThreadGroup, Endpoint endpoint, EndpointResultQueue endpointResultQueue, GatewayConnectionFactory connectionFactory, int clusterId, int maxChunkSizeBytes, int maxInFlightRequests, Duration localQueueTimeOut, DocumentQueue documentQueue, long maxSleepTimeMs, Duration connectionTimeToLive, boolean runThreads, double idlePollFrequency, Clock clock) {
        this.endpoint = endpoint;
        this.documentQueue = documentQueue;
        this.connectionFactory = connectionFactory;
        this.currentConnection = connectionFactory.newConnection();
        this.resultQueue = endpointResultQueue;
        this.clusterId = clusterId;
        this.maxChunkSizeBytes = maxChunkSizeBytes;
        this.maxInFlightRequests = maxInFlightRequests;
        this.connectionTimeToLive = connectionTimeToLive;
        this.gatewayThrottler = new GatewayThrottler(maxSleepTimeMs);
        this.pollIntervalUS = Math.max(1000L, (long)(1000000.0 / Math.max(0.1, idlePollFrequency)));
        this.clock = clock;
        this.localQueueTimeOut = localQueueTimeOut;
        this.oldConnectionsDrainer = new OldConnectionsDrainer(endpoint, clusterId, Duration.ofMillis(this.pollIntervalUS / 1000L), connectionTimeToLive, localQueueTimeOut, this.statusReceivedCounter, this.resultQueue, this.stopSignal, clock);
        if (runThreads) {
            this.thread = new Thread(ioThreadGroup, this, "IOThread " + endpoint);
            this.thread.setDaemon(true);
            this.thread.start();
            Thread thread = new Thread(ioThreadGroup, this.oldConnectionsDrainer, "IOThread " + endpoint + " drainer");
            thread.setDaemon(true);
            thread.start();
        } else {
            this.thread = null;
        }
    }

    public Endpoint getEndpoint() {
        return this.endpoint;
    }

    public ConnectionStats getConnectionStats() {
        return new ConnectionStats(this.wrongSessionDetectedCounter.get(), this.wrongVersionDetectedCounter.get(), this.problemStatusCodeFromServerCounter.get(), this.executeProblemsCounter.get(), this.docsReceivedCounter.get(), this.statusReceivedCounter.get(), this.pendingDocumentStatusCount.get(), this.successfulHandshakes.get(), this.lastGatewayProcessTimeMillis.get());
    }

    @Override
    public void close() {
        this.documentQueue.close();
        if (this.stopSignal.getCount() == 0L) {
            return;
        }
        this.stopSignal.countDown();
        log.finer("Closed called.");
        this.oldConnectionsDrainer.close();
        int size = this.resultQueue.getPendingSize();
        if (size > 0) {
            log.info("We have outstanding operations (" + size + ") , trying to fetch responses.");
            try {
                this.processResponse(this.currentConnection.drain());
            }
            catch (Throwable e) {
                log.log(Level.SEVERE, "Some failures while trying to get latest responses from vespa.", e);
            }
        }
        try {
            this.currentConnection.close();
        }
        finally {
            this.drainDocumentQueueWhenFailingPermanently(new Exception("Closed call, did not manage to process everything so failing this document."));
        }
        log.fine("Session to " + this.endpoint + " closed.");
    }

    public void post(Document document) throws InterruptedException {
        this.documentQueue.put(document, true);
    }

    public String toString() {
        return "I/O thread (for " + this.endpoint + ")";
    }

    List<Document> getNextDocsForFeeding(long maxWaitUnits, TimeUnit timeUnit) {
        Document document;
        ArrayList<Document> docsForSendChunk = new ArrayList<Document>();
        int chunkSizeBytes = 0;
        try {
            Document doc;
            this.drainFirstDocumentsInQueueIfOld();
            Document document2 = doc = this.thread != null ? this.documentQueue.poll(maxWaitUnits, timeUnit) : this.documentQueue.poll();
            if (doc != null) {
                docsForSendChunk.add(doc);
                chunkSizeBytes = doc.size();
            }
        }
        catch (InterruptedException ie) {
            log.fine("Got break signal while waiting for new documents to feed");
            return docsForSendChunk;
        }
        int thisMaxChunkSizeBytes = this.randomize(this.maxChunkSizeBytes);
        int thisMaxInFlightRequests = this.randomize(this.maxInFlightRequests);
        for (int pendingSize = 1 + this.resultQueue.getPendingSize(); chunkSizeBytes < thisMaxChunkSizeBytes && pendingSize < thisMaxInFlightRequests; chunkSizeBytes += document.size(), ++pendingSize) {
            this.drainFirstDocumentsInQueueIfOld();
            document = this.documentQueue.poll();
            if (document == null) break;
            docsForSendChunk.add(document);
        }
        if (log.isLoggable(Level.FINEST)) {
            log.finest("Chunk has " + docsForSendChunk.size() + " docs with a size " + chunkSizeBytes + " bytes");
        }
        this.docsReceivedCounter.addAndGet(docsForSendChunk.size());
        return docsForSendChunk;
    }

    private int randomize(int limit) {
        double multiplier = 0.75 + 0.25 * this.random.nextDouble();
        return Math.max(1, (int)((double)limit * multiplier));
    }

    private void addDocumentsToResultQueue(List<Document> docs) {
        for (Document doc : docs) {
            this.resultQueue.operationSent(doc.getOperationId(), this.currentConnection);
        }
    }

    private void markDocumentAsFailed(List<Document> docs, ServerResponseException servletException) {
        for (Document doc : docs) {
            this.resultQueue.failOperation(EndPointResultFactory.createTransientError(this.endpoint, doc.getOperationId(), servletException), this.clusterId);
        }
    }

    private InputStream sendAndReceive(List<Document> docs) throws IOException, ServerResponseException {
        try {
            return this.currentConnection.write(docs);
        }
        catch (ServerResponseException ser) {
            this.markDocumentAsFailed(docs, ser);
            throw ser;
        }
        catch (Exception e) {
            this.markDocumentAsFailed(docs, new ServerResponseException(Exceptions.toMessageString(e)));
            throw e;
        }
    }

    private ProcessResponse processResponse(InputStream serverResponse) throws IOException {
        return IOThread.processResponse(serverResponse, this.endpoint, this.clusterId, this.statusReceivedCounter, this.resultQueue);
    }

    private static ProcessResponse processResponse(InputStream serverResponse, Endpoint endpoint, int clusterId, AtomicInteger statusReceivedCounter, EndpointResultQueue resultQueue) throws IOException {
        Collection<EndpointResult> endpointResults = EndPointResultFactory.createResult(endpoint, serverResponse);
        statusReceivedCounter.addAndGet(endpointResults.size());
        int transientErrors = 0;
        for (EndpointResult endpointResult : endpointResults) {
            if (endpointResult.getDetail().getResultType() == Result.ResultType.TRANSITIVE_ERROR) {
                ++transientErrors;
            }
            resultQueue.resultReceived(endpointResult, clusterId);
        }
        return new ProcessResponse(transientErrors, endpointResults.size());
    }

    private ProcessResponse feedDocumentAndProcessResults(List<Document> docs) throws ServerResponseException, IOException {
        this.addDocumentsToResultQueue(docs);
        long startTime = this.clock.millis();
        InputStream serverResponse = this.sendAndReceive(docs);
        ProcessResponse processResponse = this.processResponse(serverResponse);
        this.lastGatewayProcessTimeMillis.set((int)(this.clock.millis() - startTime));
        return processResponse;
    }

    private ProcessResponse pullAndProcessData(long maxWaitTimeUS) throws ServerResponseException, IOException {
        List<Object> nextDocsForFeeding;
        int pendingResultQueueSize = this.resultQueue.getPendingSize();
        this.pendingDocumentStatusCount.set(pendingResultQueueSize);
        List<Object> list = nextDocsForFeeding = pendingResultQueueSize > this.maxInFlightRequests ? new ArrayList() : this.getNextDocsForFeeding(maxWaitTimeUS, TimeUnit.MICROSECONDS);
        if (nextDocsForFeeding.isEmpty() && pendingResultQueueSize == 0) {
            log.finest("No document awaiting feeding, not waiting for results.");
            return new ProcessResponse(0, 0);
        }
        log.finest("Awaiting " + pendingResultQueueSize + " results.");
        ProcessResponse processResponse = this.feedDocumentAndProcessResults(nextDocsForFeeding);
        if (pendingResultQueueSize > this.maxInFlightRequests && processResponse.processResultsCount == 0) {
            try {
                Thread.sleep(300L);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
        return processResponse;
    }

    private ConnectionState cycle(ConnectionState connectionState) {
        switch (connectionState) {
            case DISCONNECTED: {
                try {
                    if (!this.currentConnection.connect()) {
                        log.log(Level.WARNING, "Could not connect to endpoint: '" + this.endpoint + "'. Will re-try.");
                        this.drainFirstDocumentsInQueueIfOld();
                        return ConnectionState.DISCONNECTED;
                    }
                    return ConnectionState.CONNECTED;
                }
                catch (Throwable throwable1) {
                    this.drainFirstDocumentsInQueueIfOld();
                    log.log(Level.INFO, "Failed connecting to endpoint: '" + this.endpoint + "'. Will re-try connecting.", throwable1);
                    this.executeProblemsCounter.incrementAndGet();
                    return ConnectionState.DISCONNECTED;
                }
            }
            case CONNECTED: {
                try {
                    if (this.isStale(this.currentConnection)) {
                        return this.refreshConnection(connectionState);
                    }
                    this.currentConnection.handshake();
                    this.successfulHandshakes.getAndIncrement();
                }
                catch (ServerResponseException ser) {
                    int code = ser.getResponseCode();
                    if (code == 401 || code == 403) {
                        this.drainDocumentQueueWhenFailingPermanently(new Exception("Denied access by endpoint:" + ser.getResponseString()));
                        log.log(Level.SEVERE, "Failed authentication or authorization with '" + this.endpoint + "': " + Exceptions.toMessageString(ser));
                        return ConnectionState.CONNECTED;
                    }
                    this.executeProblemsCounter.incrementAndGet();
                    log.log(Level.INFO, "Failed talking to endpoint. Handshake with server endpoint '" + this.endpoint + "' failed -- will re-try handshake: " + Exceptions.toMessageString(ser));
                    this.drainFirstDocumentsInQueueIfOld();
                    this.resultQueue.onEndpointError(new FeedProtocolException(ser.getResponseCode(), ser.getResponseString(), ser, this.endpoint));
                    return ConnectionState.CONNECTED;
                }
                catch (Throwable throwable) {
                    this.executeProblemsCounter.incrementAndGet();
                    this.resultQueue.onEndpointError(new FeedConnectException(throwable, this.endpoint));
                    log.log(Level.INFO, "Failed talking to endpoint. Handshake with server endpoint '" + this.endpoint + "' failed. Will re-try handshake.", throwable);
                    this.drainFirstDocumentsInQueueIfOld();
                    this.currentConnection.close();
                    return ConnectionState.DISCONNECTED;
                }
                return ConnectionState.SESSION_SYNCED;
            }
            case SESSION_SYNCED: {
                try {
                    if (this.isStale(this.currentConnection)) {
                        return this.refreshConnection(connectionState);
                    }
                    ProcessResponse processResponse = this.pullAndProcessData(this.pollIntervalUS);
                    this.gatewayThrottler.handleCall(processResponse.transitiveErrorCount);
                }
                catch (ServerResponseException ser) {
                    log.log(Level.INFO, "Problems while handing data over to endpoint '" + this.endpoint + "'. Will re-try. Endpoint responded with an unexpected HTTP response code.", ser);
                    return ConnectionState.CONNECTED;
                }
                catch (Throwable e) {
                    log.log(Level.INFO, "Connection level error handing data over to endpoint '" + this.endpoint + "'. Will re-try.", e);
                    this.currentConnection.close();
                    return ConnectionState.DISCONNECTED;
                }
                return ConnectionState.SESSION_SYNCED;
            }
        }
        log.severe("Should never get here.");
        this.currentConnection.close();
        return ConnectionState.DISCONNECTED;
    }

    private void sleepIfProblemsGettingSyncedConnection(ConnectionState newState, ConnectionState oldState) {
        if (newState == ConnectionState.SESSION_SYNCED) {
            return;
        }
        if (newState == ConnectionState.CONNECTED && oldState == ConnectionState.DISCONNECTED) {
            return;
        }
        try {
            if (this.stopSignal.getCount() > 0L || !this.documentQueue.isEmpty()) {
                Thread.sleep(this.gatewayThrottler.distribute(3000));
            }
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    @Override
    public void run() {
        while (this.stopSignal.getCount() > 0L || !this.documentQueue.isEmpty()) {
            this.tick();
        }
        log.finer(this.toString() + " exiting, documentQueue.size()=" + this.documentQueue.size());
        this.running.countDown();
    }

    public void tick() {
        ConnectionState oldState = this.connectionState;
        this.connectionState = this.cycle(this.connectionState);
        if (this.thread == null) {
            this.oldConnectionsDrainer.checkOldConnections();
        }
        if (this.thread != null) {
            this.sleepIfProblemsGettingSyncedConnection(this.connectionState, oldState);
        }
    }

    private void drainFirstDocumentsInQueueIfOld() {
        Optional<Document> document;
        while ((document = this.documentQueue.pollDocumentIfTimedoutInQueue(this.localQueueTimeOut)).isPresent()) {
            EndpointResult endpointResult = EndPointResultFactory.createTransientError(this.endpoint, document.get().getOperationId(), new Exception("Not sending document operation, timed out in queue after " + (this.clock.millis() - document.get().getQueueInsertTime().toEpochMilli()) + " ms."));
            this.resultQueue.failOperation(endpointResult, this.clusterId);
        }
        return;
    }

    private void drainDocumentQueueWhenFailingPermanently(Exception exception) {
        this.resultQueue.failPending(exception);
        for (Document document : this.documentQueue.removeAllDocuments()) {
            EndpointResult endpointResult = EndPointResultFactory.createError(this.endpoint, document.getOperationId(), exception);
            this.resultQueue.failOperation(endpointResult, this.clusterId);
        }
    }

    private boolean isStale(GatewayConnection connection) {
        return connection.connectionTime() != null && connection.connectionTime().plus(this.connectionTimeToLive).isBefore(this.clock.instant());
    }

    private ConnectionState refreshConnection(ConnectionState currentConnectionState) {
        if (currentConnectionState == ConnectionState.SESSION_SYNCED) {
            this.oldConnectionsDrainer.add(this.currentConnection);
        }
        this.currentConnection = this.connectionFactory.newConnection();
        return ConnectionState.DISCONNECTED;
    }

    public GatewayConnection currentConnection() {
        return this.currentConnection;
    }

    public List<GatewayConnection> oldConnections() {
        return this.oldConnectionsDrainer.connections();
    }

    public EndpointResultQueue resultQueue() {
        return this.resultQueue;
    }

    private static class ProcessResponse {
        private final int transitiveErrorCount;
        private final int processResultsCount;

        ProcessResponse(int transitiveErrorCount, int processResultsCount) {
            this.transitiveErrorCount = transitiveErrorCount;
            this.processResultsCount = processResultsCount;
        }
    }

    private static enum ConnectionState {
        DISCONNECTED,
        CONNECTED,
        SESSION_SYNCED;

    }

    private static class OldConnectionsDrainer
    implements Runnable {
        private static final Logger log = Logger.getLogger(OldConnectionsDrainer.class.getName());
        private final Endpoint endpoint;
        private final int clusterId;
        private final Duration pollInterval;
        private final Duration connectionTimeToLive;
        private final Duration localQueueTimeOut;
        private final AtomicInteger statusReceivedCounter;
        private final EndpointResultQueue resultQueue;
        private final CountDownLatch stopSignal;
        private final Clock clock;
        private final List<GatewayConnection> connections = new CopyOnWriteArrayList<GatewayConnection>();

        OldConnectionsDrainer(Endpoint endpoint, int clusterId, Duration pollInterval, Duration connectionTimeToLive, Duration localQueueTimeOut, AtomicInteger statusReceivedCounter, EndpointResultQueue resultQueue, CountDownLatch stopSignal, Clock clock) {
            this.endpoint = endpoint;
            this.clusterId = clusterId;
            this.pollInterval = pollInterval;
            this.connectionTimeToLive = connectionTimeToLive;
            this.localQueueTimeOut = localQueueTimeOut;
            this.statusReceivedCounter = statusReceivedCounter;
            this.resultQueue = resultQueue;
            this.stopSignal = stopSignal;
            this.clock = clock;
        }

        public void add(GatewayConnection connection) {
            this.connections.add(connection);
        }

        @Override
        public void run() {
            while (this.stopSignal.getCount() > 0L) {
                try {
                    this.checkOldConnections();
                    Thread.sleep(this.pollInterval.toMillis());
                }
                catch (InterruptedException e) {
                    log.log(Level.WARNING, "Close thread was interrupted: " + e.getMessage(), e);
                    Thread.currentThread().interrupt();
                    return;
                }
                catch (Exception e) {
                    log.log(Level.WARNING, "Connection draining failed: " + e.getMessage(), e);
                }
            }
        }

        public void checkOldConnections() {
            for (GatewayConnection connection : this.connections) {
                if (!this.resultQueue.hasInflightOperations(connection)) {
                    log.fine(() -> connection + " no longer has inflight operations");
                    this.closeConnection(connection);
                    continue;
                }
                if (this.closingTime(connection).isBefore(this.clock.instant())) {
                    log.fine(() -> connection + " still has inflight operations, but drain period is over");
                    this.tryPollAndDrainInflightOperations(connection);
                    this.closeConnection(connection);
                    continue;
                }
                if (!this.timeToPoll(connection)) continue;
                this.tryPollAndDrainInflightOperations(connection);
            }
        }

        private void closeConnection(GatewayConnection connection) {
            log.fine(() -> "Closing " + connection);
            connection.close();
            this.connections.remove(connection);
        }

        private void tryPollAndDrainInflightOperations(GatewayConnection connection) {
            try {
                log.fine(() -> "Polling and draining inflight operations for " + connection);
                IOThread.processResponse(connection.poll(), this.endpoint, this.clusterId, this.statusReceivedCounter, this.resultQueue);
            }
            catch (Exception e) {
                log.log(Level.FINE, e, () -> "Polling status of inflight operations failed: " + e.getMessage());
            }
        }

        private boolean timeToPoll(GatewayConnection connection) {
            Instant now = this.clock.instant();
            Instant endOfLife = connection.connectionTime().plus(this.connectionTimeToLive);
            if (connection.lastPollTime() == null) {
                return endOfLife.plus(this.pollInterval).isBefore(now);
            }
            if (connection.lastPollTime().plus(this.pollInterval).isAfter(now)) {
                return false;
            }
            double connectionEndOfLife = endOfLife.toEpochMilli();
            double connectionLastPolled = connection.lastPollTime().toEpochMilli();
            return (double)now.toEpochMilli() - connectionEndOfLife > 2.0 * (connectionLastPolled - connectionEndOfLife);
        }

        private Instant closingTime(GatewayConnection connection) {
            return connection.connectionTime().plus(this.connectionTimeToLive).plus(this.localQueueTimeOut);
        }

        private void close() {
            int size = this.resultQueue.getPendingSize();
            if (size > 0) {
                log.info("We have outstanding operations (" + size + ") , trying to fetch responses.");
                for (GatewayConnection connection : this.connections) {
                    try {
                        IOThread.processResponse(connection.poll(), this.endpoint, this.clusterId, this.statusReceivedCounter, this.resultQueue);
                    }
                    catch (Throwable e) {
                        log.log(Level.SEVERE, "Some failures while trying to get latest responses from vespa.", e);
                    }
                }
            }
            for (GatewayConnection oldConnection : this.connections) {
                oldConnection.close();
            }
        }

        public List<GatewayConnection> connections() {
            return Collections.unmodifiableList(this.connections);
        }
    }

    public static class ConnectionStats {
        public final int wrongSessionDetectedCounter;
        public final int wrongVersionDetectedCounter;
        public final int problemStatusCodeFromServerCounter;
        public final int executeProblemsCounter;
        public final int docsReceivedCounter;
        public final int statusReceivedCounter;
        public final int pendingDocumentStatusCount;
        public final int successfullHandshakes;
        public final int lastGatewayProcessTimeMillis;

        ConnectionStats(int wrongSessionDetectedCounter, int wrongVersionDetectedCounter, int problemStatusCodeFromServerCounter, int executeProblemsCounter, int docsReceivedCounter, int statusReceivedCounter, int pendingDocumentStatusCount, int successfullHandshakes, int lastGatewayProcessTimeMillis) {
            this.wrongSessionDetectedCounter = wrongSessionDetectedCounter;
            this.wrongVersionDetectedCounter = wrongVersionDetectedCounter;
            this.problemStatusCodeFromServerCounter = problemStatusCodeFromServerCounter;
            this.executeProblemsCounter = executeProblemsCounter;
            this.docsReceivedCounter = docsReceivedCounter;
            this.statusReceivedCounter = statusReceivedCounter;
            this.pendingDocumentStatusCount = pendingDocumentStatusCount;
            this.successfullHandshakes = successfullHandshakes;
            this.lastGatewayProcessTimeMillis = lastGatewayProcessTimeMillis;
        }
    }
}

