/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.kusto.data;

import com.azure.core.http.HttpClient;
import com.azure.core.util.BinaryData;
import com.microsoft.azure.kusto.data.BaseClient;
import com.microsoft.azure.kusto.data.ClientDetails;
import com.microsoft.azure.kusto.data.ClientRequestProperties;
import com.microsoft.azure.kusto.data.CommandType;
import com.microsoft.azure.kusto.data.Ensure;
import com.microsoft.azure.kusto.data.IngestionSourceStorage;
import com.microsoft.azure.kusto.data.KustoOperationResult;
import com.microsoft.azure.kusto.data.UriUtils;
import com.microsoft.azure.kusto.data.auth.CloudInfo;
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.data.auth.TokenProviderBase;
import com.microsoft.azure.kusto.data.auth.TokenProviderFactory;
import com.microsoft.azure.kusto.data.auth.endpoints.KustoTrustedEndpoints;
import com.microsoft.azure.kusto.data.exceptions.DataClientException;
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
import com.microsoft.azure.kusto.data.exceptions.ExceptionUtils;
import com.microsoft.azure.kusto.data.exceptions.KustoServiceQueryError;
import com.microsoft.azure.kusto.data.exceptions.ParseException;
import com.microsoft.azure.kusto.data.http.HttpClientFactory;
import com.microsoft.azure.kusto.data.http.HttpClientProperties;
import com.microsoft.azure.kusto.data.http.HttpRequestBuilder;
import com.microsoft.azure.kusto.data.http.HttpTracing;
import com.microsoft.azure.kusto.data.http.UncloseableStream;
import com.microsoft.azure.kusto.data.instrumentation.MonitoredActivity;
import com.microsoft.azure.kusto.data.instrumentation.TraceableAttributes;
import com.microsoft.azure.kusto.data.req.KustoRequest;
import com.microsoft.azure.kusto.data.req.KustoRequestContext;
import com.microsoft.azure.kusto.data.res.JsonResult;
import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;

