/*
 * Decompiled with CFR 0.152.
 */
package kafka.admin;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import kafka.log.UnifiedLog;
import kafka.server.KafkaBroker;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.TopicDeletionDisabledException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.metadata.BrokerState;
import org.apache.kafka.server.common.RequestLocal;
import org.apache.kafka.storage.internals.log.AppendOrigin;
import org.apache.kafka.storage.internals.log.VerificationGuard;
import org.junit.jupiter.api.Assertions;
import scala.Option;
import scala.jdk.javaapi.OptionConverters;

/**
 * Integration tests for topic deletion, run against a KRaft cluster (three brokers by default,
 * four where a test overrides it).
 *
 * <p>Every test creates {@link #DEFAULT_TOPIC} with a single partition assigned to brokers 0, 1
 * and 2, and most of them finish by calling {@code cluster.waitForTopic(DEFAULT_TOPIC, 0)} after
 * the deletion request.
 */
@ClusterTestDefaults(types = {Type.KRAFT}, brokers = 3, serverProperties = {
    @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
    @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
    @ClusterConfigProperty(key = "log.initial.task.delay.ms", value = "100"),
    @ClusterConfigProperty(key = "log.segment.delete.delay.ms", value = "1000")
})
public class DeleteTopicTest {
    private static final String DEFAULT_TOPIC = "topic";
    private static final String REPLICAS_NOT_CREATED = "Replicas for topic " + DEFAULT_TOPIC + " not created.";

    /** Partition 0 of the test topic is hosted on brokers 0, 1 and 2. */
    private final Map<Integer, List<Integer>> expectedReplicaAssignment = Map.of(0, List.of(0, 1, 2));

    /** Deletes the topic while every replica is online. */
    @ClusterTest
    public void testDeleteTopicWithAllAliveReplicas(ClusterInstance cluster) throws Exception {
        try (Admin admin = cluster.admin()) {
            createDefaultTopic(admin);
            admin.deleteTopics(List.of(DEFAULT_TOPIC)).all().get();
            cluster.waitForTopic(DEFAULT_TOPIC, 0);
        }
    }

    /** Deletes the topic while one follower is down, then restarts that follower. */
    @ClusterTest
    public void testResumeDeleteTopicWithRecoveredFollower(ClusterInstance cluster) throws Exception {
        try (Admin admin = cluster.admin()) {
            createDefaultTopic(admin);
            TopicPartition topicPartition = new TopicPartition(DEFAULT_TOPIC, 0);
            int leaderId = waitUtilLeaderIsKnown(cluster.brokers(), topicPartition);
            KafkaBroker follower = findFollower(cluster.brokers().values(), leaderId);
            follower.shutdown();

            admin.deleteTopics(List.of(DEFAULT_TOPIC)).all().get();

            // Every broker other than the stopped follower must drop its log for the partition.
            int followerId = follower.config().brokerId();
            TestUtils.waitForCondition(
                () -> cluster.brokers().values().stream()
                    .filter(broker -> broker.config().brokerId() != followerId)
                    .allMatch(broker -> broker.logManager().getLog(topicPartition, false).isEmpty()),
                "Online replicas have not deleted log.");

            follower.startup();
            cluster.waitForTopic(DEFAULT_TOPIC, 0);
        }
    }

    /** Asserts that a partition reassignment submitted after the topic is gone fails. */
    @ClusterTest(brokers = 4)
    public void testPartitionReassignmentDuringDeleteTopic(ClusterInstance cluster) throws Exception {
        try (Admin admin = cluster.admin()) {
            createDefaultTopic(admin);
            TopicPartition topicPartition = new TopicPartition(DEFAULT_TOPIC, 0);
            Map<Integer, KafkaBroker> servers = findPartitionHostingBrokers(cluster.brokers());
            int leaderId = waitUtilLeaderIsKnown(cluster.brokers(), topicPartition);
            KafkaBroker follower = findFollower(servers.values(), leaderId);
            follower.shutdown();

            admin.deleteTopics(List.of(DEFAULT_TOPIC)).all().get();

            try (Admin otherAdmin = newAdmin(cluster)) {
                waitUtilTopicGone(otherAdmin);
                Assertions.assertThrows(ExecutionException.class, () -> otherAdmin
                    .alterPartitionReassignments(
                        Map.of(topicPartition, Optional.of(new NewPartitionReassignment(List.of(1, 2, 3)))))
                    .all()
                    .get());
            }

            follower.startup();
            cluster.waitForTopic(DEFAULT_TOPIC, 0);
        }
    }

