/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.kafka.source.fetch;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcherTask;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitReader;
import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaPartitionSplitReader;
import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link SingleThreadFetcherManager} for Kafka {@link ConsumerRecord}s that can additionally
 * hand an offset-commit request to a split fetcher as a {@link SplitFetcherTask}.
 */
public class KafkaSourceFetcherManager
extends SingleThreadFetcherManager<ConsumerRecord<byte[], byte[]>, KafkaSourceSplit> {
    private static final Logger logger = LoggerFactory.getLogger(KafkaSourceFetcherManager.class);

    /**
     * Creates a manager by delegating directly to the two-argument superclass constructor.
     *
     * @param elementsQueue queue passed through to the superclass
     * @param splitReaderSupplier supplier of split readers passed through to the superclass
     */
    public KafkaSourceFetcherManager(BlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>> elementsQueue, Supplier<SplitReader<ConsumerRecord<byte[], byte[]>, KafkaSourceSplit>> splitReaderSupplier) {
        super(elementsQueue, splitReaderSupplier);
    }

    /**
     * Creates a manager by delegating directly to the three-argument superclass constructor.
     *
     * @param elementsQueue queue passed through to the superclass
     * @param splitReaderSupplier supplier of split readers passed through to the superclass
     * @param splitFinishedHook hook passed through to the superclass
     */
    public KafkaSourceFetcherManager(BlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>> elementsQueue, Supplier<SplitReader<ConsumerRecord<byte[], byte[]>, KafkaSourceSplit>> splitReaderSupplier, Consumer<Collection<String>> splitFinishedHook) {
        super(elementsQueue, splitReaderSupplier, splitFinishedHook);
    }

    /**
     * Schedules a commit of the given offsets on a split fetcher.
     *
     * <p>The request is logged at debug level first; an empty map then returns without scheduling
     * anything. Otherwise the fetcher obtained from {@code fetchers.get(0)} is used. When that
     * lookup yields {@code null}, a fetcher is created via {@code createSplitFetcher()}, the
     * commit task is enqueued on it, and only afterwards is it started via {@code startFetcher}.
     *
     * @param offsetsToCommit offsets to commit, keyed by topic partition
     * @param callback callback forwarded, together with the offsets, to the Kafka split reader
     */
    public void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit, OffsetCommitCallback callback) {
        logger.debug("Committing offsets {}", offsetsToCommit);
        if (offsetsToCommit.isEmpty()) {
            return;
        }

        SplitFetcher<ConsumerRecord<byte[], byte[]>, KafkaSourceSplit> fetcher = (SplitFetcher<ConsumerRecord<byte[], byte[]>, KafkaSourceSplit>)this.fetchers.get(0);
        boolean createdHere = fetcher == null;
        if (createdHere) {
            fetcher = this.createSplitFetcher();
        }

        // The task is always enqueued before a freshly created fetcher is started.
        this.enqueueOffsetsCommitTask(fetcher, offsetsToCommit, callback);
        if (createdHere) {
            this.startFetcher(fetcher);
        }
    }

    /**
     * Adds a task to {@code targetFetcher} whose {@code run()} calls
     * {@link KafkaPartitionSplitReader#notifyCheckpointComplete} with the given offsets and
     * callback. The fetcher's split reader is cast to {@link KafkaPartitionSplitReader} before
     * the task is added.
     */
    private void enqueueOffsetsCommitTask(SplitFetcher<ConsumerRecord<byte[], byte[]>, KafkaSourceSplit> targetFetcher, Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback commitCallback) {
        final KafkaPartitionSplitReader reader = (KafkaPartitionSplitReader)targetFetcher.getSplitReader();
        SplitFetcherTask commitTask = new SplitFetcherTask(){

            @Override
            public void run() throws IOException {
                reader.notifyCheckpointComplete(offsets, commitCallback);
            }

            @Override
            public void wakeUp() {
                // No wake-up handling is performed for this task.
            }
        };
        targetFetcher.addTask(commitTask);
    }
}

