/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.functionNamespace.rest;

import com.facebook.airlift.concurrent.MoreFutures;
import com.facebook.airlift.http.client.BodyGenerator;
import com.facebook.airlift.http.client.HttpClient;
import com.facebook.airlift.http.client.HttpUriBuilder;
import com.facebook.airlift.http.client.Request;
import com.facebook.airlift.http.client.Response;
import com.facebook.airlift.http.client.ResponseHandler;
import com.facebook.airlift.http.client.StaticBodyGenerator;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.block.BlockEncodingSerde;
import com.facebook.presto.common.function.SqlFunctionResult;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.functionNamespace.ForRestServer;
import com.facebook.presto.functionNamespace.rest.RestBasedFunctionNamespaceManagerConfig;
import com.facebook.presto.functionNamespace.rest.RestErrorCode;
import com.facebook.presto.spi.ErrorCodeSupplier;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.function.FunctionImplementationType;
import com.facebook.presto.spi.function.RemoteScalarFunctionImplementation;
import com.facebook.presto.spi.function.SqlFunctionExecutor;
import com.facebook.presto.spi.function.SqlFunctionHandle;
import com.facebook.presto.spi.function.SqlFunctionId;
import com.facebook.presto.spi.page.PagesSerde;
import com.facebook.presto.spi.page.PagesSerdeUtil;
import com.facebook.presto.spi.page.SerializedPage;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.airlift.slice.DynamicSliceOutput;
import io.airlift.slice.InputStreamSliceInput;
import io.airlift.slice.SliceInput;
import io.airlift.slice.SliceOutput;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import javax.inject.Inject;

/**
 * {@link SqlFunctionExecutor} that evaluates a remote scalar function by POSTing a serialized
 * {@link Page} to a REST endpoint and decoding the serialized page returned in the response.
 *
 * <p>The endpoint is derived from
 * {@link RestBasedFunctionNamespaceManagerConfig#getRestUrl()} plus a
 * {@code /v1/functions/{schema}/{name}/{functionId}/{version}} path. Request and response bodies
 * both use the {@value #PRESTO_PAGES} media type.
 *
 * <p>{@link #setBlockEncodingSerde(BlockEncodingSerde)} must be called before
 * {@link #executeFunction}; it creates the {@link PagesSerde} used for both directions.
 */