    /** Requests a partition-count increase right after the topic deletion was requested. */
    @ClusterTest(brokers = 4)
    public void testIncreasePartitionCountDuringDeleteTopic(ClusterInstance cluster) throws Exception {
        try (Admin admin = cluster.admin()) {
            createDefaultTopic(admin);
            TopicPartition topicPartition = new TopicPartition(DEFAULT_TOPIC, 0);
            Map<Integer, KafkaBroker> partitionHostingBrokers = findPartitionHostingBrokers(cluster.brokers());
            waitForReplicaCreated(partitionHostingBrokers, topicPartition, REPLICAS_NOT_CREATED);
            int leaderId = waitUtilLeaderIsKnown(partitionHostingBrokers, topicPartition);
            KafkaBroker follower = findFollower(partitionHostingBrokers.values(), leaderId);
            follower.shutdown();

            admin.deleteTopics(List.of(DEFAULT_TOPIC)).all().get();

            try (Admin otherAdmin = newAdmin(cluster)) {
                otherAdmin.createPartitions(Map.of(DEFAULT_TOPIC, NewPartitions.increaseTo(2))).all().get();
            } catch (ExecutionException ignored) {
                // Deliberately best-effort: the request may fail because the topic is being
                // (or already has been) deleted. The test only checks that deletion completes.
            }

            follower.startup();
            cluster.waitForTopic(DEFAULT_TOPIC, 0);
        }
    }

    /** Deletes the topic after its partition count was increased while a follower was down. */
    @ClusterTest
    public void testDeleteTopicDuringAddPartition(ClusterInstance cluster) throws Exception {
        try (Admin admin = cluster.admin()) {
            createDefaultTopic(admin);
            int leaderId = waitUtilLeaderIsKnown(cluster.brokers(), new TopicPartition(DEFAULT_TOPIC, 0));
            TopicPartition newTopicPartition = new TopicPartition(DEFAULT_TOPIC, 1);
            KafkaBroker follower = findFollower(cluster.brokers().values(), leaderId);
            follower.shutdown();
            int followerBrokerId = follower.config().brokerId();
            TestUtils.waitForCondition(
                () -> follower.brokerState().equals(BrokerState.SHUTTING_DOWN),
                "Follower " + followerBrokerId + " was not shutdown");

            // The result is not awaited here; waitForTopic below observes the new partition count.
            admin.createPartitions(Map.of(DEFAULT_TOPIC, NewPartitions.increaseTo(3)));
            cluster.waitForTopic(DEFAULT_TOPIC, 3);

            admin.deleteTopics(List.of(DEFAULT_TOPIC)).all().get();
            follower.startup();
            cluster.waitForTopic(DEFAULT_TOPIC, 0);
            waitForReplicaDeleted(cluster.brokers(), newTopicPartition,
                "Replica logs for new partition [topic,1] not deleted after delete topic is complete.");
        }
    }

    /** Requests additional partitions immediately after the deletion request has completed. */
    @ClusterTest
    public void testAddPartitionDuringDeleteTopic(ClusterInstance cluster) throws Exception {
        try (Admin admin = cluster.admin()) {
            createDefaultTopic(admin);
            TopicPartition newTopicPartition = new TopicPartition(DEFAULT_TOPIC, 1);
            admin.deleteTopics(List.of(DEFAULT_TOPIC)).all().get();

            // Fire-and-forget: the outcome of this request is intentionally not inspected.
            admin.createPartitions(Map.of(DEFAULT_TOPIC, NewPartitions.increaseTo(3)));

            cluster.waitForTopic(DEFAULT_TOPIC, 0);
            waitForReplicaDeleted(cluster.brokers(), newTopicPartition,
                "Replica logs not deleted after delete topic is complete");
        }
    }

    /** Re-creates the topic under the same name once the first incarnation was deleted. */
    @ClusterTest
    public void testRecreateTopicAfterDeletion(ClusterInstance cluster) throws Exception {
        try (Admin admin = cluster.admin()) {
            createDefaultTopic(admin);
            TopicPartition topicPartition = new TopicPartition(DEFAULT_TOPIC, 0);
            admin.deleteTopics(List.of(DEFAULT_TOPIC)).all().get();
            cluster.waitForTopic(DEFAULT_TOPIC, 0);

            createDefaultTopic(admin);
            waitForReplicaCreated(cluster.brokers(), topicPartition, REPLICAS_NOT_CREATED);
        }
    }

    /** Deleting an unknown topic must fail and must leave the existing topic untouched. */
    @ClusterTest
    public void testDeleteNonExistingTopic(ClusterInstance cluster) throws Exception {
        try (Admin admin = cluster.admin()) {
            createDefaultTopic(admin);
            TopicPartition topicPartition = new TopicPartition(DEFAULT_TOPIC, 0);
            String topic = "test2";
            TestUtils.waitForCondition(
                () -> deleteFailsWith(admin, topic, UnknownTopicOrPartitionException.class),
                "Topic test2 should not exist.");
            cluster.waitForTopic(topic, 0);

            waitForReplicaCreated(cluster.brokers(), topicPartition, REPLICAS_NOT_CREATED);
            TestUtils.waitUntilLeaderIsElectedOrChangedWithAdmin(admin, DEFAULT_TOPIC, 0, 1000L);
        }
    }

