/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.kafka;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.kafka.KafkaLogOptions;
import org.apache.paimon.flink.kafka.KafkaLogStoreRegister;
import org.apache.paimon.flink.kafka.KafkaTableTestBase;
import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.apache.paimon.options.Options;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Fail;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

/**
 * Integration tests for {@link KafkaLogStoreRegister}, covering topic registration and
 * un-registration against the brokers reported by {@code getBootstrapServers()}.
 *
 * <p>Every register under test is created for the table identifier {@code mock_db.mock_table}.
 */
public class KafkaLogStoreRegisterITCase extends KafkaTableTestBase {
    private static final String DATABASE = "mock_db";
    private static final String TABLE = "mock_table";

    /**
     * Deletes every topic visible to the admin client after each test.
     *
     * <p>Cleanup is best-effort: a failure here must not mask the outcome of the test that just
     * ran, so exceptions are deliberately ignored.
     */
    @AfterEach
    public void tearDown() {
        try (AdminClient admin = this.createAdminClient()) {
            Set<String> topics = admin.listTopics().names().get();
            admin.deleteTopics(topics).all().get();
        } catch (InterruptedException ignored) {
            // Keep the interrupt visible to the test runner, but do not fail the cleanup.
            Thread.currentThread().interrupt();
        } catch (Exception ignored) {
            // Best-effort cleanup; intentionally ignored.
        }
    }

    /**
     * Registers an explicitly named topic with two partitions, then checks that the topic exists
     * and that the returned options carry the topic name.
     */
    @Test
    public void testRegisterTopic() {
        String topic = "register-topic";
        Map<String, String> result =
                this.createKafkaLogStoreRegister(getBootstrapServers(), topic, 2).registerTopic();
        this.checkTopicExists(topic, 2, 1);
        Assertions.assertThat(result.get(KafkaLogOptions.TOPIC.key())).isEqualTo(topic);
    }

    /**
     * Registers a topic without configuring a name or partition count and checks the generated
     * name: it must be the only topic present, be echoed in the returned options, and follow the
     * pattern {@code <database>_<table>_<32 hex characters>}. The returned partition option is
     * expected to be {@code "1"}.
     */
    @Test
    public void testRegisterTopicAuto() {
        Map<String, String> result =
                this.createKafkaLogStoreRegister(getBootstrapServers()).registerTopic();
        try (AdminClient admin = this.createAdminClient()) {
            Set<String> topics = admin.listTopics().names().get(5L, TimeUnit.SECONDS);
            Assertions.assertThat(topics).hasSize(1);

            String topicName = topics.iterator().next();
            Assertions.assertThat(result.get(KafkaLogOptions.TOPIC.key())).isEqualTo(topicName);

            String prefix = String.format("%s_%s_", DATABASE, TABLE);
            Assertions.assertThat(topicName).startsWith(prefix);

            // Whatever follows the prefix must be a 32-character hexadecimal string.
            String uuid = topicName.substring(prefix.length());
            Assertions.assertThat(uuid).matches("[0-9a-fA-F]{32}");

            Assertions.assertThat(result.get(FlinkConnectorOptions.LOG_SYSTEM_PARTITIONS.key()))
                    .isEqualTo("1");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            Fail.fail("Interrupted while verifying the auto-registered topic", e);
        } catch (Exception e) {
            // Pass the exception as the cause so the failure report keeps its stack trace.
            Fail.fail("Failed to verify the auto-registered topic", e);
        }
    }

    /**
     * Registering against an invalid bootstrap address must fail with an
     * {@link IllegalStateException} whose root cause is a {@link ConfigException}.
     */
    @Test
    public void testRegisterTopicException() {
        String topic = "register-topic";
        String invalidBootstrapServers = "invalid-bootstrap-servers:9092";
        KafkaLogStoreRegister kafkaLogStoreRegister =
                this.createKafkaLogStoreRegister(invalidBootstrapServers, topic);
        Assertions.assertThatThrownBy(() -> kafkaLogStoreRegister.registerTopic())
                .isInstanceOf(IllegalStateException.class)
                .hasMessageContaining("Register topic for table mock_db.mock_table failed")
                .hasRootCauseInstanceOf(ConfigException.class);
    }

