/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.engine.server.event;

import com.hazelcast.ringbuffer.OverflowPolicy;
import com.hazelcast.ringbuffer.ReadResultSet;
import com.hazelcast.ringbuffer.Ringbuffer;
import com.squareup.okhttp.MediaType;
import com.squareup.okhttp.OkHttpClient;
import com.squareup.okhttp.Request;
import com.squareup.okhttp.RequestBody;
import com.squareup.okhttp.Response;
import com.squareup.okhttp.ResponseBody;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.seatunnel.api.event.Event;
import org.apache.seatunnel.api.event.EventHandler;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.seatunnel.shade.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link EventHandler} that buffers events in a Hazelcast {@link Ringbuffer} and periodically
 * POSTs them, serialized as JSON, to an HTTP endpoint.
 *
 * <p>Events are appended to the ring buffer by {@link #handle(Event)}. A single scheduler thread
 * created in the constructor calls {@link #report()} at a fixed rate; each successful call
 * advances {@code committedEventIndex} past the events that were delivered.
 */
public class JobEventHttpReportHandler
implements EventHandler {
    private static final Logger log = LoggerFactory.getLogger(JobEventHttpReportHandler.class);
    public static final ObjectMapper JSON_MAPPER = new ObjectMapper();
    /** Report interval used by the constructors that do not take an explicit interval. */
    public static final Duration REPORT_INTERVAL = Duration.ofSeconds(10L);
    private final String httpEndpoint;
    private final Map<String, String> httpHeaders;
    private final OkHttpClient httpClient;
    private final MediaType httpMediaType = MediaType.parse("application/json");
    private final Ringbuffer ringbuffer;
    /** Ring buffer sequence of the next event that has not yet been reported successfully. */
    private volatile long committedEventIndex;
    private final ScheduledExecutorService scheduledExecutorService;

    /**
     * Creates a handler without extra headers that reports every {@link #REPORT_INTERVAL}.
     *
     * @param httpEndpoint URL the events are POSTed to
     * @param ringbuffer ring buffer used to stage events
     */
    public JobEventHttpReportHandler(String httpEndpoint, Ringbuffer ringbuffer) {
        this(httpEndpoint, REPORT_INTERVAL, ringbuffer);
    }

    /**
     * Creates a handler with extra request headers that reports every {@link #REPORT_INTERVAL}.
     *
     * @param httpEndpoint URL the events are POSTed to
     * @param httpHeaders headers added to every request; {@code null} is treated as no headers
     * @param ringbuffer ring buffer used to stage events
     */
    public JobEventHttpReportHandler(String httpEndpoint, Map<String, String> httpHeaders, Ringbuffer ringbuffer) {
        this(httpEndpoint, httpHeaders, REPORT_INTERVAL, ringbuffer);
    }

    /**
     * Creates a handler without extra headers and with a custom report interval.
     *
     * @param httpEndpoint URL the events are POSTed to
     * @param reportInterval time between two report runs
     * @param ringbuffer ring buffer used to stage events
     */
    public JobEventHttpReportHandler(String httpEndpoint, Duration reportInterval, Ringbuffer ringbuffer) {
        this(httpEndpoint, Collections.emptyMap(), reportInterval, ringbuffer);
    }

    /**
     * Creates the handler and immediately starts the background report task (initial delay 0).
     *
     * @param httpEndpoint URL the events are POSTed to
     * @param httpHeaders headers added to every request; {@code null} is treated as no headers
     * @param reportInterval time between two report runs; must be at least one millisecond
     * @param ringbuffer ring buffer used to stage events
     * @throws IllegalArgumentException if {@code reportInterval} is shorter than one millisecond
     *     (raised by the scheduler for a non-positive period)
     */
    public JobEventHttpReportHandler(String httpEndpoint, Map<String, String> httpHeaders, Duration reportInterval, Ringbuffer ringbuffer) {
        this.httpEndpoint = httpEndpoint;
        // A null map would otherwise make every report() run fail with a NullPointerException.
        this.httpHeaders = httpHeaders == null ? Collections.<String, String>emptyMap() : httpHeaders;
        this.ringbuffer = ringbuffer;
        // Start from the oldest event currently held by the ring buffer.
        this.committedEventIndex = ringbuffer.headSequence();
        this.httpClient = this.createHttpClient();
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("http-report-event-scheduler-%d").build());
        this.scheduledExecutorService.scheduleAtFixedRate(() -> {
            try {
                this.report();
            }
            catch (Throwable e) {
                // An exception escaping the task would cancel all further scheduled runs,
                // so failures are only logged here.
                log.error("Failed to report event", e);
            }
        // Schedule in milliseconds: Duration.getSeconds() drops the sub-second part, so an
        // interval below one second became period 0 and was rejected by the scheduler.
        }, 0L, reportInterval.toMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Appends the event to the ring buffer, overwriting the oldest entry when the buffer is full,
     * and blocks until the add operation has completed.
     *
     * @param event event to stage for reporting
     */
    public void handle(Event event) {
        CompletionStage addStage = this.ringbuffer.addAsync(event, OverflowPolicy.OVERWRITE);
        addStage.toCompletableFuture().join();
    }

    /**
     * Reads up to 1000 not-yet-committed events from the ring buffer and POSTs them as one JSON
     * document. The committed index only advances when the server answers with a successful
     * status, so a failed batch is read again on the next run.
     *
     * @throws IOException if JSON serialization or the HTTP call fails
     */
    @VisibleForTesting
    synchronized void report() throws IOException {
        long headSequence = this.ringbuffer.headSequence();
        if (headSequence > this.committedEventIndex) {
            // The buffer head moved past our position (older entries were overwritten);
            // skip ahead to the oldest event that is still available.
            log.warn("The head sequence {} is greater than the committed event index {}", headSequence, this.committedEventIndex);
            this.committedEventIndex = headSequence;
        }

        // minCount 0 lets the read complete without waiting for new events to arrive.
        CompletionStage readStage = this.ringbuffer.readManyAsync(this.committedEventIndex, 0, 1000, null);
        ReadResultSet resultSet = (ReadResultSet)readStage.toCompletableFuture().join();
        if (resultSet.size() <= 0) {
            return;
        }

        String events = JSON_MAPPER.writeValueAsString(resultSet.iterator());
        Request.Builder requestBuilder = new Request.Builder().url(this.httpEndpoint).post(RequestBody.create(this.httpMediaType, events));
        this.httpHeaders.forEach(requestBuilder::header);
        Response response = this.httpClient.newCall(requestBuilder.build()).execute();
        // The body is opened only so that try-with-resources closes it and releases the connection.
        try (ResponseBody closeable = response.body();){
            if (response.isSuccessful()) {
                this.committedEventIndex += (long)resultSet.readCount();
            } else {
                log.error("Failed to request http server: {}", response);
            }
        }
    }

    /** Stops scheduling further report runs; does not wait for a run that is in progress. */
    public void close() {
        log.info("Close http report handler");
        this.scheduledExecutorService.shutdown();
    }

    /** Builds the HTTP client with a 30 second connect timeout and a 10 second write timeout. */
    private OkHttpClient createHttpClient() {
        OkHttpClient client = new OkHttpClient();
        client.setConnectTimeout(30L, TimeUnit.SECONDS);
        client.setWriteTimeout(10L, TimeUnit.SECONDS);
        return client;
    }
}