    /** Deletes a compacted topic after the log cleaner has processed its first segment. */
    @ClusterTest(serverProperties = {
        @ClusterConfigProperty(key = "log.cleaner.enable", value = "true"),
        @ClusterConfigProperty(key = "log.cleanup.policy", value = "compact"),
        @ClusterConfigProperty(key = "log.cleaner.dedupe.buffer.size", value = "1048577")
    })
    public void testDeleteTopicWithCleaner(ClusterInstance cluster) throws Exception {
        try (Admin admin = cluster.admin()) {
            createDefaultTopic(admin);
            TopicPartition topicPartition = new TopicPartition(DEFAULT_TOPIC, 0);
            KafkaBroker server = cluster.brokers().values().stream().findFirst().orElseThrow();
            TestUtils.waitForCondition(
                () -> server.logManager().getLog(topicPartition, false).isDefined(),
                REPLICAS_NOT_CREATED);

            UnifiedLog log = server.logManager().getLog(topicPartition, false).get();
            writeDups(100, 3, log);
            // Roll so that the written records sit in a non-active segment before awaiting the cleaner.
            server.logManager().getLog(topicPartition, false).get().roll(Option.empty());
            server.logManager().cleaner().awaitCleaned(topicPartition, 0L, 60000L);

            admin.deleteTopics(List.of(DEFAULT_TOPIC)).all().get();
            cluster.waitForTopic(DEFAULT_TOPIC, 0);
        }
    }

    /** A second delete request for an already deleted topic must report an unknown topic. */
    @ClusterTest
    public void testDeleteTopicAlreadyMarkedAsDeleted(ClusterInstance cluster) throws Exception {
        try (Admin admin = cluster.admin()) {
            createDefaultTopic(admin);
            admin.deleteTopics(List.of(DEFAULT_TOPIC)).all().get();
            TestUtils.waitForCondition(
                () -> deleteFailsWith(admin, DEFAULT_TOPIC, UnknownTopicOrPartitionException.class),
                "Topic topic should be marked for deletion or already deleted.");
            cluster.waitForTopic(DEFAULT_TOPIC, 0);
        }
    }

    /** With {@code delete.topic.enable=false} the delete must be rejected and the topic must survive. */
    @ClusterTest(controllers = 1, serverProperties = {
        @ClusterConfigProperty(key = "delete.topic.enable", value = "false")
    })
    public void testDisableDeleteTopic(ClusterInstance cluster) throws Exception {
        try (Admin admin = cluster.admin()) {
            createDefaultTopic(admin);
            TopicPartition topicPartition = new TopicPartition(DEFAULT_TOPIC, 0);
            TestUtils.waitForCondition(
                () -> deleteFailsWith(admin, DEFAULT_TOPIC, TopicDeletionDisabledException.class),
                "TopicDeletionDisabledException should be returned when deleting topic");

            // The topic must still be fully usable after the rejected delete.
            waitForReplicaCreated(cluster.brokers(), topicPartition, REPLICAS_NOT_CREATED);
            Assertions.assertDoesNotThrow(() -> admin.describeTopics(List.of(DEFAULT_TOPIC)).allTopicNames().get());
            Assertions.assertDoesNotThrow(() -> waitUtilLeaderIsKnown(cluster.brokers(), topicPartition));
        }
    }

    /** Creates {@link #DEFAULT_TOPIC} with {@link #expectedReplicaAssignment} and waits for the request. */
    private void createDefaultTopic(Admin admin) throws Exception {
        admin.createTopics(List.of(new NewTopic(DEFAULT_TOPIC, expectedReplicaAssignment))).all().get();
    }

