/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.client.connection.impl;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.ClientConfig;
import io.pravega.client.connection.impl.ClientConnection;
import io.pravega.client.connection.impl.CommandEncoder;
import io.pravega.client.connection.impl.Flow;
import io.pravega.client.connection.impl.FlowToBatchSizeTracker;
import io.pravega.client.connection.impl.IoBuffer;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.ExecutorServiceHelpers;
import io.pravega.common.io.StreamHelpers;
import io.pravega.common.util.CertificateUtils;
import io.pravega.shaded.com.google.common.annotations.VisibleForTesting;
import io.pravega.shaded.com.google.common.base.Preconditions;
import io.pravega.shaded.com.google.common.base.Strings;
import io.pravega.shaded.io.netty.buffer.ByteBuf;
import io.pravega.shared.protocol.netty.Append;
import io.pravega.shared.protocol.netty.ConnectionFailedException;
import io.pravega.shared.protocol.netty.EnhancedByteBufInputStream;
import io.pravega.shared.protocol.netty.InvalidMessageException;
import io.pravega.shared.protocol.netty.PravegaNodeUri;
import io.pravega.shared.protocol.netty.Reply;
import io.pravega.shared.protocol.netty.ReplyProcessor;
import io.pravega.shared.protocol.netty.WireCommand;
import io.pravega.shared.protocol.netty.WireCommandType;
import io.pravega.shared.protocol.netty.WireCommands;
import java.beans.ConstructorProperties;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.TrustManagerFactory;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TcpClientConnection
implements ClientConnection {
    @SuppressFBWarnings(justification="generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(TcpClientConnection.class);
    static final int CONNECTION_TIMEOUT = 5000;
    static final int TCP_BUFFER_SIZE = 262144;
    static final int SOCKET_TIMEOUT_MS = 180000;
    private final Socket socket;
    private final CommandEncoder encoder;
    private final ConnectionReader reader;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final PravegaNodeUri location;
    private final Runnable onClose;
    private final ScheduledFuture<?> timeoutFuture;

    private TcpClientConnection(Socket socket, CommandEncoder encoder, ConnectionReader reader, PravegaNodeUri location, Runnable onClose, ScheduledExecutorService executor) {
        this.socket = Preconditions.checkNotNull(socket);
        this.encoder = Preconditions.checkNotNull(encoder);
        this.reader = Preconditions.checkNotNull(reader);
        this.location = Preconditions.checkNotNull(location);
        this.onClose = onClose;
        this.timeoutFuture = executor.scheduleWithFixedDelay(new TimeoutBatch(encoder), 20L, 20L, TimeUnit.MILLISECONDS);
    }

    public static CompletableFuture<TcpClientConnection> connect(PravegaNodeUri location, ClientConfig clientConfig, ReplyProcessor callback, ScheduledExecutorService executor, Runnable onClose) {
        return CompletableFuture.supplyAsync(() -> {
            Socket socket = TcpClientConnection.createClientSocket(location, clientConfig);
            try {
                InputStream inputStream = socket.getInputStream();
                FlowToBatchSizeTracker flowToBatchSizeTracker = new FlowToBatchSizeTracker();
                ConnectionReader reader = new ConnectionReader(location.toString(), inputStream, callback, flowToBatchSizeTracker);
                reader.start();
                CommandEncoder encoder = new CommandEncoder(requestId -> flowToBatchSizeTracker.getAppendBatchSizeTrackerByFlowId(Flow.toFlowID(requestId)), null, socket.getOutputStream());
                return new TcpClientConnection(socket, encoder, reader, location, onClose, executor);
            }
            catch (Exception e) {
                StreamHelpers.closeQuietly(socket, log, "Failed to close socket while failing.", new Object[0]);
                onClose.run();
                throw Exceptions.sneakyThrow(new ConnectionFailedException(e));
            }
        }, executor);
    }

    private static TrustManagerFactory createFromCert(String trustStoreFilePath) throws CertificateException, IOException, NoSuchAlgorithmException, KeyStoreException {
        TrustManagerFactory factory = null;
        if (!Strings.isNullOrEmpty(trustStoreFilePath)) {
            KeyStore trustStore = CertificateUtils.createTrustStore(trustStoreFilePath);
            factory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            factory.init(trustStore);
        }
        return factory;
    }

    private static Socket createClientSocket(PravegaNodeUri location, ClientConfig clientConfig) {
        try {
            Socket result;
            if (clientConfig.isEnableTlsToSegmentStore()) {
                TrustManagerFactory trustMgrFactory = TcpClientConnection.createFromCert(clientConfig.getTrustStore());
                SSLContext tlsContext = SSLContext.getInstance("TLS");
                tlsContext.init(null, trustMgrFactory != null ? trustMgrFactory.getTrustManagers() : null, null);
                SSLSocket tlsClientSocket = (SSLSocket)tlsContext.getSocketFactory().createSocket();
                if (clientConfig.isValidateHostName()) {
                    SSLParameters tlsParams = new SSLParameters();
                    tlsParams.setEndpointIdentificationAlgorithm("HTTPS");
                    tlsClientSocket.setSSLParameters(tlsParams);
                }
                result = tlsClientSocket;
            } else {
                result = new Socket();
            }
            result.setSendBufferSize(262144);
            result.setReceiveBufferSize(262144);
            result.setTcpNoDelay(true);
            result.connect(new InetSocketAddress(location.getEndpoint(), location.getPort()), 5000);
            result.setSoTimeout(180000);
            return result;
        }
        catch (Exception e) {
            throw Exceptions.sneakyThrow(new ConnectionFailedException(e));
        }
    }

    @Override
    public void send(WireCommand cmd) throws ConnectionFailedException {
        this.checkIfClosed();
        try {
            this.encoder.write(cmd);
        }
        catch (IOException e) {
            log.warn("Error writing to connection: {}", (Object)e.toString());
            this.close();
            throw new ConnectionFailedException(e);
        }
    }

    @Override
    public void send(Append append) throws ConnectionFailedException {
        this.checkIfClosed();
        try {
            this.encoder.write(append);
        }
        catch (IOException e) {
            log.warn("Error writing to connection: {}", (Object)e.toString());
            this.close();
            throw new ConnectionFailedException(e);
        }
    }

    private void checkIfClosed() throws ConnectionFailedException {
        if (this.closed.get()) {
            throw new ConnectionFailedException("Connection already closed");
        }
    }

    @Override
    public void close() {
        if (this.closed.compareAndSet(false, true)) {
            this.reader.stop();
            this.timeoutFuture.cancel(false);
            StreamHelpers.closeQuietly(this.socket, log, "Error closing TcpClientConnection.socket", new Object[0]);
            if (this.onClose != null) {
                this.onClose.run();
            }
        }
    }

    @VisibleForTesting
    boolean isClosed() {
        return this.closed.get();
    }

    @Override
    public void sendAsync(List<Append> appends, ClientConnection.CompletedCallback callback) {
        try {
            for (Append append : appends) {
                this.encoder.write(append);
            }
            callback.complete(null);
        }
        catch (IOException e) {
            log.warn("Error writing to connection: {}", (Object)e.toString());
            this.close();
            callback.complete(new ConnectionFailedException(e));
        }
    }

    public String toString() {
        return "TcpClientConnection [location=" + this.location + ", isClosed=" + this.closed.get() + "]";
    }

    @VisibleForTesting
    FlowToBatchSizeTracker getConnectionReaderFlowToBatchSizeTracker() {
        return this.reader.flowToBatchSizeTracker;
    }

    private static final class TimeoutBatch
    implements Runnable {
        private final AtomicLong token = new AtomicLong(-1L);
        private final CommandEncoder encoder;

        @Override
        public void run() {
            this.token.set(this.encoder.batchTimeout(this.token.get()));
        }

        @ConstructorProperties(value={"encoder"})
        @SuppressFBWarnings(justification="generated code")
        @Generated
        public TimeoutBatch(CommandEncoder encoder) {
            this.encoder = encoder;
        }
    }

    @VisibleForTesting
    static class ConnectionReader
    implements Runnable {
        static final ThreadFactory THREAD_FACTORY = ExecutorServiceHelpers.getThreadFactory("ClientSocketReaders", 7);
        private final String name;
        private final InputStream in;
        private final ReplyProcessor callback;
        private final Thread thread;
        private final FlowToBatchSizeTracker flowToBatchSizeTracker;
        private final AtomicBoolean stop = new AtomicBoolean(false);

        public ConnectionReader(String name, InputStream in, ReplyProcessor callback, FlowToBatchSizeTracker flowToBatchSizeTracker) {
            this.name = name;
            this.in = in;
            this.callback = callback;
            this.thread = THREAD_FACTORY.newThread(this);
            this.flowToBatchSizeTracker = flowToBatchSizeTracker;
        }

        public void start() {
            this.thread.start();
        }

        @Override
        public void run() {
            IoBuffer buffer = new IoBuffer();
            while (!this.stop.get()) {
                try {
                    WireCommand command = ConnectionReader.readCommand(this.in, buffer);
                    if (command instanceof WireCommands.DataAppended) {
                        WireCommands.DataAppended dataAppended = (WireCommands.DataAppended)command;
                        this.flowToBatchSizeTracker.getAppendBatchSizeTrackerByFlowId(Flow.toFlowID(dataAppended.getRequestId())).recordAck(dataAppended.getEventNumber());
                    }
                    try {
                        this.callback.process((Reply)((Object)command));
                    }
                    catch (Exception e) {
                        this.callback.processingFailure(e);
                    }
                }
                catch (SocketException e) {
                    if (e.getMessage().equals("Socket closed")) {
                        log.info("Closing TcpConnection.Reader because socket is closed.");
                    } else {
                        log.warn("Error in reading from socket.", (Throwable)e);
                    }
                    this.stop();
                }
                catch (EOFException e) {
                    log.info("Closing TcpClientConnection.Reader because end of input reached.");
                    this.stop();
                }
                catch (Exception e) {
                    log.warn("Error processing data from from server " + this.name, (Throwable)e);
                    this.stop();
                }
            }
        }

        @VisibleForTesting
        static WireCommand readCommand(InputStream in, IoBuffer buffer) throws IOException {
            ByteBuf header = buffer.getBuffOfSize(in, 8);
            int t = header.getInt(0);
            WireCommandType type = WireCommands.getType(t);
            if (type == null) {
                throw new InvalidMessageException("Unknown wire command: " + t);
            }
            int length = header.getInt(4);
            if (length < 0 || length > 0xFFFFFF) {
                throw new InvalidMessageException("Event of invalid length: " + length);
            }
            ByteBuf payload = buffer.getBuffOfSize(in, length);
            return type.readFrom(new EnhancedByteBufInputStream(payload), length);
        }

        public void stop() {
            if (this.stop.getAndSet(true)) {
                return;
            }
            StreamHelpers.closeQuietly(this.in, log, "Got error while shutting down reader {}. ", this.name);
            this.callback.connectionDropped();
        }
    }
}

