/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.connect.channel;

import java.util.Collection;
import java.util.Comparator;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.connect.Committer;
import org.apache.iceberg.connect.IcebergSinkConfig;
import org.apache.iceberg.connect.channel.Coordinator;
import org.apache.iceberg.connect.channel.CoordinatorThread;
import org.apache.iceberg.connect.channel.KafkaClientFactory;
import org.apache.iceberg.connect.channel.KafkaUtils;
import org.apache.iceberg.connect.channel.NotRunningException;
import org.apache.iceberg.connect.channel.Worker;
import org.apache.iceberg.connect.data.SinkWriter;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CommitterImpl
implements Committer {
    private static final Logger LOG = LoggerFactory.getLogger(CommitterImpl.class);
    private CoordinatorThread coordinatorThread;
    private Worker worker;
    private Catalog catalog;
    private IcebergSinkConfig config;
    private SinkTaskContext context;
    private KafkaClientFactory clientFactory;
    private Collection<MemberDescription> membersWhenWorkerIsCoordinator;
    private final AtomicBoolean isInitialized = new AtomicBoolean(false);

    private void initialize(Catalog icebergCatalog, IcebergSinkConfig icebergSinkConfig, SinkTaskContext sinkTaskContext) {
        if (this.isInitialized.compareAndSet(false, true)) {
            this.catalog = icebergCatalog;
            this.config = icebergSinkConfig;
            this.context = sinkTaskContext;
            this.clientFactory = new KafkaClientFactory(this.config.kafkaProps());
        }
    }

    private boolean hasLeaderPartition(Collection<TopicPartition> currentAssignedPartitions) {
        Collection members;
        ConsumerGroupDescription groupDesc;
        try (Admin admin = this.clientFactory.createAdmin();){
            groupDesc = KafkaUtils.consumerGroupDescription(this.config.connectGroupId(), admin);
        }
        if (groupDesc.state() == ConsumerGroupState.STABLE && this.containsFirstPartition(members = groupDesc.members(), currentAssignedPartitions)) {
            this.membersWhenWorkerIsCoordinator = members;
            return true;
        }
        return false;
    }

    @VisibleForTesting
    boolean containsFirstPartition(Collection<MemberDescription> members, Collection<TopicPartition> partitions) {
        TopicPartition firstTopicPartition = members.stream().flatMap(member -> member.assignment().topicPartitions().stream()).min(new TopicPartitionComparator()).orElseThrow(() -> new ConnectException("No partitions assigned, cannot determine leader"));
        return partitions.contains(firstTopicPartition);
    }

    @Override
    public void start(Catalog icebergCatalog, IcebergSinkConfig icebergSinkConfig, SinkTaskContext sinkTaskContext) {
        throw new UnsupportedOperationException("The method start(Catalog, IcebergSinkConfig, SinkTaskContext) is deprecated and will be removed in 2.0.0. Use start(Catalog, IcebergSinkConfig, SinkTaskContext, Collection<TopicPartition>) instead.");
    }

    @Override
    public void open(Catalog icebergCatalog, IcebergSinkConfig icebergSinkConfig, SinkTaskContext sinkTaskContext, Collection<TopicPartition> addedPartitions) {
        this.initialize(icebergCatalog, icebergSinkConfig, sinkTaskContext);
        if (this.hasLeaderPartition(addedPartitions)) {
            LOG.info("Committer received leader partition. Starting Coordinator.");
            this.startCoordinator();
        }
    }

    @Override
    public void stop() {
        throw new UnsupportedOperationException("The method stop() is deprecated and will be removed in 2.0.0. Use stop(Collection<TopicPartition>) instead.");
    }

    @Override
    public void close(Collection<TopicPartition> closedPartitions) {
        if (this.hasLeaderPartition(closedPartitions)) {
            LOG.info("Committer lost leader partition. Stopping Coordinator.");
            this.stopCoordinator();
        }
        this.stopWorker();
        KafkaUtils.seekToLastCommittedOffsets(this.context);
    }

    @Override
    public void save(Collection<SinkRecord> sinkRecords) {
        if (sinkRecords != null && !sinkRecords.isEmpty()) {
            this.startWorker();
            this.worker.save(sinkRecords);
        }
        this.processControlEvents();
    }

    private void processControlEvents() {
        if (this.coordinatorThread != null && this.coordinatorThread.isTerminated()) {
            throw new NotRunningException("Coordinator unexpectedly terminated");
        }
        if (this.worker != null) {
            this.worker.process();
        }
    }

    private void startWorker() {
        if (null == this.worker) {
            LOG.info("Starting commit worker");
            SinkWriter sinkWriter = new SinkWriter(this.catalog, this.config);
            this.worker = new Worker(this.config, this.clientFactory, sinkWriter, this.context);
            this.worker.start();
        }
    }

    private void startCoordinator() {
        if (null == this.coordinatorThread) {
            LOG.info("Task elected leader, starting commit coordinator");
            Coordinator coordinator = new Coordinator(this.catalog, this.config, this.membersWhenWorkerIsCoordinator, this.clientFactory, this.context);
            this.coordinatorThread = new CoordinatorThread(coordinator);
            this.coordinatorThread.start();
        }
    }

    private void stopWorker() {
        if (this.worker != null) {
            this.worker.stop();
            this.worker = null;
        }
    }

    private void stopCoordinator() {
        if (this.coordinatorThread != null) {
            this.coordinatorThread.terminate();
            this.coordinatorThread = null;
        }
    }

    static class TopicPartitionComparator
    implements Comparator<TopicPartition> {
        TopicPartitionComparator() {
        }

        @Override
        public int compare(TopicPartition o1, TopicPartition o2) {
            int result = o1.topic().compareTo(o2.topic());
            if (result == 0) {
                result = Integer.compare(o1.partition(), o2.partition());
            }
            return result;
        }
    }
}

