/*
 * Decompiled with CFR 0.152.
 */
package com.getindata.connectors.http.internal.sink.httpclient;

import com.getindata.connectors.http.HttpPostRequestCallback;
import com.getindata.connectors.http.internal.HeaderPreprocessor;
import com.getindata.connectors.http.internal.SinkHttpClient;
import com.getindata.connectors.http.internal.SinkHttpClientResponse;
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest;
import com.getindata.connectors.http.internal.sink.httpclient.JavaNetHttpResponseWrapper;
import com.getindata.connectors.http.internal.sink.httpclient.RequestSubmitter;
import com.getindata.connectors.http.internal.sink.httpclient.RequestSubmitterFactory;
import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker;
import com.getindata.connectors.http.internal.status.HttpStatusCodeChecker;
import com.getindata.connectors.http.internal.utils.HttpHeaderUtils;
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.flink.annotation.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JavaNetSinkHttpClient
implements SinkHttpClient {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(JavaNetSinkHttpClient.class);
    private final String[] headersAndValues;
    private final Map<String, String> headerMap;
    private final HttpStatusCodeChecker statusCodeChecker;
    private final HttpPostRequestCallback<HttpRequest> httpPostRequestCallback;
    private final RequestSubmitter requestSubmitter;

    public JavaNetSinkHttpClient(Properties properties, HttpPostRequestCallback<HttpRequest> httpPostRequestCallback, HeaderPreprocessor headerPreprocessor, RequestSubmitterFactory requestSubmitterFactory) {
        this.httpPostRequestCallback = httpPostRequestCallback;
        this.headerMap = HttpHeaderUtils.prepareHeaderMap("gid.connector.http.sink.header.", properties, headerPreprocessor);
        ComposeHttpStatusCodeChecker.ComposeHttpStatusCodeCheckerConfig checkerConfig = ComposeHttpStatusCodeChecker.ComposeHttpStatusCodeCheckerConfig.builder().properties(properties).whiteListPrefix("gid.connector.http.sink.error.code.exclude").errorCodePrefix("gid.connector.http.sink.error.code").build();
        this.statusCodeChecker = new ComposeHttpStatusCodeChecker(checkerConfig);
        this.headersAndValues = HttpHeaderUtils.toHeaderAndValueArray(this.headerMap);
        this.requestSubmitter = requestSubmitterFactory.createSubmitter(properties, this.headersAndValues);
    }

    @Override
    public CompletableFuture<SinkHttpClientResponse> putRequests(List<HttpSinkRequestEntry> requestEntries, String endpointUrl) {
        return this.submitRequests(requestEntries, endpointUrl).thenApply(responses -> this.prepareSinkHttpClientResponse((List<JavaNetHttpResponseWrapper>)responses, endpointUrl));
    }

    private CompletableFuture<List<JavaNetHttpResponseWrapper>> submitRequests(List<HttpSinkRequestEntry> requestEntries, String endpointUrl) {
        List<CompletableFuture<JavaNetHttpResponseWrapper>> responseFutures = this.requestSubmitter.submit(endpointUrl, requestEntries);
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(responseFutures.toArray(new CompletableFuture[0]));
        return allFutures.thenApply(_void -> responseFutures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
    }

    private SinkHttpClientResponse prepareSinkHttpClientResponse(List<JavaNetHttpResponseWrapper> responses, String endpointUrl) {
        ArrayList<HttpRequest> successfulResponses = new ArrayList<HttpRequest>();
        ArrayList<HttpRequest> failedResponses = new ArrayList<HttpRequest>();
        for (JavaNetHttpResponseWrapper response : responses) {
            HttpRequest sinkRequestEntry = response.getHttpRequest();
            Optional<HttpResponse<String>> optResponse = response.getResponse();
            this.httpPostRequestCallback.call(optResponse.orElse(null), sinkRequestEntry, endpointUrl, this.headerMap);
            if (optResponse.isEmpty() || this.statusCodeChecker.isErrorCode(optResponse.get().statusCode())) {
                failedResponses.add(sinkRequestEntry);
                continue;
            }
            successfulResponses.add(sinkRequestEntry);
        }
        return new SinkHttpClientResponse(successfulResponses, failedResponses);
    }

    @VisibleForTesting
    String[] getHeadersAndValues() {
        return Arrays.copyOf(this.headersAndValues, this.headersAndValues.length);
    }
}