class ClientImpl
extends BaseClient {
    public static final String MGMT_ENDPOINT_VERSION = "v1";
    public static final String QUERY_ENDPOINT_VERSION = "v2";
    public static final String STREAMING_VERSION = "v1";
    private static final Long CLIENT_GRACE_PERIOD_IN_MILLISECS = TimeUnit.SECONDS.toMillis(30L);
    private static final Long COMMAND_TIMEOUT_IN_MILLISECS = TimeUnit.MINUTES.toMillis(10L);
    private static final Long QUERY_TIMEOUT_IN_MILLISECS = TimeUnit.MINUTES.toMillis(4L);
    private static final Long STREAMING_INGEST_TIMEOUT_IN_MILLISECS = TimeUnit.MINUTES.toMillis(10L);
    private final TokenProviderBase aadAuthenticationHelper;
    private final String clusterUrl;
    private final String defaultDatabaseName;
    private final ClientDetails clientDetails;
    private boolean endpointValidated = false;

    public ClientImpl(ConnectionStringBuilder csb) throws URISyntaxException {
        this(csb, HttpClientProperties.builder().build());
    }

    public ClientImpl(ConnectionStringBuilder csb, HttpClientProperties properties) throws URISyntaxException {
        this(csb, HttpClientFactory.create(properties));
    }

    public ClientImpl(ConnectionStringBuilder csb, HttpClient httpClient) throws URISyntaxException {
        super(httpClient);
        String clusterURL = UriUtils.createClusterURLFrom(csb.getClusterUrl());
        csb.setClusterUrl(clusterURL);
        this.clusterUrl = csb.getClusterUrl();
        this.aadAuthenticationHelper = this.clusterUrl.toLowerCase().startsWith("http://localhost") ? null : TokenProviderFactory.createTokenProvider(csb, httpClient);
        this.clientDetails = new ClientDetails(csb.getApplicationNameForTracing(), csb.getUserNameForTracing(), csb.getClientVersionForTracing());
        this.defaultDatabaseName = csb.getInitialCatalog();
    }

    @Override
    public KustoOperationResult executeQuery(String command) {
        return this.executeQuery(this.defaultDatabaseName, command);
    }

    @Override
    public KustoOperationResult executeQuery(String database, String command) {
        return this.executeQuery(database, command, null);
    }

    @Override
    public KustoOperationResult executeQuery(String database, String command, ClientRequestProperties properties) {
        return (KustoOperationResult)this.executeQueryAsync(database, command, properties).block();
    }

    @Override
    public Mono<KustoOperationResult> executeQueryAsync(String command) {
        return this.executeQueryAsync(this.defaultDatabaseName, command);
    }

    @Override
    public Mono<KustoOperationResult> executeQueryAsync(String database, String command) {
        return this.executeQueryAsync(database, command, null);
    }

    @Override
    public Mono<KustoOperationResult> executeQueryAsync(String database, String command, ClientRequestProperties properties) {
        return this.executeAsync(database, command, properties, CommandType.QUERY);
    }

    @Override
    public KustoOperationResult executeMgmt(String command) {
        return this.executeMgmt(this.defaultDatabaseName, command);
    }

    @Override
    public KustoOperationResult executeMgmt(String database, String command) {
        return this.executeMgmt(database, command, null);
    }

    @Override
    public KustoOperationResult executeMgmt(String database, String command, ClientRequestProperties properties) {
        return (KustoOperationResult)this.executeAsync(database, command, properties, CommandType.ADMIN_COMMAND).block();
    }

    @Override
    public Mono<KustoOperationResult> executeMgmtAsync(String command) {
        return this.executeMgmtAsync(this.defaultDatabaseName, command);
    }

    @Override
    public Mono<KustoOperationResult> executeMgmtAsync(String database, String command) {
        return this.executeMgmtAsync(this.defaultDatabaseName, command, null);
    }

    @Override
    public Mono<KustoOperationResult> executeMgmtAsync(String database, String command, ClientRequestProperties properties) {
        return this.executeAsync(database, command, properties, CommandType.ADMIN_COMMAND);
    }

    @Override
    public String executeToJsonResult(String database, String command, ClientRequestProperties properties) {
        return (String)this.executeToJsonResultAsync(database, command, properties).block();
    }

    @Override
    public Mono<String> executeToJsonResultAsync(String database, String command, ClientRequestProperties properties) {
        return Mono.defer(() -> {
            KustoRequest kr = new KustoRequest(command, database == null ? this.defaultDatabaseName : database, properties);
            return this.executeWithTimeout(kr, ".executeToJsonResultAsync");
        });
    }

    private Mono<KustoOperationResult> executeAsync(String database, String command, ClientRequestProperties properties, CommandType commandType) {
        return Mono.defer(() -> {
            KustoRequest kr = new KustoRequest(command, database, properties, commandType);
            return MonitoredActivity.wrap(this.executeImplAsync(kr), commandType.getActivityTypeSuffix().concat(".executeAsync"), this.updateAndGetExecuteTracingAttributes(database, properties));
        });
    }

    private Map<String, String> updateAndGetExecuteTracingAttributes(String database, TraceableAttributes traceableAttributes) {
        HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put("cluster", this.clusterUrl);
        attributes.put("database", database);
        if (traceableAttributes != null) {
            attributes.putAll(traceableAttributes.getTracingAttributes());
        }
        return attributes;
    }

    private Mono<KustoOperationResult> executeImplAsync(KustoRequest kr) {
        String clusterEndpoint = String.format(kr.getCommandType().getEndpoint(), this.clusterUrl);
        return this.executeWithTimeout(kr, ".executeImplAsync").publishOn(Schedulers.boundedElastic()).map(response -> {
            JsonResult jsonResult = new JsonResult((String)response, clusterEndpoint);
            return new KustoOperationResult(jsonResult.getResult(), jsonResult.getEndpoint().endsWith("v2/rest/query") ? QUERY_ENDPOINT_VERSION : "v1");
        }).onErrorMap(KustoServiceQueryError.class, e -> new DataServiceException(clusterEndpoint, e.getMessage(), (Exception)((Object)e), e.isPermanent())).onErrorMap(Exception.class, e -> {
            if (e instanceof DataServiceException) {
                return e;
            }
            return new DataClientException(clusterEndpoint, ExceptionUtils.getMessageEx(e), (Exception)e);
        });
    }

    private Mono<String> executeWithTimeout(KustoRequest request, String nameOfSpan) {
        return this.prepareRequestAsync(request).zipWhen(requestContext -> {
            ClientRequestProperties properties = request.getProperties() == null ? new ClientRequestProperties() : request.getProperties();
            long timeoutMs = this.determineTimeout(properties, request.getCommandType(), this.clusterUrl);
            return MonitoredActivity.wrap(this.postAsync(requestContext.getHttpRequest(), timeoutMs), requestContext.getSdkRequest().getCommandType().getActivityTypeSuffix().concat(nameOfSpan));
        }).map(Tuple2::getT2);
    }

    Mono<KustoRequestContext> prepareRequestAsync(@NotNull KustoRequest kr) {
        kr.validateAndOptimize();
        String clusterEndpoint = String.format(kr.getCommandType().getEndpoint(), this.clusterUrl);
        HttpTracing tracing = HttpTracing.newBuilder().withProperties(kr.getProperties()).withRequestPrefix("KJC.execute").withActivitySuffix(kr.getCommandType().getActivityTypeSuffix()).withClientDetails(this.clientDetails).build();
        HttpRequestBuilder requestBuilder = HttpRequestBuilder.newPost(clusterEndpoint).createCommandPayload(kr).withTracing(tracing);
        return this.validateEndpointAsync().then(this.getAuthorizationHeaderValueAsync().doOnNext(requestBuilder::withAuthorization).thenReturn((Object)new KustoRequestContext(kr, requestBuilder.build())));
    }

    private Mono<Void> validateEndpointAsync() {
        if (this.endpointValidated) {
            return Mono.empty();
        }
        return CloudInfo.retrieveCloudInfoForClusterAsync(this.clusterUrl, this.httpClient).map(CloudInfo::getLoginEndpoint).doOnNext(loginEndpoint -> KustoTrustedEndpoints.validateTrustedEndpoint(this.clusterUrl, loginEndpoint)).doOnSuccess(ignored -> {
            this.endpointValidated = true;
        }).then();
    }

    @Override
    public KustoOperationResult executeStreamingIngest(String database, String table, InputStream stream, ClientRequestProperties properties, String streamFormat, String mappingName, boolean leaveOpen) {
        return (KustoOperationResult)this.executeStreamingIngestAsync(database, table, stream, properties, streamFormat, mappingName, leaveOpen).block();
    }

    @Override
    public Mono<KustoOperationResult> executeStreamingIngestAsync(String database, String table, InputStream stream, ClientRequestProperties properties, String streamFormat, String mappingName, boolean leaveOpen) {
        Ensure.argIsNotNull(stream, "stream");
        return Mono.defer(() -> {
            String clusterEndpoint = this.buildClusterEndpoint(database, table, streamFormat, mappingName);
            return this.executeStreamingIngestImplAsync(clusterEndpoint, stream, null, properties, leaveOpen);
        });
    }

    @Override
    public KustoOperationResult executeStreamingIngestFromBlob(String database, String table, String blobUrl, ClientRequestProperties properties, String dataFormat, String mappingName) {
        return (KustoOperationResult)this.executeStreamingIngestFromBlobAsync(database, table, blobUrl, properties, dataFormat, mappingName).block();
    }

    @Override
    public Mono<KustoOperationResult> executeStreamingIngestFromBlobAsync(String database, String table, String blobUrl, ClientRequestProperties properties, String dataFormat, String mappingName) {
        Ensure.argIsNotNull(blobUrl, "blobUrl");
        return Mono.defer(() -> {
            String clusterEndpoint = this.buildClusterEndpoint(database, table, dataFormat, mappingName).concat("&sourceKind=uri");
            return this.executeStreamingIngestImplAsync(clusterEndpoint, null, blobUrl, properties, false);
        });
    }

    private Mono<KustoOperationResult> executeStreamingIngestImplAsync(String clusterEndpoint, InputStream stream, String blobUrl, ClientRequestProperties properties, boolean leaveOpen) {
        return this.validateEndpointAsync().then(this.executeStreamingIngest(clusterEndpoint, stream, blobUrl, properties, leaveOpen));
    }

    private Mono<KustoOperationResult> executeStreamingIngest(String clusterEndpoint, InputStream stream, String blobUrl, ClientRequestProperties properties, boolean leaveOpen) {
        boolean isStreamSource = stream != null;
        HashMap<String, String> headers = new HashMap<String, String>();
        String contentEncoding = isStreamSource ? "gzip" : null;
        String contentType = isStreamSource ? "application/octet-stream" : "application/json";
        properties = properties == null ? new ClientRequestProperties() : properties;
        long timeoutMs = this.determineTimeout(properties, CommandType.STREAMING_INGEST, this.clusterUrl);
        Iterator<Map.Entry<String, Object>> iterator = properties.getOptions();
        while (iterator.hasNext()) {
            Map.Entry<String, Object> pair = iterator.next();
            headers.put(pair.getKey(), pair.getValue().toString());
        }
        BinaryData data = isStreamSource ? BinaryData.fromStream((InputStream)new UncloseableStream(stream)) : BinaryData.fromString((String)new IngestionSourceStorage(blobUrl).toString());
        HttpTracing tracing = HttpTracing.newBuilder().withProperties(properties).withRequestPrefix("KJC.executeStreamingIngest" + (isStreamSource ? "" : "FromBlob")).withActivitySuffix(CommandType.STREAMING_INGEST.getActivityTypeSuffix()).withClientDetails(this.clientDetails).build();
        HttpRequestBuilder httpRequestBuilder = HttpRequestBuilder.newPost(clusterEndpoint).withTracing(tracing).withHeaders(headers).withContentType(contentType).withContentEncoding(contentEncoding).withBody(data);
        return this.getAuthorizationHeaderValueAsync().doOnNext(httpRequestBuilder::withAuthorization).then(MonitoredActivity.wrap(this.postAsync(httpRequestBuilder.build(), timeoutMs), "ClientImpl.executeStreamingIngest").publishOn(Schedulers.boundedElastic()).map(response -> new KustoOperationResult((String)response, "v1")).onErrorMap(KustoServiceQueryError.class, e -> new DataClientException(clusterEndpoint, e.getMessage(), (Exception)((Object)e))).doFinally(signalType -> {
            if (isStreamSource && !leaveOpen) {
                try {
                    stream.close();
                }
                catch (IOException e) {
                    LOGGER.debug("executeStreamingIngest: Error while closing the stream.", (Throwable)e);
                }
            }
        }));
    }

    private String buildClusterEndpoint(String database, String table, String format, String mappingName) {
        if (StringUtils.isBlank((CharSequence)database)) {
            throw new IllegalArgumentException("Parameter database is empty.");
        }
        if (StringUtils.isBlank((CharSequence)table)) {
            throw new IllegalArgumentException("Parameter table is empty.");
        }
        if (StringUtils.isBlank((CharSequence)format)) {
            throw new IllegalArgumentException("Parameter format is empty.");
        }
        String clusterEndpoint = String.format(CommandType.STREAMING_INGEST.getEndpoint(), this.clusterUrl, database, table, format);
        if (!StringUtils.isEmpty((CharSequence)mappingName)) {
            clusterEndpoint = clusterEndpoint.concat(String.format("&mappingName=%s", mappingName));
        }
        return clusterEndpoint;
    }

    @Override
    public InputStream executeStreamingQuery(String command) {
        return this.executeStreamingQuery(this.defaultDatabaseName, command);
    }

    @Override
    public InputStream executeStreamingQuery(String database, String command) {
        return this.executeStreamingQuery(database, command, null);
    }

    @Override
    public InputStream executeStreamingQuery(String database, String command, ClientRequestProperties properties) {
        return (InputStream)this.executeStreamingQueryAsync(database, command, properties).block();
    }

    @Override
    public Mono<InputStream> executeStreamingQueryAsync(String command) {
        return this.executeStreamingQueryAsync(this.defaultDatabaseName, command);
    }

    @Override
    public Mono<InputStream> executeStreamingQueryAsync(String database, String command) {
        return this.executeStreamingQueryAsync(database, command, null);
    }

    @Override
    public Mono<InputStream> executeStreamingQueryAsync(String database, String command, ClientRequestProperties properties) {
        KustoRequest kr = new KustoRequest(command, database, properties);
        return this.executeStreamingQueryAsync(kr);
    }

    private Mono<InputStream> executeStreamingQueryAsync(@NotNull KustoRequest kr) {
        kr.validateAndOptimize();
        String clusterEndpoint = String.format(kr.getCommandType().getEndpoint(), this.clusterUrl);
        HttpTracing tracing = HttpTracing.newBuilder().withProperties(kr.getProperties()).withRequestPrefix("KJC.executeStreaming").withActivitySuffix(kr.getCommandType().getActivityTypeSuffix()).withClientDetails(this.clientDetails).build();
        return this.validateEndpointAsync().then(this.executeStreamingQuery(clusterEndpoint, kr, tracing));
    }

    private Mono<InputStream> executeStreamingQuery(String clusterEndpoint, KustoRequest kr, HttpTracing tracing) {
        HttpRequestBuilder requestBuilder = HttpRequestBuilder.newPost(clusterEndpoint).createCommandPayload(kr).withTracing(tracing);
        ClientRequestProperties properties = kr.getProperties() == null ? new ClientRequestProperties() : kr.getProperties();
        long timeoutMs = this.determineTimeout(properties, kr.getCommandType(), this.clusterUrl);
        return this.getAuthorizationHeaderValueAsync().doOnNext(requestBuilder::withAuthorization).then(MonitoredActivity.wrap(this.postToStreamingOutputAsync(requestBuilder.build(), timeoutMs, 0, kr.getRedirectCount()), "ClientImpl.executeStreamingQuery", this.updateAndGetExecuteTracingAttributes(kr.getDatabase(), properties)));
    }

    private long determineTimeout(ClientRequestProperties properties, CommandType commandType, String clusterUrl) {
        Long timeoutMs;
        Object skipBoolean = properties.getOption("norequesttimeout");
        if (skipBoolean instanceof Boolean && ((Boolean)skipBoolean).booleanValue()) {
            return Long.MAX_VALUE;
        }
        try {
            timeoutMs = properties.getTimeoutInMilliSec();
        }
        catch (ParseException e) {
            throw new DataClientException(clusterUrl, "Failed to parse timeout from ClientRequestProperties");
        }
        if (timeoutMs == null) {
            switch (commandType) {
                case ADMIN_COMMAND: {
                    timeoutMs = COMMAND_TIMEOUT_IN_MILLISECS;
                    break;
                }
                case STREAMING_INGEST: {
                    timeoutMs = STREAMING_INGEST_TIMEOUT_IN_MILLISECS;
                    break;
                }
                default: {
                    timeoutMs = QUERY_TIMEOUT_IN_MILLISECS;
                }
            }
        }
        properties.setTimeoutInMilliSec(timeoutMs);
        return timeoutMs + CLIENT_GRACE_PERIOD_IN_MILLISECS;
    }

    private Mono<String> getAuthorizationHeaderValueAsync() {
        if (this.aadAuthenticationHelper != null) {
            return this.aadAuthenticationHelper.acquireAccessToken().map(token -> String.format("Bearer %s", token));
        }
        return Mono.empty();
    }

    public String getClusterUrl() {
        return this.clusterUrl;
    }

    ClientDetails getClientDetails() {
        return this.clientDetails;
    }
}

