/*
 * Decompiled with CFR 0.152.
 */
package com.getindata.connectors.http.internal.sink;

import com.getindata.connectors.http.internal.SinkHttpClient;
import com.getindata.connectors.http.internal.SinkHttpClientResponse;
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
import com.getindata.connectors.http.internal.utils.ThreadUtils;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.function.Consumer;
import lombok.Generated;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sink writer that hands batches of {@link HttpSinkRequestEntry} to a {@link SinkHttpClient}
 * and records failed deliveries in the {@code numRecordsSendErrors} metric.
 *
 * <p>Completion callbacks of the HTTP client are executed on a dedicated fixed-size thread pool
 * owned by this writer; the pool is shut down in {@link #close()}.
 *
 * <p>This writer never hands failed entries back for re-submission: the result callback always
 * receives an empty list, so failures are only logged and counted.
 *
 * @param <InputT> type of the elements accepted by the sink before conversion to
 *     {@link HttpSinkRequestEntry}
 */
public class HttpSinkWriter<InputT>
extends AsyncSinkWriter<InputT, HttpSinkRequestEntry> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(HttpSinkWriter.class);

    /** Property key used to configure the size of the callback thread pool. */
    private static final String THREAD_POOL_SIZE_PROPERTY = "gid.connector.http.sink.writer.thread-pool.size";

    /** Default callback thread pool size, used when the property is not set. */
    private static final String HTTP_SINK_WRITER_THREAD_POOL_SIZE = "4";

    private final ExecutorService sinkWriterThreadPool;
    private final String endpointUrl;
    private final SinkHttpClient sinkHttpClient;
    private final Counter numRecordsSendErrorsCounter;

    /**
     * Creates the writer and its callback thread pool.
     *
     * <p>The batching/buffering arguments and {@code bufferedRequestStates} are passed unchanged
     * to the {@link AsyncSinkWriter} constructor.
     *
     * @param elementConverter converter passed to the base writer
     * @param context sink init context; also the source of the metric group
     * @param maxBatchSize passed to the base writer
     * @param maxInFlightRequests passed to the base writer
     * @param maxBufferedRequests passed to the base writer
     * @param maxBatchSizeInBytes passed to the base writer
     * @param maxTimeInBufferMS passed to the base writer
     * @param maxRecordSizeInBytes passed to the base writer
     * @param endpointUrl URL given to the HTTP client together with every batch
     * @param sinkHttpClient client used to send the batches
     * @param bufferedRequestStates previously buffered state passed to the base writer
     * @param properties connector properties; read for
     *     {@code gid.connector.http.sink.writer.thread-pool.size} (defaults to {@code 4})
     * @throws NumberFormatException if the configured thread pool size is not an integer
     * @throws IllegalArgumentException if the configured thread pool size is not positive
     */
    public HttpSinkWriter(ElementConverter<InputT, HttpSinkRequestEntry> elementConverter, Sink.InitContext context, int maxBatchSize, int maxInFlightRequests, int maxBufferedRequests, long maxBatchSizeInBytes, long maxTimeInBufferMS, long maxRecordSizeInBytes, String endpointUrl, SinkHttpClient sinkHttpClient, Collection<BufferedRequestState<HttpSinkRequestEntry>> bufferedRequestStates, Properties properties) {
        super(elementConverter, context, maxBatchSize, maxInFlightRequests, maxBufferedRequests, maxBatchSizeInBytes, maxTimeInBufferMS, maxRecordSizeInBytes, bufferedRequestStates);
        this.endpointUrl = endpointUrl;
        this.sinkHttpClient = sinkHttpClient;
        SinkWriterMetricGroup metrics = context.metricGroup();
        this.numRecordsSendErrorsCounter = metrics.getNumRecordsSendErrorsCounter();
        int sinkWriterThreadPoolSize = Integer.parseInt(properties.getProperty(THREAD_POOL_SIZE_PROPERTY, HTTP_SINK_WRITER_THREAD_POOL_SIZE));
        this.sinkWriterThreadPool = Executors.newFixedThreadPool(sinkWriterThreadPoolSize, new ExecutorThreadFactory("http-sink-writer-worker", ThreadUtils.LOGGING_EXCEPTION_HANDLER));
    }

    /**
     * Sends a batch through the HTTP client and reports completion to {@code requestResult}.
     *
     * <p>When the client's future fails, every entry of the batch is counted as a send error;
     * when it succeeds, only the failed requests reported in the response are counted. In both
     * cases {@code requestResult} receives an empty list, i.e. no entry is handed back.
     *
     * @param requestEntries batch of entries to send
     * @param requestResult completion callback; always invoked with an empty list
     */
    protected void submitRequestEntries(List<HttpSinkRequestEntry> requestEntries, Consumer<List<HttpSinkRequestEntry>> requestResult) {
        CompletableFuture<SinkHttpClientResponse> future = this.sinkHttpClient.putRequests(requestEntries, this.endpointUrl);
        future.whenCompleteAsync((response, err) -> {
            try {
                if (err != null) {
                    int failedRequestsNumber = requestEntries.size();
                    // The throwable goes last so the logger records the cause and stack trace.
                    log.error("Http Sink fatally failed to write all {} requests", failedRequestsNumber, err);
                    this.numRecordsSendErrorsCounter.inc(failedRequestsNumber);
                } else {
                    int failedRequestsNumber = response.getFailedRequests().size();
                    if (failedRequestsNumber > 0) {
                        log.error("Http Sink failed to write {} requests; they will not be retried", failedRequestsNumber);
                        this.numRecordsSendErrorsCounter.inc(failedRequestsNumber);
                    }
                }
            } finally {
                // Always signal completion, even if the handling above threw; the future returned
                // by whenCompleteAsync is discarded, so nothing else would observe the failure.
                requestResult.accept(Collections.emptyList());
            }
        }, this.sinkWriterThreadPool);
    }

    /**
     * Returns the size of a request entry as reported by the entry itself.
     *
     * @param s entry to measure
     * @return value of {@code s.getSizeInBytes()}
     */
    protected long getSizeInBytes(HttpSinkRequestEntry s) {
        return s.getSizeInBytes();
    }

    /**
     * Shuts down the callback thread pool immediately and then closes the base writer.
     */
    public void close() {
        try {
            this.sinkWriterThreadPool.shutdownNow();
        } finally {
            // Close the base writer even if stopping the pool fails.
            super.close();
        }
    }
}

