/*
 * Decompiled with CFR 0.152.
 */
package com.yahoo.vespa.http.client.core.communication;

import ai.vespa.util.http.hc4.VespaHttpClientBuilder;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yahoo.security.SslContextBuilder;
import com.yahoo.vespa.http.client.config.ConnectionParams;
import com.yahoo.vespa.http.client.config.Endpoint;
import com.yahoo.vespa.http.client.config.FeedParams;
import com.yahoo.vespa.http.client.core.Document;
import com.yahoo.vespa.http.client.core.Encoder;
import com.yahoo.vespa.http.client.core.ServerResponseException;
import com.yahoo.vespa.http.client.core.communication.ByteBufferInputStream;
import com.yahoo.vespa.http.client.core.communication.GatewayConnection;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.zip.GZIPOutputStream;
import javax.net.ssl.SSLContext;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.StatusLine;
import org.apache.http.auth.AuthScheme;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.ChallengeState;
import org.apache.http.auth.Credentials;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.impl.auth.BasicScheme;
import org.apache.http.impl.client.BasicAuthCache;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.message.BasicHeader;
import org.apache.http.protocol.BasicHttpContext;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;

class ApacheGatewayConnection
implements GatewayConnection {
    // Class-wide logger, named after this class.
    private static final Logger log = Logger.getLogger(ApacheGatewayConnection.class.getName());
    // Shared JSON parser; used to read the "message" field out of JSON error responses.
    private static final ObjectMapper mapper = new ObjectMapper();
    // Request path (already ending in the query separator) appended after host:port in createUri().
    private static final String PATH = "/reserved-for-internal-use/feedapi?";
    // Byte sequences wrapped around each document payload, one pair per data format.
    private static final byte[] START_OF_FEED_XML = "<vespafeed>\n".getBytes(StandardCharsets.UTF_8);
    private static final byte[] END_OF_FEED_XML = "\n</vespafeed>\n".getBytes(StandardCharsets.UTF_8);
    private static final byte[] START_OF_FEED_JSON = "[".getBytes(StandardCharsets.UTF_8);
    private static final byte[] END_OF_FEED_JSON = "]".getBytes(StandardCharsets.UTF_8);
    // Protocol versions offered to the server in request headers; filled in by the constructor.
    private final List<Integer> supportedVersions = new ArrayList<Integer>();
    // Wrapper bytes picked in the constructor from the feed's data format (JSON or XML).
    private final byte[] startOfFeed;
    private final byte[] endOfFeed;
    // Target host/port/scheme used when building the request URI.
    private final Endpoint endpoint;
    // Feed-level settings read when building request headers and the URI query.
    private final FeedParams feedParams;
    // Cluster-specific route; consulted in createPost() when setting the X-Yahoo-Feed-Route header.
    private final String clusterSpecificRoute;
    // Connection-level settings: compression, trace level, proxy, extra and dynamic headers.
    private final ConnectionParams connectionParams;
    // Current HTTP client; null until connect() creates one and null again after close().
    private CloseableHttpClient httpClient;
    // Time of the most recent connect() call; null before the first one.
    private Instant connectionTime = null;
    // Time of the most recent poll() call; null before the first one.
    private Instant lastPollTime = null;
    // Session id taken from the server's response header when the negotiated version is not 3;
    // cleared at the start of handshake().
    private String sessionId;
    // Sent as X-Yahoo-Client-Id when non-null; under protocol version 3 the server's session header must equal it.
    private final String clientId;
    // Protocol version from the last verified server response; -1 until one has been received.
    private int negotiatedVersion = -1;
    // Creates the underlying HTTP client on connect().
    private final HttpClientFactory httpClientFactory;
    // First five characters of a random UUID, sent as X-Yahoo-Feed-Sharding-Key on every request.
    private final String shardingKey = UUID.randomUUID().toString().substring(0, 5);
    // Time source for connectionTime and lastPollTime.
    private final Clock clock;

    ApacheGatewayConnection(Endpoint endpoint, FeedParams feedParams, String clusterSpecificRoute, ConnectionParams connectionParams, HttpClientFactory httpClientFactory, String clientId, Clock clock) {
        this.supportedVersions.add(3);
        this.endpoint = endpoint;
        this.feedParams = feedParams;
        this.clusterSpecificRoute = clusterSpecificRoute;
        this.httpClientFactory = httpClientFactory;
        this.connectionParams = connectionParams;
        this.httpClient = null;
        this.clientId = clientId;
        this.clock = clock;
        if (feedParams.getDataFormat() == FeedParams.DataFormat.JSON_UTF8) {
            this.startOfFeed = START_OF_FEED_JSON;
            this.endOfFeed = END_OF_FEED_JSON;
        } else {
            this.startOfFeed = START_OF_FEED_XML;
            this.endOfFeed = END_OF_FEED_XML;
        }
    }

    /**
     * Sends the given documents to the gateway, compressed if the connection parameters ask for it.
     *
     * @param docs documents to send
     * @return the server's response body
     * @throws ServerResponseException if the server response fails verification
     * @throws IOException             on transport failure or when not connected
     */
    @Override
    public InputStream write(List<Document> docs) throws ServerResponseException, IOException {
        boolean compress = connectionParams.getUseCompression();
        return write(docs, false, compress);
    }

    /**
     * Records the poll time and sends an empty, uncompressed, non-draining request.
     *
     * @return the server's response body
     * @throws ServerResponseException if the server response fails verification
     * @throws IOException             on transport failure or when not connected
     */
    @Override
    public InputStream poll() throws ServerResponseException, IOException {
        // The timestamp is taken before the request, so it is updated even if the request fails.
        lastPollTime = clock.instant();
        List<Document> noDocuments = Collections.emptyList();
        return write(noDocuments, false, false);
    }

    /**
     * @return the instant of the most recent {@link #poll()} call, or {@code null} if there has been none
     */
    @Override
    public Instant lastPollTime() {
        return lastPollTime;
    }

    /**
     * Sends an empty, uncompressed request with the drain header set to {@code "true"}.
     *
     * @return the server's response body
     * @throws ServerResponseException if the server response fails verification
     * @throws IOException             on transport failure or when not connected
     */
    @Override
    public InputStream drain() throws ServerResponseException, IOException {
        List<Document> noDocuments = Collections.emptyList();
        return write(noDocuments, true, false);
    }

    /**
     * Creates a fresh HTTP client through the factory and records the connection time.
     * A still-present previous client is only warned about; it is not closed here.
     *
     * @return {@code true} if the factory produced a client
     */
    @Override
    public boolean connect() {
        log.fine(() -> "Attempting to connect to " + endpoint);
        if (httpClient != null) {
            log.warning("Previous httpClient still exists.");
        }
        CloseableHttpClient created = httpClientFactory.createClient();
        httpClient = created;
        connectionTime = clock.instant();
        return created != null;
    }

    /**
     * @return the instant of the most recent {@link #connect()} call, or {@code null} if there has been none
     */
    @Override
    public Instant connectionTime() {
        return connectionTime;
    }

    /**
     * Reads the given stream to its end, gzip-compresses the bytes in memory and wraps the
     * result in a request entity of unknown ({@code -1}) length.
     *
     * <p>The stream is read until {@code read} reports end-of-stream rather than polling
     * {@code available()}, which only estimates the bytes readable without blocking and is not an
     * end-of-stream signal. The input stream is not closed here.
     *
     * @param inputStream source of the uncompressed payload
     * @return an entity backed by the gzip-compressed bytes
     * @throws IOException if reading the input or compressing fails
     */
    protected static InputStreamEntity zipAndCreateEntity(InputStream inputStream) throws IOException {
        byte[] buffer = new byte[4096];
        ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        // Closing the gzip stream writes the trailer, so it must happen before toByteArray().
        try (GZIPOutputStream gzip = new GZIPOutputStream(compressed)) {
            int length;
            while ((length = inputStream.read(buffer)) != -1) {
                gzip.write(buffer, 0, length);
            }
        }
        return new InputStreamEntity(new ByteArrayInputStream(compressed.toByteArray()), -1L);
    }

    /**
     * Builds a POST carrying the given documents in the negotiated protocol's wire format and executes it.
     *
     * @param docs           documents to send; may be empty
     * @param drain          value for the drain header
     * @param useCompression whether to gzip the body and mark it with {@code Content-Encoding: gzip}
     * @return the server's response body
     * @throws ServerResponseException if the server response fails verification
     * @throws IOException             on transport failure or when not connected
     */
    private InputStream write(List<Document> docs, boolean drain, boolean useCompression) throws ServerResponseException, IOException {
        HttpPost request = createPost(drain, useCompression, false);
        ByteBufferInputStream payload = new ByteBufferInputStream(getDataWithStartAndEndOfFeed(docs, negotiatedVersion));

        InputStreamEntity body;
        if (useCompression) {
            body = zipAndCreateEntity(payload);
        } else {
            body = new InputStreamEntity(payload, -1L);
        }
        body.setChunked(true);

        request.setEntity(body);
        return executePost(request);
    }

    /**
     * Serializes documents into the buffers that make up a request body.
     *
     * <p>For each document four buffers are produced: an ASCII envelope line
     * ({@code <encoded operation id> <hex size>\n}, where size is the document size plus the
     * wrapper lengths), the start-of-feed bytes, the document data, and the end-of-feed bytes.
     *
     * @param docs    documents to serialize
     * @param version protocol version to serialize for; only 3 is supported
     * @return the buffers in send order; empty when {@code docs} is empty
     * @throws IllegalArgumentException if {@code version} is not 3
     */
    private ByteBuffer[] getDataWithStartAndEndOfFeed(List<Document> docs, int version) {
        if (version != 3) {
            throw new IllegalArgumentException("Protocol version " + version + " unsupported by client.");
        }
        List<ByteBuffer> chunks = new ArrayList<>();
        for (Document doc : docs) {
            int operationSize = doc.size() + startOfFeed.length + endOfFeed.length;

            StringBuilder envelope = new StringBuilder();
            Encoder.encode(doc.getOperationId(), envelope);
            envelope.append(' ').append(Integer.toHexString(operationSize)).append('\n');

            chunks.add(StandardCharsets.US_ASCII.encode(envelope.toString()));
            chunks.add(ByteBuffer.wrap(startOfFeed));
            chunks.add(doc.getData());
            chunks.add(ByteBuffer.wrap(endOfFeed));
        }
        return chunks.toArray(new ByteBuffer[0]);
    }

    /**
     * Builds a POST request to the feed endpoint with all protocol headers set. No entity is attached.
     *
     * <p>Always sent: one protocol-version header per supported version, sharding key, drain flag,
     * server timeout, the static and dynamic headers from the connection parameters, and, when
     * available, session id, client id and route. The cluster-specific route takes precedence over
     * the route from the feed parameters. Data format, priority, trace level and deny-if-busy
     * headers are skipped for handshake requests.
     *
     * @param drain          value for the drain header
     * @param useCompression whether to announce a gzip-encoded body
     * @param isHandshake    {@code true} to omit the headers that only apply to feed requests
     * @return the prepared request
     * @throws NullPointerException if a dynamic header provider returns {@code null}
     */
    private HttpPost createPost(boolean drain, boolean useCompression, boolean isHandshake) {
        HttpPost httpPost = new HttpPost(this.createUri());
        for (Integer version : this.supportedVersions) {
            httpPost.addHeader("X-Yahoo-Feed-Protocol-Version", String.valueOf(version));
        }
        if (this.sessionId != null) {
            httpPost.setHeader("X-Yahoo-Feed-Session-Id", this.sessionId);
        }
        if (this.clientId != null) {
            httpPost.setHeader("X-Yahoo-Client-Id", this.clientId);
        }
        httpPost.setHeader("X-Yahoo-Feed-Sharding-Key", this.shardingKey);
        httpPost.setHeader("X-Yahoo-Feed-Drain", drain ? "true" : "false");
        // Send the cluster-specific route itself when there is one; previously it was only
        // null-checked and the feed-level route (possibly null) was sent in its place.
        if (this.clusterSpecificRoute != null) {
            httpPost.setHeader("X-Yahoo-Feed-Route", this.clusterSpecificRoute);
        } else if (this.feedParams.getRoute() != null) {
            httpPost.setHeader("X-Yahoo-Feed-Route", this.feedParams.getRoute());
        }
        if (!isHandshake) {
            if (this.feedParams.getDataFormat() == FeedParams.DataFormat.JSON_UTF8) {
                httpPost.setHeader("X-Yahoo-Feed-Data-Format", FeedParams.DataFormat.JSON_UTF8.name());
            } else {
                httpPost.setHeader("X-Yahoo-Feed-Data-Format", FeedParams.DataFormat.XML_UTF8.name());
            }
            if (this.feedParams.getPriority() != null) {
                httpPost.setHeader("X-Yahoo-Feed-Priority", this.feedParams.getPriority());
            }
            if (this.connectionParams.getTraceLevel() != 0) {
                httpPost.setHeader("X-Yahoo-Feed-Trace-Level", String.valueOf(this.connectionParams.getTraceLevel()));
            }
            if (this.negotiatedVersion == 3 && this.feedParams.getDenyIfBusyV3()) {
                httpPost.setHeader("X-Yahoo-Feed-Deny-If-Busy", "true");
            }
        }
        if (this.feedParams.getSilentUpgrade()) {
            httpPost.setHeader("X-Yahoo-Silent-Upgrade", "true");
        }
        httpPost.setHeader("X-Yahoo-Feed-Timeout", "" + this.feedParams.getServerTimeout(TimeUnit.SECONDS));
        // addHeader (not setHeader): user-supplied headers may legitimately repeat a name.
        for (Map.Entry<String, String> header : this.connectionParams.getHeaders()) {
            httpPost.addHeader(header.getKey(), header.getValue());
        }
        this.connectionParams.getDynamicHeaders().forEach((headerName, provider) -> {
            String headerValue = Objects.requireNonNull(provider.getHeaderValue(), provider.getClass().getName() + ".getHeader() returned null as header value!");
            httpPost.addHeader(headerName, headerValue);
        });
        if (useCompression) {
            httpPost.setHeader("Content-Encoding", "gzip");
        }
        return httpPost;
    }

    /**
     * Executes the request, verifies status code, protocol version and session headers of the
     * response, and returns the fully buffered response body.
     *
     * <p>When a proxy host is configured and the static headers contain a
     * {@code Proxy-Authorization} entry, the request is executed with a context that forces
     * preemptive proxy authentication using that header value.
     *
     * @param httpPost the request to send
     * @return the response body as an in-memory stream, or {@code null} if no body bytes could be read
     * @throws ServerResponseException if any response verification fails; the entity is consumed first
     * @throws IOException             if there is no HTTP client or the transport fails
     */
    private InputStream executePost(HttpPost httpPost) throws ServerResponseException, IOException {
        if (httpClient == null) {
            throw new IOException("Trying to executePost while not having a connection/http client");
        }

        Optional<String> proxyAuthz = getCustomProxyAuthorizationHeader(connectionParams);
        boolean preemptiveProxyAuth = connectionParams.getProxyHost() != null && proxyAuthz.isPresent();
        CloseableHttpResponse response = preemptiveProxyAuth
                ? httpClient.execute(httpPost, createContextForcingPreemptiveProxyAuth(proxyAuthz.get()))
                : httpClient.execute(httpPost);

        try {
            verifyServerResponseCode(response);
            verifyServerVersion(response.getFirstHeader("X-Yahoo-Feed-Protocol-Version"));
            verifySessionHeader(response.getFirstHeader("X-Yahoo-Feed-Session-Id"));
        } catch (ServerResponseException e) {
            // Drain the body before propagating the failure.
            EntityUtils.consumeQuietly(response.getEntity());
            throw e;
        }

        byte[] body = EntityUtils.toByteArray(response.getEntity());
        if (body == null) {
            return null;
        }
        return new ByteArrayInputStream(body);
    }

    /**
     * Looks for a {@code Proxy-Authorization} entry among the statically configured headers.
     *
     * @param params connection parameters holding the configured headers
     * @return the value of the first matching header; empty if there is none or its value is {@code null}
     */
    private static Optional<String> getCustomProxyAuthorizationHeader(ConnectionParams params) {
        for (Map.Entry<String, String> header : params.getHeaders()) {
            if (header.getKey().equals("Proxy-Authorization")) {
                return Optional.ofNullable(header.getValue());
            }
        }
        return Optional.empty();
    }

    /**
     * Builds an execution context whose auth cache already holds a scheme for the configured proxy,
     * so that the given {@code Proxy-Authorization} value is sent without waiting for a challenge.
     *
     * @param proxyAuthzHeader header value the cached scheme will emit
     * @return a context carrying the auth cache and a credentials provider for the proxy
     */
    private HttpContext createContextForcingPreemptiveProxyAuth(String proxyAuthzHeader) {
        HttpHost proxy = new HttpHost(connectionParams.getProxyHost(), connectionParams.getProxyPort());

        BasicAuthCache authCache = new BasicAuthCache();
        authCache.put(proxy, new CustomAuthScheme(proxyAuthzHeader));

        // Placeholder credentials; the custom scheme ignores them and emits the fixed header value.
        BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(new AuthScope(proxy), new UsernamePasswordCredentials("", ""));

        BasicHttpContext context = new BasicHttpContext();
        context.setAttribute("http.auth.auth-cache", authCache);
        context.setAttribute("http.auth.credentials-provider", credentialsProvider);
        return context;
    }

    /**
     * Accepts status codes 200-259 and rejects everything else.
     *
     * <p>Status 299 is reported to the caller as a 429 response; any other rejected code is
     * reported as-is, with the JSON error message from the body when one can be extracted and the
     * reason phrase otherwise.
     *
     * @param response the response to check
     * @throws ServerResponseException if the status code is outside 200-259
     */
    private void verifyServerResponseCode(HttpResponse response) throws ServerResponseException {
        StatusLine status = response.getStatusLine();
        int code = status.getStatusCode();

        boolean accepted = code >= 200 && code <= 259;
        if (accepted) {
            return;
        }
        if (code == 299) {
            throw new ServerResponseException(429, "Too  many requests.");
        }
        String detail = tryGetDetailedErrorMessage(response).orElseGet(status::getReasonPhrase);
        throw new ServerResponseException(code, detail);
    }

    /**
     * Best-effort extraction of a human-readable error from a JSON response body.
     *
     * <p>Looks for a textual {@code "message"} field in the top-level JSON value. The media type
     * is compared without its parameters, so {@code application/json; charset=UTF-8} is accepted
     * as well as plain {@code application/json}.
     *
     * @param response the failed response
     * @return {@code "<reason phrase> - <message>"}, or empty if the response has no entity, is not
     *         JSON, lacks a textual message, or its body cannot be read
     */
    private static Optional<String> tryGetDetailedErrorMessage(HttpResponse response) {
        HttpEntity entity = response.getEntity();
        if (entity == null) {
            // A response without a body (e.g. to a proxy error) has nothing to parse.
            return Optional.empty();
        }
        Header contentType = entity.getContentType();
        if (contentType == null || contentType.getValue() == null) {
            return Optional.empty();
        }
        // Strip parameters such as "; charset=UTF-8" before comparing the media type.
        String mediaType = contentType.getValue().split(";", 2)[0].trim();
        if (!mediaType.equalsIgnoreCase("application/json")) {
            return Optional.empty();
        }
        try (InputStream in = entity.getContent()) {
            if (in == null) {
                return Optional.empty();
            }
            JsonNode root = mapper.readTree(in);
            // NOTE(review): depending on the Jackson version, empty content may yield a null root — guarded here.
            JsonNode message = root == null ? null : root.get("message");
            if (message == null || message.textValue() == null) {
                return Optional.empty();
            }
            return Optional.of(response.getStatusLine().getReasonPhrase() + " - " + message.textValue());
        } catch (IOException e) {
            // Deliberately best-effort: an unreadable or malformed body falls back to the reason phrase.
            return Optional.empty();
        }
    }

    /**
     * Checks the session id header returned by the server.
     *
     * <p>Under protocol version 3 the (trimmed) header value must equal the client id. For any
     * other negotiated version the first value seen is remembered as the session id and later
     * responses must repeat it.
     *
     * @param serverHeader the session id header from the response, may be {@code null}
     * @throws ServerResponseException if the header is missing or its value does not match
     */
    private void verifySessionHeader(Header serverHeader) throws ServerResponseException {
        if (serverHeader == null) {
            throw new ServerResponseException("Got no session ID from server.");
        }
        String received = serverHeader.getValue().trim();

        if (negotiatedVersion == 3) {
            // equals(null) is false, so a missing client id is rejected just like a mismatch.
            if (received.equals(clientId)) {
                return;
            }
            String message = "Running using v3. However, server responds with different session than client has set; " + received + " vs client code " + clientId;
            log.severe(message);
            throw new ServerResponseException(message);
        }

        if (sessionId == null) {
            log.finer("Got session ID from server: " + received);
            sessionId = received;
            return;
        }
        if (!sessionId.equals(received)) {
            log.info("Request has been routed to a server which does not recognize the client session. Most likely cause is upgrading of cluster, transitive error.");
            throw new ServerResponseException("Session ID received from server ('" + received + "') does not match cached session ID ('" + sessionId + "')");
        }
    }

    /**
     * Parses the protocol version header from the server, checks it against the versions this
     * client supports and stores it as the negotiated version.
     *
     * @param serverHeader the protocol version header from the response, may be {@code null}
     * @throws ServerResponseException if the header is missing, not an integer, or names an unsupported version
     */
    private void verifyServerVersion(Header serverHeader) throws ServerResponseException {
        if (serverHeader == null) {
            throw new ServerResponseException("Got bad protocol version from server.");
        }

        int serverVersion;
        try {
            serverVersion = Integer.parseInt(serverHeader.getValue());
        } catch (NumberFormatException nfe) {
            throw new ServerResponseException("Got bad protocol version from server: " + nfe.getMessage());
        }

        boolean supported = supportedVersions.contains(serverVersion);
        if (!supported) {
            throw new ServerResponseException("Unsupported version: " + serverVersion + ". Supported versions: " + supportedVersions);
        }

        // Only log while no version has been negotiated yet.
        boolean firstNegotiation = negotiatedVersion == -1;
        if (firstNegotiation && log.isLoggable(Level.FINE)) {
            log.log(Level.FINE, "Server decided upon protocol version " + serverVersion + ".");
        }
        negotiatedVersion = serverVersion;
    }

    /**
     * Assembles the request URI as {@code <scheme><host>:<port><PATH><feed params as query>}.
     *
     * @return the URI string for feed requests against this endpoint
     */
    private String createUri() {
        String scheme = endpoint.isUseSsl() ? "https://" : "http://";
        return scheme
                + endpoint.getHostname()
                + ":" + endpoint.getPort()
                + PATH
                + feedParams.toUriParameters();
    }

    /**
     * @return the endpoint this connection was created for
     */
    @Override
    public Endpoint getEndpoint() {
        return endpoint;
    }

    /**
     * Sends a body-less handshake request (no compression, no drain) and verifies that the session
     * id learned from the response matches the one held before the handshake, if there was one.
     *
     * @throws ServerResponseException if response verification fails or the session id changed
     * @throws IOException             on transport failure or when not connected
     */
    @Override
    public void handshake() throws ServerResponseException, IOException {
        HttpPost httpPost = createPost(false, false, true);

        // Forget the current session so the response can establish one; keep the old id for comparison.
        String previousSessionId = sessionId;
        sessionId = null;

        try (InputStream responseBody = executePost(httpPost)) {
            boolean sessionChanged = previousSessionId != null && !previousSessionId.equals(sessionId);
            if (sessionChanged) {
                throw new ServerResponseException("Session ID changed after new handshake, some documents might not be acked to correct thread. " + getEndpoint() + " old " + previousSessionId + " new " + sessionId);
            }
            if (responseBody == null) {
                log.fine("Stream is null.");
            }
            log.fine("Got session ID " + sessionId);
        }
    }

    /**
     * Closes the HTTP client, if any, and forgets it. An {@link IOException} from closing is logged
     * as a warning and not propagated.
     */
    @Override
    public void close() {
        if (httpClient != null) {
            try {
                httpClient.close();
            } catch (IOException e) {
                log.log(Level.WARNING, "Failed closing HTTP client", e);
            }
        }
        httpClient = null;
    }

    /**
     * Installs the given SSL context on the client builder.
     *
     * <p>NOTE(review): {@code setSslcontext} is the older spelling of this builder method and is
     * deprecated in HttpClient 4.5 in favour of {@code setSSLContext}; presumably kept so the class
     * also runs against older HttpClient versions — confirm before changing.
     *
     * @param builder    builder to configure
     * @param sslContext context to use for TLS connections
     */
    private static void setSslContext(HttpClientBuilder builder, SSLContext sslContext) {
        builder.setSslcontext(sslContext);
    }

    /**
     * Creates the Apache HTTP clients used by {@link ApacheGatewayConnection}: single-connection
     * clients with TLS, timeouts, proxy and identification headers taken from the feed and
     * connection parameters.
     */
    public static class HttpClientFactory {
        // Client version reported in the User-Agent and Vespa-Client-Version headers.
        private static final String CLIENT_VERSION = "7.164.0";

        private final FeedParams feedParams;
        final ConnectionParams connectionParams;
        final boolean useSsl;

        /**
         * @param feedParams       source of the client and server timeouts
         * @param connectionParams source of TLS, proxy and hostname-verification settings
         * @param useSsl           whether the endpoint uses SSL; only used for logging here
         */
        public HttpClientFactory(FeedParams feedParams, ConnectionParams connectionParams, boolean useSsl) {
            this.feedParams = feedParams;
            this.connectionParams = connectionParams;
            this.useSsl = useSsl;
        }

        /**
         * Builds a new client limited to one connection in total and per route.
         *
         * <p>Socket and connect timeouts are both set to the sum of the client and server timeouts,
         * capped at {@link Integer#MAX_VALUE} milliseconds so that very large configured timeouts
         * cannot wrap around to a negative value.
         *
         * @return a newly built client
         */
        public CloseableHttpClient createClient() {
            HttpClientBuilder clientBuilder = this.createTlsConfiguredBuilder();
            clientBuilder.setMaxConnPerRoute(1);
            clientBuilder.setMaxConnTotal(1);
            clientBuilder.setUserAgent(String.format("vespa-http-client (%s)", CLIENT_VERSION));
            clientBuilder.setDefaultHeaders(Collections.singletonList(new BasicHeader("Vespa-Client-Version", CLIENT_VERSION)));
            // Clamp instead of a bare (int) cast: a sum above Integer.MAX_VALUE would otherwise overflow.
            int millisTotalTimeout = (int)Math.min((long)Integer.MAX_VALUE, this.feedParams.getClientTimeout(TimeUnit.MILLISECONDS) + this.feedParams.getServerTimeout(TimeUnit.MILLISECONDS));
            RequestConfig.Builder requestConfigBuilder = RequestConfig.custom().setSocketTimeout(millisTotalTimeout).setConnectTimeout(millisTotalTimeout);
            if (this.connectionParams.getProxyHost() != null) {
                requestConfigBuilder.setProxy(new HttpHost(this.connectionParams.getProxyHost(), this.connectionParams.getProxyPort()));
            }
            clientBuilder.setDefaultRequestConfig(requestConfigBuilder.build());
            log.fine(() -> "Creating HttpClient: ConnectionTimeout " + this.connectionParams.getConnectionTimeToLive().getSeconds() + " seconds proxyhost (can be null) " + this.connectionParams.getProxyHost() + ":" + this.connectionParams.getProxyPort() + (this.useSsl ? " using ssl " : " not using ssl"));
            return clientBuilder.build();
        }

        /**
         * Returns a builder with TLS set up: either the environment-configured Vespa builder, or a
         * plain builder using the explicit SSL context, or one assembled from the configured
         * private key, certificate and CA certificates.
         */
        private HttpClientBuilder createTlsConfiguredBuilder() {
            if (this.connectionParams.useTlsConfigFromEnvironment()) {
                return VespaHttpClientBuilder.create();
            }
            HttpClientBuilder clientBuilder = HttpClientBuilder.create();
            if (this.connectionParams.getSslContext() != null) {
                ApacheGatewayConnection.setSslContext(clientBuilder, this.connectionParams.getSslContext());
            } else {
                SslContextBuilder builder = new SslContextBuilder();
                // A key store is only configured when both halves of the key pair are present.
                if (this.connectionParams.getPrivateKey() != null && this.connectionParams.getCertificate() != null) {
                    builder.withKeyStore(this.connectionParams.getPrivateKey(), this.connectionParams.getCertificate());
                }
                if (this.connectionParams.getCaCertificates() != null) {
                    builder.withTrustStore(this.connectionParams.getCaCertificates());
                }
                ApacheGatewayConnection.setSslContext(clientBuilder, builder.build());
            }
            if (this.connectionParams.getHostnameVerifier() != null) {
                clientBuilder.setSSLHostnameVerifier(this.connectionParams.getHostnameVerifier());
            }
            // NOTE(review): a null user token presumably stops HttpClient from treating client-certificate
            // connections as user-bound, so they can be reused — confirm.
            clientBuilder.setUserTokenHandler(context -> null);
            return clientBuilder;
        }
    }

    private static class CustomAuthScheme
    extends BasicScheme {
        final String proxyAuthzHeader;

        CustomAuthScheme(String proxyAuthzHeader) {
            super(ChallengeState.PROXY);
            this.proxyAuthzHeader = proxyAuthzHeader;
        }

        public Header authenticate(Credentials credentials, HttpRequest request, HttpContext context) {
            return new BasicHeader("Proxy-Authorization", this.proxyAuthzHeader);
        }
    }
}

