/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.client;

import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.netty.handler.ssl.ClientAuth;
import io.netty.handler.ssl.JdkSslContext;
import io.netty.handler.ssl.SslContext;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
import org.apache.pinot.client.BrokerResponse;
import org.apache.pinot.client.ConnectionTimeouts;
import org.apache.pinot.client.PinotClientException;
import org.apache.pinot.client.PinotClientTransport;
import org.apache.pinot.client.Request;
import org.apache.pinot.client.TlsProtocols;
import org.apache.pinot.spi.utils.JsonUtils;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.AsyncHttpClientConfig;
import org.asynchttpclient.BoundRequestBuilder;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
import org.asynchttpclient.Dsl;
import org.asynchttpclient.ListenableFuture;
import org.asynchttpclient.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JsonAsyncHttpPinotClientTransport
implements PinotClientTransport {
    private static final Logger LOGGER = LoggerFactory.getLogger(JsonAsyncHttpPinotClientTransport.class);
    private static final ObjectReader OBJECT_READER = JsonUtils.DEFAULT_READER;
    private final Map<String, String> _headers;
    private final String _scheme;
    private final int _brokerReadTimeout;
    private final AsyncHttpClient _httpClient;

    public JsonAsyncHttpPinotClientTransport() {
        this._brokerReadTimeout = 60000;
        this._headers = new HashMap<String, String>();
        this._scheme = "http";
        this._httpClient = Dsl.asyncHttpClient();
    }

    public JsonAsyncHttpPinotClientTransport(Map<String, String> headers, String scheme, @Nullable SSLContext sslContext, ConnectionTimeouts connectionTimeouts, TlsProtocols tlsProtocols) {
        this._brokerReadTimeout = connectionTimeouts.getReadTimeoutMs();
        this._headers = headers;
        this._scheme = scheme;
        DefaultAsyncHttpClientConfig.Builder builder = Dsl.config();
        if (sslContext != null) {
            builder.setSslContext((SslContext)new JdkSslContext(sslContext, true, ClientAuth.OPTIONAL));
        }
        builder.setReadTimeout(connectionTimeouts.getReadTimeoutMs()).setConnectTimeout(connectionTimeouts.getConnectTimeoutMs()).setHandshakeTimeout(connectionTimeouts.getHandshakeTimeoutMs()).setUserAgent(this.getUserAgentVersionFromClassPath()).setEnabledProtocols(tlsProtocols.getEnabledProtocols().toArray(new String[0]));
        this._httpClient = Dsl.asyncHttpClient((AsyncHttpClientConfig)builder.build());
    }

    public JsonAsyncHttpPinotClientTransport(Map<String, String> headers, String scheme, @Nullable SslContext sslContext, ConnectionTimeouts connectionTimeouts, TlsProtocols tlsProtocols) {
        this._brokerReadTimeout = connectionTimeouts.getReadTimeoutMs();
        this._headers = headers;
        this._scheme = scheme;
        DefaultAsyncHttpClientConfig.Builder builder = Dsl.config();
        if (sslContext != null) {
            builder.setSslContext(sslContext);
        }
        builder.setReadTimeout(connectionTimeouts.getReadTimeoutMs()).setConnectTimeout(connectionTimeouts.getConnectTimeoutMs()).setHandshakeTimeout(connectionTimeouts.getHandshakeTimeoutMs()).setUserAgent(this.getUserAgentVersionFromClassPath()).setEnabledProtocols(tlsProtocols.getEnabledProtocols().toArray(new String[0]));
        this._httpClient = Dsl.asyncHttpClient((AsyncHttpClientConfig)builder.build());
    }

    private String getUserAgentVersionFromClassPath() {
        Properties userAgentProperties = new Properties();
        try {
            userAgentProperties.load(JsonAsyncHttpPinotClientTransport.class.getClassLoader().getResourceAsStream("version.properties"));
        }
        catch (IOException e) {
            LOGGER.warn("Unable to set user agent version");
        }
        return userAgentProperties.getProperty("ua", "pinot-java");
    }

    @Override
    public BrokerResponse executeQuery(String brokerAddress, String query) throws PinotClientException {
        try {
            return this.executeQueryAsync(brokerAddress, query).get(this._brokerReadTimeout, TimeUnit.MILLISECONDS);
        }
        catch (Exception e) {
            throw new PinotClientException(e);
        }
    }

    @Override
    public Future<BrokerResponse> executeQueryAsync(String brokerAddress, String query) {
        try {
            ObjectNode json = JsonNodeFactory.instance.objectNode();
            json.put("sql", query);
            json.put("queryOptions", "groupByMode=sql;responseFormat=sql");
            String url = this._scheme + "://" + brokerAddress + "/query/sql";
            BoundRequestBuilder requestBuilder = this._httpClient.preparePost(url);
            if (this._headers != null) {
                this._headers.forEach((k, v) -> requestBuilder.addHeader((CharSequence)k, v));
            }
            ListenableFuture response = ((BoundRequestBuilder)((BoundRequestBuilder)requestBuilder.addHeader((CharSequence)"Content-Type", "application/json; charset=utf-8")).setBody(json.toString())).execute();
            return new BrokerResponseFuture((Future<Response>)response, query, url, this._brokerReadTimeout);
        }
        catch (Exception e) {
            throw new PinotClientException(e);
        }
    }

    @Override
    public BrokerResponse executeQuery(String brokerAddress, Request request) throws PinotClientException {
        try {
            return this.executeQueryAsync(brokerAddress, request).get(this._brokerReadTimeout, TimeUnit.MILLISECONDS);
        }
        catch (Exception e) {
            throw new PinotClientException(e);
        }
    }

    @Override
    public Future<BrokerResponse> executeQueryAsync(String brokerAddress, Request request) throws PinotClientException {
        return this.executeQueryAsync(brokerAddress, request.getQuery());
    }

    @Override
    public void close() throws PinotClientException {
        if (this._httpClient.isClosed()) {
            throw new PinotClientException("Connection is already closed!");
        }
        try {
            this._httpClient.close();
        }
        catch (IOException exception) {
            throw new PinotClientException("Error while closing connection!");
        }
    }

    private static class BrokerResponseFuture
    implements Future<BrokerResponse> {
        private final Future<Response> _response;
        private final String _query;
        private final String _url;
        private final long _brokerReadTimeout;

        public BrokerResponseFuture(Future<Response> response, String query, String url, long brokerReadTimeout) {
            this._response = response;
            this._query = query;
            this._url = url;
            this._brokerReadTimeout = brokerReadTimeout;
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return this._response.cancel(mayInterruptIfRunning);
        }

        @Override
        public boolean isCancelled() {
            return this._response.isCancelled();
        }

        @Override
        public boolean isDone() {
            return this._response.isDone();
        }

        @Override
        public BrokerResponse get() throws ExecutionException {
            return this.get(this._brokerReadTimeout, TimeUnit.MILLISECONDS);
        }

        @Override
        public BrokerResponse get(long timeout, TimeUnit unit) throws ExecutionException {
            try {
                LOGGER.debug("Sending query {} to {}", (Object)this._query, (Object)this._url);
                Response httpResponse = this._response.get(timeout, unit);
                LOGGER.debug("Completed query, HTTP status is {}", (Object)httpResponse.getStatusCode());
                if (httpResponse.getStatusCode() != 200) {
                    throw new PinotClientException("Pinot returned HTTP status " + httpResponse.getStatusCode() + ", expected 200");
                }
                String responseBody = httpResponse.getResponseBody(StandardCharsets.UTF_8);
                return BrokerResponse.fromJson(OBJECT_READER.readTree(responseBody));
            }
            catch (Exception e) {
                throw new ExecutionException(e);
            }
        }
    }
}