    /** Builds an additional admin client from the cluster's bootstrap servers; the caller closes it. */
    private static Admin newAdmin(ClusterInstance cluster) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", cluster.bootstrapServers());
        return Admin.create(properties);
    }

    /**
     * Issues a delete for {@code topic} and reports whether it failed with the expected cause.
     *
     * @return {@code true} only if the request threw and the exception's cause is an instance of
     *         {@code expectedCause}; {@code false} if the delete succeeded or failed differently
     */
    private static boolean deleteFailsWith(Admin admin, String topic, Class<? extends Throwable> expectedCause) {
        try {
            admin.deleteTopics(List.of(topic)).all().get();
            return false;
        } catch (Exception e) {
            return expectedCause.isInstance(e.getCause());
        }
    }

    /**
     * Waits up to 15 seconds until one of the given brokers reports itself as local leader of the
     * partition.
     *
     * @return the broker id that satisfied the wait condition
     */
    private int waitUtilLeaderIsKnown(Map<Integer, KafkaBroker> idToBroker, TopicPartition topicPartition) throws InterruptedException {
        Supplier<Optional<Integer>> leaderLookup = isLeaderKnown(idToBroker, topicPartition);
        // Remember the id seen by the successful poll instead of looking it up a second time:
        // a leadership change right after the wait would otherwise surface as a bare
        // NoSuchElementException from Optional.get().
        AtomicReference<Integer> observedLeader = new AtomicReference<>();
        TestUtils.waitForCondition(() -> {
            Optional<Integer> leader = leaderLookup.get();
            leader.ifPresent(observedLeader::set);
            return leader.isPresent();
        }, 15000L, "Partition " + topicPartition + " not made yet after 15 seconds");
        return observedLeader.get();
    }

    /** Waits until every given broker has a log for the partition. */
    private void waitForReplicaCreated(Map<Integer, KafkaBroker> clusters, TopicPartition topicPartition, String failMessage) throws InterruptedException {
        TestUtils.waitForCondition(
            () -> clusters.values().stream()
                .allMatch(broker -> broker.logManager().getLog(topicPartition, false).isDefined()),
            failMessage);
    }

    /** Waits until none of the given brokers has a log for the partition. */
    private void waitForReplicaDeleted(Map<Integer, KafkaBroker> clusters, TopicPartition newTopicPartition, String failMessage) throws InterruptedException {
        TestUtils.waitForCondition(
            () -> clusters.values().stream()
                .allMatch(broker -> broker.logManager().getLog(newTopicPartition, false).isEmpty()),
            failMessage);
    }

    /**
     * Returns a lookup yielding the id of the first broker whose replica manager has the partition
     * online with a locally defined leader id, or an empty {@link Optional} if there is none.
     */
    private Supplier<Optional<Integer>> isLeaderKnown(Map<Integer, KafkaBroker> idToBroker, TopicPartition topicPartition) {
        return () -> idToBroker.values().stream()
            .filter(broker -> OptionConverters.toJava(broker.replicaManager().onlinePartition(topicPartition))
                .stream()
                .anyMatch(tp -> tp.leaderIdIfLocal().isDefined()))
            .map(broker -> broker.config().brokerId())
            .findFirst();
    }

    /** Returns the first broker whose id differs from {@code leaderId}; fails the test if there is none. */
    private KafkaBroker findFollower(Collection<KafkaBroker> idToBroker, int leaderId) {
        return idToBroker.stream()
            .filter(broker -> broker.config().brokerId() != leaderId)
            .findFirst()
            .orElseGet(() -> Assertions.fail("Can't find any follower"));
    }

    /** Waits until describing {@link #DEFAULT_TOPIC} fails with {@link UnknownTopicOrPartitionException}. */
    private void waitUtilTopicGone(Admin admin) throws Exception {
        TestUtils.waitForCondition(() -> {
            try {
                admin.describeTopics(List.of(DEFAULT_TOPIC)).allTopicNames().get();
                return false;
            } catch (Exception e) {
                return e.getCause() instanceof UnknownTopicOrPartitionException;
            }
        }, "Topic " + DEFAULT_TOPIC + " should be deleted");
    }

    /** Restricts {@code brokers} to those listed as replicas of partition 0 in {@link #expectedReplicaAssignment}. */
    private Map<Integer, KafkaBroker> findPartitionHostingBrokers(Map<Integer, KafkaBroker> brokers) {
        List<Integer> replicaIds = expectedReplicaAssignment.get(0);
        return brokers.entrySet().stream()
            .filter(entry -> replicaIds.contains(entry.getValue().config().brokerId()))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    /**
     * Appends {@code numDups} rounds of records for keys {@code 0..numKeys-1} to {@code log}, each
     * record's value being a counter that increases with every append.
     *
     * @return one {@code {key, value}} pair per appended record, in append order
     */
    private List<int[]> writeDups(int numKeys, int numDups, UnifiedLog log) {
        int counter = 0;
        List<int[]> result = new ArrayList<>();
        for (int round = 0; round < numDups; round++) {
            for (int key = 0; key < numKeys; key++) {
                // Encode explicitly so the record bytes do not depend on the platform default charset.
                byte[] keyBytes = String.valueOf(key).getBytes(StandardCharsets.UTF_8);
                byte[] valueBytes = String.valueOf(counter).getBytes(StandardCharsets.UTF_8);
                log.appendAsLeader(
                    MemoryRecords.withRecords(Compression.NONE, new SimpleRecord(keyBytes, valueBytes)),
                    0,
                    AppendOrigin.CLIENT,
                    RequestLocal.noCaching(),
                    VerificationGuard.SENTINEL);
                result.add(new int[] {key, counter});
                counter++;
            }
        }
        return result;
    }
}