    /**
     * Registering a topic that was already created must fail with an
     * {@link IllegalStateException} whose root cause is a {@link TopicExistsException}.
     */
    @Test
    public void testRegisterTopicExist() {
        String topic = "topic-exist";
        this.createTopic(topic, 1, 1);
        Assertions.assertThatThrownBy(
                        () ->
                                this.createKafkaLogStoreRegister(getBootstrapServers(), topic)
                                        .registerTopic())
                .isInstanceOf(IllegalStateException.class)
                .hasMessageContaining("Register topic for table mock_db.mock_table failed")
                .hasRootCauseInstanceOf(TopicExistsException.class);
    }

    /** Un-registering a previously created topic must leave the topic absent. */
    @Test
    public void testUnregisterTopic() {
        String topic = "unregister-topic";
        this.createTopic(topic, 2, 1);
        this.createKafkaLogStoreRegister(getBootstrapServers(), topic, 2).unRegisterTopic();
        this.checkTopicNotExist(topic);
    }

    /** Un-registering a topic that this test never created must not throw. */
    @Test
    public void testUnregisterTopicException() {
        String topic = "not_exist_topic";
        Assertions.assertThatCode(
                        () ->
                                this.createKafkaLogStoreRegister(getBootstrapServers(), topic)
                                        .unRegisterTopic())
                .doesNotThrowAnyException();
    }

    /**
     * Creates a register with neither a topic name nor a partition count configured.
     *
     * @param bootstrapServers value for {@link KafkaLogOptions#BOOTSTRAP_SERVERS}
     * @return a register bound to the {@code mock_db.mock_table} identifier
     */
    private KafkaLogStoreRegister createKafkaLogStoreRegister(String bootstrapServers) {
        return this.createKafkaLogStoreRegister(bootstrapServers, null, null);
    }

    /**
     * Creates a register with a topic name but no partition count configured.
     *
     * @param bootstrapServers value for {@link KafkaLogOptions#BOOTSTRAP_SERVERS}
     * @param topic value for {@link KafkaLogOptions#TOPIC}; the option is left unset when
     *     {@code null}
     * @return a register bound to the {@code mock_db.mock_table} identifier
     */
    private KafkaLogStoreRegister createKafkaLogStoreRegister(
            String bootstrapServers, String topic) {
        return this.createKafkaLogStoreRegister(bootstrapServers, topic, null);
    }

    /**
     * Creates a register whose context exposes the given options and the
     * {@code mock_db.mock_table} identifier. The register timeout is always set to 20 seconds.
     *
     * @param bootstrapServers value for {@link KafkaLogOptions#BOOTSTRAP_SERVERS}
     * @param topic value for {@link KafkaLogOptions#TOPIC}; the option is left unset when
     *     {@code null}
     * @param partition value for {@link FlinkConnectorOptions#LOG_SYSTEM_PARTITIONS}; the option
     *     is left unset when {@code null}
     * @return the configured register
     */
    private KafkaLogStoreRegister createKafkaLogStoreRegister(
            String bootstrapServers, String topic, Integer partition) {
        final Options tableOptions = new Options();
        tableOptions.set(KafkaLogOptions.BOOTSTRAP_SERVERS, bootstrapServers);
        if (topic != null) {
            tableOptions.set(KafkaLogOptions.TOPIC, topic);
        }
        if (partition != null) {
            tableOptions.set(FlinkConnectorOptions.LOG_SYSTEM_PARTITIONS, partition);
        }
        tableOptions.set(
                FlinkCatalogOptions.REGISTER_TIMEOUT.key(), Duration.ofSeconds(20L).toString());

        return new KafkaLogStoreRegister(
                new LogStoreTableFactory.RegisterContext() {

                    @Override
                    public Options getOptions() {
                        return tableOptions;
                    }

                    @Override
                    public Identifier getIdentifier() {
                        return Identifier.create(DATABASE, TABLE);
                    }
                });
    }

    /**
     * Creates a topic and waits for the creation request to complete, failing the test if it
     * does not succeed.
     *
     * @param topic name of the topic to create
     * @param partition number of partitions
     * @param replicationFactor replication factor; narrowed to {@code short} for {@link NewTopic}
     */
    private void createTopic(String topic, int partition, int replicationFactor) {
        try (AdminClient admin = this.createAdminClient()) {
            NewTopic newTopic = new NewTopic(topic, partition, (short) replicationFactor);
            // Wait on the result so a failed creation surfaces here rather than being dropped.
            admin.createTopics(Collections.singletonList(newTopic)).all().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            Fail.fail("Interrupted while creating topic " + topic, e);
        } catch (Exception e) {
            Fail.fail("Failed to create topic " + topic, e);
        }
    }
}

