/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.connect.channel;

import java.util.Collection;
import java.util.Comparator;
import java.util.Set;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.connect.Committer;
import org.apache.iceberg.connect.IcebergSinkConfig;
import org.apache.iceberg.connect.channel.Coordinator;
import org.apache.iceberg.connect.channel.CoordinatorThread;
import org.apache.iceberg.connect.channel.KafkaClientFactory;
import org.apache.iceberg.connect.channel.KafkaUtils;
import org.apache.iceberg.connect.channel.NotRunningException;
import org.apache.iceberg.connect.channel.Worker;
import org.apache.iceberg.connect.data.SinkWriter;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Committer} implementation that starts a {@link Worker} for every task and, on the task
 * that determines itself to be the leader, additionally starts a {@link Coordinator} wrapped in a
 * {@link CoordinatorThread}.
 *
 * <p>A task considers itself the leader when the Connect consumer group is stable and the task's
 * own assignment contains the lowest-ordered topic partition across all group members (see
 * {@link #isLeader(Collection, Collection)}).
 */
public class CommitterImpl implements Committer {
    private static final Logger LOG = LoggerFactory.getLogger(CommitterImpl.class);

    /** Non-null only while this task is running the coordinator. */
    private CoordinatorThread coordinatorThread;

    /** Non-null between a successful {@link #start} and {@link #stop}. */
    private Worker worker;

    /**
     * Starts the commit worker and, if this task is elected leader, the commit coordinator.
     *
     * <p>The consumer group description is fetched with a short-lived {@link Admin} client that is
     * closed before any further work. If the group is not in the {@code STABLE} state, no
     * coordinator is started by this call.
     *
     * @param catalog catalog handed to the coordinator and to the sink writer
     * @param config sink configuration; supplies the Kafka client properties and the Connect group id
     * @param context task context; supplies this task's partition assignment
     */
    @Override
    public void start(Catalog catalog, IcebergSinkConfig config, SinkTaskContext context) {
        KafkaClientFactory clientFactory = new KafkaClientFactory(config.kafkaProps());

        ConsumerGroupDescription groupDesc;
        try (Admin admin = clientFactory.createAdmin()) {
            groupDesc = KafkaUtils.consumerGroupDescription(config.connectGroupId(), admin);
        }

        if (groupDesc.state() == ConsumerGroupState.STABLE) {
            Collection<MemberDescription> members = groupDesc.members();
            Set<TopicPartition> partitions = context.assignment();
            if (isLeader(members, partitions)) {
                LOG.info("Task elected leader, starting commit coordinator");
                Coordinator coordinator = new Coordinator(catalog, config, members, clientFactory, context);
                this.coordinatorThread = new CoordinatorThread(coordinator);
                this.coordinatorThread.start();
            }
        }

        LOG.info("Starting commit worker");
        SinkWriter sinkWriter = new SinkWriter(catalog, config);
        this.worker = new Worker(config, clientFactory, sinkWriter, context);
        this.worker.start();
    }

    /**
     * Hands non-empty record batches to the worker, then processes pending control events.
     *
     * <p>Control events are processed on every call, including when {@code sinkRecords} is
     * {@code null} or empty.
     *
     * @param sinkRecords records to save; may be {@code null} or empty
     * @throws NotRunningException if the coordinator thread was started and has since terminated
     */
    @Override
    public void save(Collection<SinkRecord> sinkRecords) {
        if (sinkRecords != null && !sinkRecords.isEmpty()) {
            this.worker.save(sinkRecords);
        }
        processControlEvents();
    }

    /**
     * Stops the worker and terminates the coordinator thread, if either is present.
     *
     * <p>The coordinator thread is terminated even when stopping the worker throws, so a failing
     * worker shutdown cannot leave the coordinator running. Both fields are cleared, which makes
     * repeated calls harmless.
     */
    @Override
    public void stop() {
        try {
            if (this.worker != null) {
                this.worker.stop();
            }
        } finally {
            this.worker = null;
            // Clear the field before terminating so the reference is dropped even if terminate() throws.
            CoordinatorThread thread = this.coordinatorThread;
            this.coordinatorThread = null;
            if (thread != null) {
                thread.terminate();
            }
        }
    }

    /**
     * Decides whether the owner of {@code partitions} is the leader.
     *
     * <p>The leader is whoever holds the smallest topic partition, ordered by
     * {@link TopicPartitionComparator}, among all partitions assigned to {@code members}.
     *
     * @param members all members of the consumer group
     * @param partitions the partitions assigned to this task
     * @return {@code true} if {@code partitions} contains the smallest assigned topic partition
     * @throws ConnectException if no member has any partition assigned
     */
    @VisibleForTesting
    boolean isLeader(Collection<MemberDescription> members, Collection<TopicPartition> partitions) {
        TopicPartition firstTopicPartition = members.stream()
                .flatMap(member -> member.assignment().topicPartitions().stream())
                .min(new TopicPartitionComparator())
                .orElseThrow(() -> new ConnectException("No partitions assigned, cannot determine leader"));
        return partitions.contains(firstTopicPartition);
    }

    /**
     * Fails fast if a previously started coordinator thread has terminated, then lets the worker
     * process, if a worker is present.
     */
    private void processControlEvents() {
        if (this.coordinatorThread != null && this.coordinatorThread.isTerminated()) {
            throw new NotRunningException("Coordinator unexpectedly terminated");
        }
        if (this.worker != null) {
            this.worker.process();
        }
    }

    /** Orders topic partitions by topic name, then by partition number. */
    static class TopicPartitionComparator implements Comparator<TopicPartition> {
        TopicPartitionComparator() {
        }

        @Override
        public int compare(TopicPartition o1, TopicPartition o2) {
            int result = o1.topic().compareTo(o2.topic());
            if (result == 0) {
                result = Integer.compare(o1.partition(), o2.partition());
            }
            return result;
        }
    }
}

