/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.kusto.ingest;

import com.azure.core.http.HttpClient;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobClientBuilder;
import com.azure.storage.blob.models.BlobStorageException;
import com.microsoft.azure.kusto.data.ClientFactory;
import com.microsoft.azure.kusto.data.ClientRequestProperties;
import com.microsoft.azure.kusto.data.Ensure;
import com.microsoft.azure.kusto.data.StreamingClient;
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.data.exceptions.DataClientException;
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
import com.microsoft.azure.kusto.data.exceptions.ExceptionsUtils;
import com.microsoft.azure.kusto.data.http.HttpClientProperties;
import com.microsoft.azure.kusto.data.instrumentation.MonitoredActivity;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestClientBase;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
import com.microsoft.azure.kusto.ingest.result.IngestionResult;
import com.microsoft.azure.kusto.ingest.result.IngestionStatus;
import com.microsoft.azure.kusto.ingest.result.IngestionStatusResult;
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo;
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
import com.microsoft.azure.kusto.ingest.source.ResultSetSourceInfo;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import com.microsoft.azure.kusto.ingest.utils.IngestionUtils;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Objects;
import java.util.zip.GZIPOutputStream;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link IngestClient} that sends data straight to the service through a {@link StreamingClient}.
 *
 * <p>Files and result sets are first converted to a {@link StreamSourceInfo} and then sent with
 * {@code StreamingClient.executeStreamingIngest}; blobs are sent with
 * {@code StreamingClient.executeStreamingIngestFromBlob}. When the streaming call returns without
 * throwing, the ingest methods return an {@link IngestionStatusResult} whose status is
 * {@link OperationStatus#Succeeded}.
 */
public class StreamingIngestClient
extends IngestClientBase
implements IngestClient {
    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    /** Size, in bytes, of the buffer used when copying a stream into the GZIP compressor. */
    private static final int STREAM_COMPRESS_BUFFER_SIZE = 16384;
    private static final String CLASS_NAME = StreamingIngestClient.class.getSimpleName();
    /** Host-name prefix that {@link #generateEngineUriSuggestion(URI)} strips. */
    private static final String INGEST_HOST_PREFIX = "ingest-";
    private final StreamingClient streamingClient;
    /** Cluster URL this client was built with; not set by the {@link StreamingClient}-only constructor. */
    String connectionDataSource;

    /**
     * Creates a client from a connection string and optional HTTP client properties.
     *
     * @param csb connection string; it is copied, the caller's instance is not modified here
     * @param properties HTTP client properties handed to {@link ClientFactory}, may be {@code null}
     * @param autoCorrectEndpoint when {@code true} the cluster URL is replaced by the result of
     *        {@code getQueryEndpoint}
     * @throws URISyntaxException declared by the endpoint resolution / client creation calls
     */
    StreamingIngestClient(ConnectionStringBuilder csb, @Nullable HttpClientProperties properties, boolean autoCorrectEndpoint) throws URISyntaxException {
        log.info("Creating a new StreamingIngestClient");
        ConnectionStringBuilder csbWithEndpoint = StreamingIngestClient.copyWithResolvedEndpoint(csb, autoCorrectEndpoint);
        this.streamingClient = ClientFactory.createStreamingClient(csbWithEndpoint, properties);
        this.connectionDataSource = csbWithEndpoint.getClusterUrl();
    }

    /**
     * Creates a client from a connection string and an optional pre-built HTTP client.
     *
     * @param csb connection string; it is copied, the caller's instance is not modified here
     * @param httpClient HTTP client handed to {@link ClientFactory}, may be {@code null}
     * @param autoCorrectEndpoint when {@code true} the cluster URL is replaced by the result of
     *        {@code getQueryEndpoint}
     * @throws URISyntaxException declared by the endpoint resolution / client creation calls
     */
    StreamingIngestClient(ConnectionStringBuilder csb, @Nullable HttpClient httpClient, boolean autoCorrectEndpoint) throws URISyntaxException {
        log.info("Creating a new StreamingIngestClient");
        ConnectionStringBuilder csbWithEndpoint = StreamingIngestClient.copyWithResolvedEndpoint(csb, autoCorrectEndpoint);
        this.streamingClient = ClientFactory.createStreamingClient(csbWithEndpoint, httpClient);
        this.connectionDataSource = csbWithEndpoint.getClusterUrl();
    }

    /**
     * Wraps an existing {@link StreamingClient}. {@code connectionDataSource} is left unset;
     * use {@link #setConnectionDataSource(String)} if it is needed.
     *
     * @param streamingClient the client every ingest call is delegated to
     */
    StreamingIngestClient(StreamingClient streamingClient) {
        log.info("Creating a new StreamingIngestClient");
        this.streamingClient = streamingClient;
    }

    /** Copies {@code csb} and, when requested, swaps its cluster URL for the query endpoint. */
    private static ConnectionStringBuilder copyWithResolvedEndpoint(ConnectionStringBuilder csb, boolean autoCorrectEndpoint) throws URISyntaxException {
        ConnectionStringBuilder csbWithEndpoint = new ConnectionStringBuilder(csb);
        String clusterUrl = csbWithEndpoint.getClusterUrl();
        csbWithEndpoint.setClusterUrl(autoCorrectEndpoint ? StreamingIngestClient.getQueryEndpoint(clusterUrl) : clusterUrl);
        return csbWithEndpoint;
    }

    /**
     * Builds the URI obtained by removing the leading {@code "ingest-"} from the host of
     * {@code existingEndpoint}. Scheme, port, path, query and fragment are carried over.
     *
     * @param existingEndpoint endpoint whose host starts with {@code "ingest-"} (case-insensitive)
     * @return the string form of the rewritten URI
     * @throws NullPointerException if {@code existingEndpoint} has no host
     * @throws IllegalArgumentException if the host does not start with {@code "ingest-"}
     * @throws URISyntaxException if the rewritten components do not form a valid URI
     */
    public static String generateEngineUriSuggestion(URI existingEndpoint) throws URISyntaxException {
        String existingHost = Objects.requireNonNull(existingEndpoint.getHost());
        // regionMatches(ignoreCase) is locale-independent; a default-locale toLowerCase() would
        // turn "INGEST-" into a dotless-i string under a Turkish locale and miss the prefix.
        if (!existingHost.regionMatches(true, 0, INGEST_HOST_PREFIX, 0, INGEST_HOST_PREFIX.length())) {
            throw new IllegalArgumentException("The URL is already formatted as the suggested Engine endpoint, so no suggestion can be made");
        }
        String authority = existingHost.substring(INGEST_HOST_PREFIX.length());
        int port = existingEndpoint.getPort();
        if (port != -1) {
            // Keep an explicit port, otherwise the suggestion would point at a different endpoint.
            authority = authority + ":" + port;
        }
        URI newUri = new URI(existingEndpoint.getScheme(), authority, existingEndpoint.getPath(), existingEndpoint.getQuery(), existingEndpoint.getFragment());
        return newUri.toString();
    }

    /**
     * Validates the arguments, turns the file into a stream with {@code IngestionUtils.fileToStream}
     * and ingests that stream.
     *
     * @throws IngestionClientException if the file cannot be found, or on a client-side ingest failure
     * @throws IngestionServiceException on a service-side ingest failure
     */
    @Override
    protected IngestionResult ingestFromFileImpl(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException {
        Ensure.argIsNotNull(fileSourceInfo, "fileSourceInfo");
        Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");
        fileSourceInfo.validate();
        ingestionProperties.validate();
        try {
            StreamSourceInfo streamSourceInfo = IngestionUtils.fileToStream(fileSourceInfo, false, ingestionProperties.getDataFormat());
            return this.ingestFromStream(streamSourceInfo, ingestionProperties);
        }
        catch (FileNotFoundException e) {
            log.error("File not found when ingesting a file.", e);
            throw new IngestionClientException("IO exception - check file path.", e);
        }
    }

    /**
     * Validates the arguments, builds a {@link BlobClient} for the blob path and ingests the blob.
     *
     * @throws IngestionClientException if the blob path is rejected by the blob client builder, on a
     *         storage error, or on a client-side ingest failure
     * @throws IngestionServiceException on a service-side ingest failure
     */
    @Override
    protected IngestionResult ingestFromBlobImpl(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException {
        Ensure.argIsNotNull(blobSourceInfo, "blobSourceInfo");
        Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");
        blobSourceInfo.validate();
        ingestionProperties.validate();
        try {
            BlobClient blobClient = new BlobClientBuilder().endpoint(blobSourceInfo.getBlobPath()).buildClient();
            return this.ingestFromBlob(blobSourceInfo, ingestionProperties, blobClient, null);
        }
        catch (IllegalArgumentException e) {
            String msg = "Unexpected error when ingesting a blob - Invalid blob path.";
            log.error(msg, e);
            throw new IngestionClientException(msg, e);
        }
        catch (BlobStorageException e) {
            String msg = "Unexpected Storage error when ingesting a blob.";
            log.error(msg, e);
            throw new IngestionClientException(msg, e);
        }
    }

    /**
     * Validates the arguments, turns the result set into a stream with
     * {@code IngestionUtils.resultSetToStream} and ingests that stream.
     *
     * @throws IngestionClientException if reading the result set fails with an {@link IOException},
     *         or on a client-side ingest failure
     * @throws IngestionServiceException on a service-side ingest failure
     */
    @Override
    protected IngestionResult ingestFromResultSetImpl(ResultSetSourceInfo resultSetSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException {
        Ensure.argIsNotNull(resultSetSourceInfo, "resultSetSourceInfo");
        Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");
        resultSetSourceInfo.validate();
        ingestionProperties.validateResultSetProperties();
        try {
            StreamSourceInfo streamSourceInfo = IngestionUtils.resultSetToStream(resultSetSourceInfo);
            return this.ingestFromStream(streamSourceInfo, ingestionProperties);
        }
        catch (IOException ex) {
            String msg = "Failed to read from ResultSet.";
            log.error(msg, ex);
            throw new IngestionClientException(msg, ex);
        }
    }

    /** Ingests the stream without a client request id. */
    @Override
    protected IngestionResult ingestFromStreamImpl(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException {
        return this.ingestFromStreamImpl(streamSourceInfo, ingestionProperties, null);
    }

    /** Returns the simple class name of {@link StreamingIngestClient}. */
    @Override
    protected String getClientType() {
        return CLASS_NAME;
    }

    /**
     * Ingests a stream inside {@code MonitoredActivity.invoke}, labelled
     * {@code <clientType>.ingestFromStream}.
     *
     * @param clientRequestId request id to attach to the call; ignored when {@code null} or blank
     */
    IngestionResult ingestFromStream(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties, @Nullable String clientRequestId) throws IngestionClientException, IngestionServiceException {
        return (IngestionResult)MonitoredActivity.invoke(() -> this.ingestFromStreamImpl(streamSourceInfo, ingestionProperties, clientRequestId), this.getClientType().concat(".ingestFromStream"), this.getIngestionTraceAttributes(streamSourceInfo, ingestionProperties));
    }

    /**
     * Validates the arguments, GZIP-compresses the stream when {@code shouldCompress} says so, and
     * sends it with {@code StreamingClient.executeStreamingIngest}.
     *
     * @param clientRequestId request id to attach to the call; ignored when {@code null} or blank
     * @return a result with status {@link OperationStatus#Succeeded} for the target table and database
     * @throws IngestionClientException wrapping a {@link DataClientException} or {@link IOException},
     *         or when the stream to compress is empty
     * @throws IngestionServiceException wrapping a {@link DataServiceException}
     */
    private IngestionResult ingestFromStreamImpl(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties, @Nullable String clientRequestId) throws IngestionClientException, IngestionServiceException {
        Ensure.argIsNotNull(streamSourceInfo, "streamSourceInfo");
        Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");
        IngestionProperties.DataFormat dataFormat = ingestionProperties.getDataFormat();
        streamSourceInfo.validate();
        ingestionProperties.validate();
        ClientRequestProperties clientRequestProperties = StreamingIngestClient.createClientRequestProperties(clientRequestId);
        try {
            InputStream stream = IngestClientBase.shouldCompress(streamSourceInfo.getCompressionType(), dataFormat)
                    ? this.compressStream(streamSourceInfo.getStream(), streamSourceInfo.isLeaveOpen())
                    : streamSourceInfo.getStream();
            // True only when the source declares a compression type AND asked to be left open.
            // NOTE(review): presumably the callee's leave-open flag; an uncompressed source that is
            // sent as-is gets 'false' here even if isLeaveOpen() is true - confirm this is intended.
            boolean leaveOpen = streamSourceInfo.getCompressionType() != null && streamSourceInfo.isLeaveOpen();
            log.debug("Executing streaming ingest");
            this.streamingClient.executeStreamingIngest(ingestionProperties.getDatabaseName(), ingestionProperties.getTableName(), stream, clientRequestProperties, dataFormat.getKustoValue(), ingestionProperties.getIngestionMapping().getIngestionMappingReference(), leaveOpen);
        }
        catch (DataClientException | IOException e) {
            String msg = ExceptionsUtils.getMessageEx(e);
            log.error(msg, e);
            throw new IngestionClientException(msg, e);
        }
        catch (DataServiceException e) {
            log.error(e.getMessage(), e);
            throw new IngestionServiceException(e.getMessage(), e);
        }
        log.debug("Stream was ingested successfully.");
        return StreamingIngestClient.createSucceededResult(ingestionProperties);
    }

    /**
     * Reads {@code uncompressedStream} to its end and returns an in-memory stream holding the
     * GZIP-compressed bytes.
     *
     * @param uncompressedStream data to compress
     * @param leaveOpen when {@code false} the source stream is closed before returning, including
     *        when this method fails
     * @throws IngestionClientException if the source yields no data at all
     * @throws IOException if reading, compressing or closing fails
     */
    private InputStream compressStream(InputStream uncompressedStream, boolean leaveOpen) throws IngestionClientException, IOException {
        log.debug("Compressing the stream.");
        ByteArrayOutputStream compressedBytes = new ByteArrayOutputStream();
        // try-with-resources skips a null resource, so the source is closed only when the caller
        // handed over ownership; the GZIP stream is always closed, which also finishes the archive.
        try (InputStream sourceToClose = leaveOpen ? null : uncompressedStream;
                GZIPOutputStream gzipOutputStream = new GZIPOutputStream(compressedBytes)) {
            byte[] buffer = new byte[STREAM_COMPRESS_BUFFER_SIZE];
            int read = uncompressedStream.read(buffer);
            if (read == -1) {
                String message = "Empty stream.";
                log.error(message);
                throw new IngestionClientException(message);
            }
            do {
                gzipOutputStream.write(buffer, 0, read);
            } while ((read = uncompressedStream.read(buffer)) != -1);
        }
        // Must happen after the GZIP stream is closed so the trailer is part of the bytes.
        return new ByteArrayInputStream(compressedBytes.toByteArray());
    }

    /**
     * Ingests a blob inside {@code MonitoredActivity.invoke}, labelled
     * {@code <clientType>.ingestFromBlob}.
     *
     * @param cloudBlockBlob blob client used to look up the blob size
     * @param clientRequestId request id to attach to the call; ignored when {@code null} or blank
     */
    IngestionResult ingestFromBlob(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties, BlobClient cloudBlockBlob, @Nullable String clientRequestId) throws IngestionClientException, IngestionServiceException {
        return (IngestionResult)MonitoredActivity.invoke(() -> this.ingestFromBlobImpl(blobSourceInfo, ingestionProperties, cloudBlockBlob, clientRequestId), this.getClientType().concat(".ingestFromBlob"), this.getIngestionTraceAttributes(blobSourceInfo, ingestionProperties));
    }

    /**
     * Rejects an empty blob, then sends the blob path with
     * {@code StreamingClient.executeStreamingIngestFromBlob}.
     *
     * <p>The blob's properties are fetched only when {@code blobSourceInfo} reports a raw size of 0.
     *
     * @return a result with status {@link OperationStatus#Succeeded} for the target table and database
     * @throws IngestionClientException if the blob is empty, its properties cannot be read, or the
     *         streaming call fails with a {@link DataClientException}
     * @throws IngestionServiceException wrapping a {@link DataServiceException}
     */
    private IngestionResult ingestFromBlobImpl(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties, BlobClient cloudBlockBlob, @Nullable String clientRequestId) throws IngestionClientException, IngestionServiceException {
        String blobPath = blobSourceInfo.getBlobPath();
        try {
            if (blobSourceInfo.getRawSizeInBytes() == 0L && cloudBlockBlob.getProperties().getBlobSize() == 0L) {
                String message = "Empty blob.";
                log.error(message);
                throw new IngestionClientException(message);
            }
        }
        catch (BlobStorageException ex) {
            throw new IngestionClientException(String.format("Exception trying to read blob metadata,%s", ex.getStatusCode() == 403 ? "this might mean the blob doesn't exist" : ""), ex);
        }
        ClientRequestProperties clientRequestProperties = StreamingIngestClient.createClientRequestProperties(clientRequestId);
        IngestionProperties.DataFormat dataFormat = ingestionProperties.getDataFormat();
        try {
            this.streamingClient.executeStreamingIngestFromBlob(ingestionProperties.getDatabaseName(), ingestionProperties.getTableName(), blobPath, clientRequestProperties, dataFormat.getKustoValue(), ingestionProperties.getIngestionMapping().getIngestionMappingReference());
        }
        catch (DataClientException e) {
            log.error(e.getMessage(), e);
            throw new IngestionClientException(e.getMessage(), e);
        }
        catch (DataServiceException e) {
            log.error(e.getMessage(), e);
            throw new IngestionServiceException(e.getMessage(), e);
        }
        log.debug("Blob was ingested successfully.");
        return StreamingIngestClient.createSucceededResult(ingestionProperties);
    }

    /** Returns request properties carrying {@code clientRequestId}, or {@code null} when it is blank. */
    @Nullable
    private static ClientRequestProperties createClientRequestProperties(@Nullable String clientRequestId) {
        if (!StringUtils.isNotBlank(clientRequestId)) {
            return null;
        }
        ClientRequestProperties clientRequestProperties = new ClientRequestProperties();
        clientRequestProperties.setClientRequestId(clientRequestId);
        return clientRequestProperties;
    }

    /** Builds a "Succeeded" result naming the table and database of {@code ingestionProperties}. */
    private static IngestionResult createSucceededResult(IngestionProperties ingestionProperties) {
        IngestionStatus ingestionStatus = new IngestionStatus();
        ingestionStatus.status = OperationStatus.Succeeded;
        ingestionStatus.table = ingestionProperties.getTableName();
        ingestionStatus.database = ingestionProperties.getDatabaseName();
        return new IngestionStatusResult(ingestionStatus);
    }

    /** Overrides the stored cluster URL. */
    protected void setConnectionDataSource(String connectionDataSource) {
        this.connectionDataSource = connectionDataSource;
    }

    /**
     * Does nothing.
     * NOTE(review): the wrapped {@link StreamingClient} is not closed here - confirm whether this
     * client is expected to own and release it.
     */
    @Override
    public void close() {
    }
}

