/*
 * Decompiled with CFR 0.152.
 */
package com.getindata.connectors.http.internal.sink.httpclient;

import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
import com.getindata.connectors.http.internal.sink.httpclient.AbstractRequestSubmitter;
import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest;
import com.getindata.connectors.http.internal.sink.httpclient.JavaNetHttpResponseWrapper;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import lombok.Generated;
import org.apache.flink.annotation.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BatchRequestSubmitter
extends AbstractRequestSubmitter {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(BatchRequestSubmitter.class);
    private static final byte[] BATCH_START_BYTES = "[".getBytes(StandardCharsets.UTF_8);
    private static final byte[] BATCH_END_BYTES = "]".getBytes(StandardCharsets.UTF_8);
    private static final byte[] BATCH_ELEMENT_DELIM_BYTES = ",".getBytes(StandardCharsets.UTF_8);
    private final int httpRequestBatchSize;

    public BatchRequestSubmitter(Properties properties, String[] headersAndValue, HttpClient httpClient) {
        super(properties, headersAndValue, httpClient);
        this.httpRequestBatchSize = Integer.parseInt(properties.getProperty("gid.connector.http.sink.request.batch.size"));
    }

    @Override
    public List<CompletableFuture<JavaNetHttpResponseWrapper>> submit(String endpointUrl, List<HttpSinkRequestEntry> requestsToSubmit) {
        if (requestsToSubmit.isEmpty()) {
            return Collections.emptyList();
        }
        ArrayList<CompletableFuture<JavaNetHttpResponseWrapper>> responseFutures = new ArrayList<CompletableFuture<JavaNetHttpResponseWrapper>>();
        String previousReqeustMethod = requestsToSubmit.get((int)0).method;
        ArrayList<HttpSinkRequestEntry> requestBatch = new ArrayList<HttpSinkRequestEntry>(this.httpRequestBatchSize);
        for (HttpSinkRequestEntry entry : requestsToSubmit) {
            if (requestBatch.size() == this.httpRequestBatchSize || !previousReqeustMethod.equalsIgnoreCase(entry.method)) {
                responseFutures.add(this.sendBatch(endpointUrl, requestBatch));
                requestBatch.clear();
            }
            requestBatch.add(entry);
            previousReqeustMethod = entry.method;
        }
        responseFutures.add(this.sendBatch(endpointUrl, requestBatch));
        return responseFutures;
    }

    @VisibleForTesting
    int getBatchSize() {
        return this.httpRequestBatchSize;
    }

    private CompletableFuture<JavaNetHttpResponseWrapper> sendBatch(String endpointUrl, List<HttpSinkRequestEntry> reqeustBatch) {
        HttpRequest httpRequest = this.buildHttpRequest(reqeustBatch, URI.create(endpointUrl));
        return ((CompletableFuture)this.httpClient.sendAsync(httpRequest.getHttpRequest(), HttpResponse.BodyHandlers.ofString()).exceptionally(ex -> {
            log.error("Request fatally failed because of an exception", ex);
            return null;
        })).thenApplyAsync(res -> new JavaNetHttpResponseWrapper(httpRequest, (HttpResponse<String>)res), (Executor)this.publishingThreadPool);
    }

    private HttpRequest buildHttpRequest(List<HttpSinkRequestEntry> reqeustBatch, URI endpointUri) {
        try {
            String method = reqeustBatch.get((int)0).method;
            ArrayList<byte[]> elements = new ArrayList<byte[]>(reqeustBatch.size());
            elements.add(BATCH_START_BYTES);
            for (HttpSinkRequestEntry entry : reqeustBatch) {
                elements.add(entry.element);
                elements.add(BATCH_ELEMENT_DELIM_BYTES);
            }
            elements.set(elements.size() - 1, BATCH_END_BYTES);
            HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofByteArrays(elements);
            HttpRequest.Builder requestBuilder = java.net.http.HttpRequest.newBuilder().uri(endpointUri).version(HttpClient.Version.HTTP_1_1).timeout(Duration.ofSeconds(this.httpRequestTimeOutSeconds)).method(method, publisher);
            if (this.headersAndValues.length != 0) {
                requestBuilder.headers(this.headersAndValues);
            }
            return new HttpRequest(requestBuilder.build(), elements, method);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

