/*
 * Decompiled with CFR 0.152.
 */
package com.azure.monitor.ingestion;

import com.azure.core.annotation.ReturnType;
import com.azure.core.annotation.ServiceClient;
import com.azure.core.annotation.ServiceMethod;
import com.azure.core.exception.HttpResponseException;
import com.azure.core.http.HttpHeader;
import com.azure.core.http.HttpHeaderName;
import com.azure.core.http.rest.RequestOptions;
import com.azure.core.http.rest.Response;
import com.azure.core.util.BinaryData;
import com.azure.core.util.Context;
import com.azure.core.util.FluxUtil;
import com.azure.monitor.ingestion.LogsIngestionClientBuilder;
import com.azure.monitor.ingestion.implementation.Batcher;
import com.azure.monitor.ingestion.implementation.LogsIngestionClientImpl;
import com.azure.monitor.ingestion.implementation.LogsIngestionRequest;
import com.azure.monitor.ingestion.implementation.UploadLogsResponseHolder;
import com.azure.monitor.ingestion.implementation.Utils;
import com.azure.monitor.ingestion.models.LogsUploadError;
import com.azure.monitor.ingestion.models.LogsUploadException;
import com.azure.monitor.ingestion.models.LogsUploadOptions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;

/**
 * Asynchronous client for uploading logs through a {@link LogsIngestionClientImpl}.
 *
 * <p>Two upload styles are offered: a high-level {@code upload} that takes a collection of log
 * objects, delegates batching to {@link Batcher} and aggregates per-batch failures into a single
 * {@link LogsUploadException}; and a low-level {@code uploadWithResponse} that sends a caller-supplied
 * payload in a single request.</p>
 *
 * <p>Instances are created by {@link LogsIngestionClientBuilder}.</p>
 */
@ServiceClient(isAsync = true, builder = LogsIngestionClientBuilder.class)
public final class LogsIngestionAsyncClient {
    private final LogsIngestionClientImpl service;

    /**
     * Creates the client on top of the given service implementation.
     *
     * @param service the implementation client that performs the actual service calls.
     */
    LogsIngestionAsyncClient(LogsIngestionClientImpl service) {
        this.service = service;
    }

    /**
     * Uploads the given logs using default {@link LogsUploadOptions}.
     *
     * @param ruleId identifier of the rule the logs are sent to.
     * @param streamName name of the stream the logs are sent to.
     * @param logs the log objects to upload.
     * @return a {@link Mono} that completes when all batches were sent, or signals a
     * {@link LogsUploadException} when at least one batch failed with an {@link HttpResponseException}.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Void> upload(String ruleId, String streamName, Iterable<Object> logs) {
        return upload(ruleId, streamName, logs, new LogsUploadOptions());
    }

    /**
     * Uploads the given logs using the supplied options.
     *
     * <p>When {@code options} carries an error consumer, failed batches are reported to that consumer
     * and do not contribute to the error signalled by the returned {@link Mono}.</p>
     *
     * @param ruleId identifier of the rule the logs are sent to.
     * @param streamName name of the stream the logs are sent to.
     * @param logs the log objects to upload.
     * @param options upload options; may be {@code null}.
     * @return a {@link Mono} that completes when all batches were sent, or signals a
     * {@link LogsUploadException} aggregating the batches that failed with an {@link HttpResponseException}
     * and were not handed to an error consumer.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Void> upload(String ruleId, String streamName, Iterable<Object> logs, LogsUploadOptions options) {
        return FluxUtil.withContext(context -> upload(ruleId, streamName, logs, options, context));
    }

    /**
     * Uploads a caller-supplied payload in a single request.
     *
     * <p>A request callback is registered that gzip-compresses the payload and sets
     * {@code Content-Encoding: gzip}, unless the outgoing request already carries a
     * {@code Content-Encoding} header.</p>
     *
     * @param ruleId identifier of the rule the logs are sent to.
     * @param streamName name of the stream the logs are sent to.
     * @param logs the payload to send.
     * @param requestOptions per-request options; a new instance is used when {@code null}. Note that the
     * compression callback is added to the instance that was passed in.
     * @return a {@link Mono} emitting the service response.
     * @throws NullPointerException if {@code ruleId}, {@code streamName} or {@code logs} is {@code null}.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<Void>> uploadWithResponse(String ruleId, String streamName, BinaryData logs,
        RequestOptions requestOptions) {
        Objects.requireNonNull(ruleId, "'ruleId' cannot be null.");
        Objects.requireNonNull(streamName, "'streamName' cannot be null.");
        Objects.requireNonNull(logs, "'logs' cannot be null.");
        if (requestOptions == null) {
            requestOptions = new RequestOptions();
        }
        requestOptions.addRequestCallback(request -> {
            HttpHeader contentEncoding = request.getHeaders().get(HttpHeaderName.CONTENT_ENCODING);
            // Only compress when the caller has not declared an encoding themselves.
            if (contentEncoding == null) {
                BinaryData gzippedRequest = BinaryData.fromBytes(Utils.gzipRequest(logs.toBytes()));
                request.setBody(gzippedRequest);
                request.setHeader(HttpHeaderName.CONTENT_ENCODING, "gzip");
            }
        });
        return service.uploadWithResponseAsync(ruleId, streamName, logs, requestOptions);
    }

    /**
     * Context-aware variant of {@code upload}; batching and sending are deferred until subscription.
     *
     * @param ruleId identifier of the rule the logs are sent to.
     * @param streamName name of the stream the logs are sent to.
     * @param logs the log objects to upload.
     * @param options upload options; may be {@code null}.
     * @param context context attached to every outgoing request.
     * @return a {@link Mono} that completes or signals a {@link LogsUploadException} as described on the
     * public {@code upload} overloads.
     */
    Mono<Void> upload(String ruleId, String streamName, Iterable<Object> logs, LogsUploadOptions options,
        Context context) {
        return Mono.defer(() -> splitAndUpload(ruleId, streamName, logs, options, context));
    }