public class RestSqlFunctionExecutor
        implements SqlFunctionExecutor {
    private BlockEncodingSerde blockEncodingSerde;
    // Static because the static nested SqlFunctionResultResponseHandler reads it when decoding
    // responses. It is (re)assigned by setBlockEncodingSerde on whichever instance is configured.
    private static PagesSerde pageSerde;
    private final HttpClient httpClient;
    private final RestBasedFunctionNamespaceManagerConfig restBasedFunctionNamespaceManagerConfig;
    /** Media type sent in both the {@code Content-Type} and {@code Accept} request headers. */
    public static final String PRESTO_PAGES = "application/X-presto-pages";

    /**
     * @param restBasedFunctionNamespaceManagerConfig configuration supplying the REST base URL; must not be null
     * @param httpClient client used to issue the function execution requests; must not be null
     * @throws NullPointerException if either argument is null
     */
    @Inject
    public RestSqlFunctionExecutor(RestBasedFunctionNamespaceManagerConfig restBasedFunctionNamespaceManagerConfig, @ForRestServer HttpClient httpClient) {
        this.restBasedFunctionNamespaceManagerConfig = Objects.requireNonNull(restBasedFunctionNamespaceManagerConfig, "restBasedFunctionNamespaceManagerConfig is null");
        this.httpClient = Objects.requireNonNull(httpClient, "httpClient is null");
    }

    /**
     * @return always {@link FunctionImplementationType#REST}
     */
    public FunctionImplementationType getImplementationType() {
        return FunctionImplementationType.REST;
    }

    /**
     * Installs the block encoding serde and creates the page serde built on top of it.
     *
     * @param blockEncodingSerde serde used to encode and decode blocks; must not be null
     * @throws IllegalStateException if a serde has already been set on this instance
     * @throws NullPointerException if {@code blockEncodingSerde} is null
     */
    public void setBlockEncodingSerde(BlockEncodingSerde blockEncodingSerde) {
        Preconditions.checkState(this.blockEncodingSerde == null, "blockEncodingSerde already set");
        this.blockEncodingSerde = Objects.requireNonNull(blockEncodingSerde, "blockEncodingSerde is null");
        pageSerde = new PagesSerde(blockEncodingSerde, Optional.empty(), Optional.empty(), Optional.empty());
    }

    /**
     * Serializes {@code input}, POSTs it to the execution endpoint of the given function and
     * returns a future that completes with the decoded result.
     *
     * <p>Failures while building or dispatching the request are reported through the returned
     * future. Failures while serializing the input page are thrown directly from this method.
     *
     * @param source not used by this implementation
     * @param functionImplementation supplies the function handle (id and version) to invoke
     * @param input page whose serialized form becomes the request body
     * @param channels not used by this implementation
     * @param argumentTypes not used by this implementation
     * @param returnType not used by this implementation
     * @return future completing with the function result, or exceptionally on failure
     * @throws NullPointerException if {@link #setBlockEncodingSerde} has not been called yet
     */
    public CompletableFuture<SqlFunctionResult> executeFunction(String source, RemoteScalarFunctionImplementation functionImplementation, Page input, List<Integer> channels, List<Type> argumentTypes, Type returnType) {
        // Fail with an explanatory message instead of a bare NPE when the serde was never installed.
        Objects.requireNonNull(pageSerde, "pageSerde is null; setBlockEncodingSerde must be called before executeFunction");

        SqlFunctionHandle functionHandle = functionImplementation.getFunctionHandle();
        SqlFunctionId functionId = functionHandle.getFunctionId();
        String functionVersion = functionHandle.getVersion();

        // The retained size is only an initial capacity estimate for the output buffer.
        DynamicSliceOutput sliceOutput = new DynamicSliceOutput((int) input.getRetainedSizeInBytes());
        PagesSerdeUtil.writeSerializedPage(sliceOutput, pageSerde.serialize(input));
        // getBytes() copies exactly the bytes written. byteArray() would expose the whole backing
        // buffer, so the request body would carry trailing unused bytes beyond the serialized page.
        byte[] requestBody = sliceOutput.slice().getBytes();

        try {
            Request request = Request.Builder.preparePost()
                    .setUri(this.getExecutionEndpoint(functionId, functionVersion))
                    .setBodyGenerator(StaticBodyGenerator.createStaticBodyGenerator(requestBody))
                    .setHeader("Content-Type", PRESTO_PAGES)
                    .setHeader("Accept", PRESTO_PAGES)
                    .build();
            HttpClient.HttpResponseFuture<SqlFunctionResult> future = this.httpClient.executeAsync(request, new SqlFunctionResultResponseHandler());
            Futures.addCallback(future, new SqlResultFutureCallback(), MoreExecutors.directExecutor());
            return MoreFutures.toCompletableFuture(future);
        }
        catch (Exception e) {
            // Boundary catch: surface request construction/dispatch problems via the future.
            return MoreFutures.failedFuture(e);
        }
    }

    /**
     * Builds the URI {@code <restUrl>/v1/functions/<schema>/<name>/<functionId>/<version>}.
     * The function id segment is the id's JSON string, URL-encoded as UTF-8.
     *
     * @throws IllegalStateException if UTF-8 is reported as unsupported by the runtime
     */
    private URI getExecutionEndpoint(SqlFunctionId functionId, String functionVersion) {
        String encodedFunctionId;
        try {
            encodedFunctionId = URLEncoder.encode(functionId.toJsonString(), StandardCharsets.UTF_8.toString());
        }
        catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 encoding is not supported", e);
        }
        String path = String.format(
                "/v1/functions/%s/%s/%s/%s",
                functionId.getFunctionName().getSchemaName(),
                functionId.getFunctionName().getObjectName(),
                encodedFunctionId,
                functionVersion);
        URI baseUri = URI.create(this.restBasedFunctionNamespaceManagerConfig.getRestUrl());
        return HttpUriBuilder.uriBuilderFrom(baseUri).appendPath(path).build();
    }

    /**
     * Converts the HTTP outcome of a function execution request into a {@link SqlFunctionResult},
     * mapping transport errors and non-200 statuses to {@link PrestoException}s carrying a
     * {@link RestErrorCode}.
     */
    public static class SqlFunctionResultResponseHandler
            implements ResponseHandler<SqlFunctionResult, RuntimeException> {
        /**
         * Translates a transport-level failure into a {@link PrestoException}; never returns normally.
         *
         * @throws PrestoException with {@code REST_SERVER_TIMEOUT} for a socket timeout,
         *         {@code REST_SERVER_CONNECT_ERROR} for a connect failure, and
         *         {@code REST_SERVER_ERROR} for anything else
         */
        public SqlFunctionResult handleException(Request request, Exception exception) {
            if (exception instanceof SocketTimeoutException) {
                throw new PrestoException(RestErrorCode.REST_SERVER_TIMEOUT, "Request to REST server timed out. Request: " + request, exception);
            }
            if (exception instanceof ConnectException) {
                throw new PrestoException(RestErrorCode.REST_SERVER_CONNECT_ERROR, "Failed to connect to REST server. Request: " + request, exception);
            }
            throw new PrestoException(RestErrorCode.REST_SERVER_ERROR, "Unexpected error during REST call. Request: " + request + ", Exception: " + exception.getMessage(), exception);
        }

        /**
         * Decodes a 200 response body as a single serialized page and wraps its only block.
         *
         * @return result holding block 0 of the decoded page
         * @throws PrestoException for 404, 500, any other non-200 status, or an I/O error while
         *         reading the body
         * @throws IllegalArgumentException if the decoded page does not have exactly one channel
         */
        public SqlFunctionResult handle(Request request, Response response) {
            int statusCode = response.getStatusCode();
            if (statusCode == 404) {
                throw new PrestoException(RestErrorCode.REST_SERVER_NOT_FOUND, "Resource not found on REST server. Request: " + request);
            }
            if (statusCode == 500) {
                throw new PrestoException(RestErrorCode.REST_SERVER_ERROR, "Internal server error on REST server. Request: " + request);
            }
            if (statusCode != 200) {
                throw new PrestoException(RestErrorCode.REST_SERVER_BAD_RESPONSE, "Unexpected response code: " + statusCode + ". Request: " + request);
            }
            try {
                SliceInput input = new InputStreamSliceInput(response.getInputStream());
                SerializedPage serializedPage = PagesSerdeUtil.readSerializedPage(input);
                Page page = pageSerde.deserialize(serializedPage);
                Preconditions.checkArgument(page.getChannelCount() == 1, "Expected only one channel in the function output");
                // NOTE(review): the second argument is a fixed 1 — presumably a CPU-time
                // placeholder; confirm against SqlFunctionResult.
                return new SqlFunctionResult(page.getBlock(0), 1L);
            }
            catch (IOException e) {
                throw new PrestoException(RestErrorCode.REST_SERVER_IO_ERROR, "Error deserializing REST server response: " + e.getMessage(), e);
            }
        }
    }

    /**
     * Callback attached to the HTTP response future by {@code executeFunction}.
     *
     * <p>NOTE(review): exceptions thrown from these methods are raised inside the future's
     * listener, not to the caller of {@code executeFunction}; presumably they are only logged by
     * the future implementation — confirm before relying on them for error propagation.
     */
    public static class SqlResultFutureCallback
            implements FutureCallback<SqlFunctionResult> {
        /** Reads the result block and discards it; has no other visible effect. */
        public void onSuccess(SqlFunctionResult result) {
            result.getResult();
        }

        /**
         * Rethrows {@code t} as a {@link PrestoException}: unchanged if it already is one,
         * {@code REST_SERVER_TIMEOUT} for a socket timeout, {@code REST_SERVER_ERROR} otherwise.
         */
        public void onFailure(Throwable t) {
            if (t instanceof PrestoException) {
                throw (PrestoException) t;
            }
            if (t instanceof SocketTimeoutException) {
                throw new PrestoException(RestErrorCode.REST_SERVER_TIMEOUT, "REST server execution timed out. Error: " + t.getMessage(), t);
            }
            throw new PrestoException(RestErrorCode.REST_SERVER_ERROR, "Unknown error during REST execution. Error: " + t.getMessage(), t);
        }
    }
}