    /**
     * Turns the logs into requests via {@link Batcher}, sends them with the concurrency derived from
     * {@code options}, and folds the per-request outcomes into a single completion or error signal.
     */
    private Mono<Void> splitAndUpload(String ruleId, String streamName, Iterable<Object> logs,
        LogsUploadOptions options, Context context) {
        int concurrency = Utils.getConcurrency(options);
        return new Batcher(options, logs).toFlux()
            .flatMapSequential(request -> uploadToService(ruleId, streamName, context, request), concurrency)
            // The explicit type witness is required: without it the element type is inferred as Object
            // and the downstream processExceptions method reference no longer type-checks.
            .<LogsUploadException>handle((responseHolder, sink) -> processResponse(options, responseHolder, sink))
            .collectList()
            .handle(this::processExceptions);
    }

    /**
     * Merges the per-batch exceptions into one {@link LogsUploadException}, or completes when there are none.
     *
     * @param result exceptions emitted for the failed batches.
     * @param sink sink that receives either the aggregated error or the completion signal.
     */
    private void processExceptions(List<LogsUploadException> result, SynchronousSink<Void> sink) {
        long failedLogsCount = 0L;
        List<HttpResponseException> exceptions = new ArrayList<>();
        for (LogsUploadException exception : result) {
            exceptions.addAll(exception.getLogsUploadErrors());
            failedLogsCount += exception.getFailedLogsCount();
        }
        if (exceptions.isEmpty()) {
            sink.complete();
        } else {
            sink.error(new LogsUploadException(exceptions, failedLogsCount));
        }
    }

    /**
     * Inspects the outcome of one request. Successful requests emit nothing; failed requests are either
     * handed to the configured error consumer or emitted downstream as a {@link LogsUploadException}.
     *
     * @param options upload options, possibly {@code null}, consulted for an error consumer.
     * @param responseHolder outcome of a single request.
     * @param sink sink that receives the exception for a failed request without an error consumer.
     */
    private void processResponse(LogsUploadOptions options, UploadLogsResponseHolder responseHolder,
        SynchronousSink<LogsUploadException> sink) {
        HttpResponseException failure = responseHolder.getException();
        if (failure == null) {
            // Success: nothing to report for this batch.
            return;
        }

        Consumer<LogsUploadError> uploadLogsErrorConsumer = null;
        if (options != null) {
            uploadLogsErrorConsumer = options.getLogsUploadErrorConsumer();
        }
        if (uploadLogsErrorConsumer != null) {
            // The caller handles the failure; it is not added to the aggregated exception.
            uploadLogsErrorConsumer.accept(new LogsUploadError(failure, responseHolder.getRequest().getLogs()));
            return;
        }
        sink.next(new LogsUploadException(Collections.singletonList(failure),
            responseHolder.getRequest().getLogs().size()));
    }

    /**
     * Sends one request to the service and captures its outcome instead of propagating an
     * {@link HttpResponseException}, so that one failed batch does not cancel the remaining uploads.
     * Errors of any other type are propagated unchanged.
     *
     * @return a {@link Mono} emitting an empty holder on success, or a holder carrying the request and
     * the exception when the service call failed with an {@link HttpResponseException}.
     */
    private Mono<UploadLogsResponseHolder> uploadToService(String ruleId, String streamName, Context context,
        LogsIngestionRequest request) {
        // NOTE(review): the header declares gzip without compressing here, so the request body is
        // presumably already compressed by the batcher -- confirm.
        RequestOptions requestOptions = new RequestOptions()
            .addHeader(HttpHeaderName.CONTENT_ENCODING, "gzip")
            .setContext(context);
        return service
            .uploadWithResponseAsync(ruleId, streamName, BinaryData.fromBytes(request.getRequestBody()),
                requestOptions)
            .map(response -> new UploadLogsResponseHolder(null, null))
            .onErrorResume(HttpResponseException.class,
                ex -> Mono.fromSupplier(() -> new UploadLogsResponseHolder(request, ex)));
    }
}

