/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.kafka;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Fail;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;

public abstract class KafkaTableTestBase
extends AbstractTestBase {
    /** Logger shared by the test base and used as the sink for the Kafka container output. */
    private static final Logger LOG = LoggerFactory.getLogger(KafkaTableTestBase.class);
    /** Network alias ({@code "kafka"}) under which the broker container is registered. */
    private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
    /** Docker network the Kafka container is attached to. */
    private static final Network NETWORK = Network.newNetwork();
    /**
     * Timeout in milliseconds (30 s); matches the value used for the ZooKeeper session and
     * connection timeouts in the standard client properties.
     */
    private static final int zkTimeoutMills = 30000;
    /**
     * Kafka broker container registered as a static JUnit 5 extension.
     *
     * <p>The container runs the {@code confluentinc/cp-kafka:7.2.2} image with an embedded
     * ZooKeeper, is attached to {@link #NETWORK} under the alias
     * {@link #INTER_CONTAINER_KAFKA_ALIAS}, and forwards its output to {@link #LOG} when INFO
     * logging is enabled.
     */
    @RegisterExtension
    public static final KafkaContainerExtension KAFKA_CONTAINER =
            (KafkaContainerExtension)
                    new KafkaContainerExtension(
                            DockerImageName.parse("confluentinc/cp-kafka:7.2.2")) {

                        @Override
                        protected void doStart() {
                            super.doStart();
                            // Only attach the log consumer when its output would be emitted.
                            if (LOG.isInfoEnabled()) {
                                followOutput(new Slf4jLogConsumer(LOG));
                            }
                        }
                    }.withEmbeddedZookeeper()
                            .withNetwork(NETWORK)
                            .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS)
                            // Allow transactions to stay open for up to two hours.
                            .withEnv(
                                    "KAFKA_TRANSACTION_MAX_TIMEOUT_MS",
                                    String.valueOf(Duration.ofHours(2L).toMillis()))
                            // NOTE(review): -1 presumably disables time-based log deletion so
                            // records are not removed during a test run; confirm against the
                            // broker configuration reference.
                            .withEnv("KAFKA_LOG_RETENTION_MS", "-1");
    /** Flink streaming execution environment; assigned before each test in {@link #setup()}. */
    protected StreamExecutionEnvironment env;
    /** Table environment created on top of {@link #env}; assigned in {@link #setup()}. */
    protected StreamTableEnvironment tEnv;
    /** Timer driving the periodic Kafka status logging; cancelled after each test. */
    private final Timer loggingTimer = new Timer("Debug Logging Timer");

    /**
     * Builds the streaming execution and table environments for the test, disables unaligned
     * checkpoints on the table configuration, and starts a status probe that logs the existing
     * external topics and their partition offsets every 30 seconds.
     */
    @BeforeEach
    public void setup() {
        env = streamExecutionEnvironmentBuilder().streamingMode().build();
        tEnv = StreamTableEnvironment.create(env);
        tEnv.getConfig()
                .getConfiguration()
                .set(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false);

        // Debugging aid: periodically report which topics exist and how far they have grown.
        Runnable kafkaStatusProbe =
                () -> {
                    Map<String, TopicDescription> externalTopics = describeExternalTopics();
                    LOG.info("Current existing topics: {}", externalTopics.keySet());
                    logTopicPartitionStatus(externalTopics);
                };
        scheduleTimeoutLogger(Duration.ofSeconds(30L), kafkaStatusProbe);
    }

    /**
     * Stops the periodic status logging and then deletes the topics listed by the broker.
     *
     * @throws ExecutionException if listing or deleting the topics fails
     * @throws InterruptedException if the thread is interrupted while waiting for Kafka
     */
    @AfterEach
    public void after() throws ExecutionException, InterruptedException {
        // Stop the probe first so it does not query topics that are being removed.
        cancelTimeoutLogger();
        deleteTopics();
    }

    /**
     * Creates the default client properties for talking to the test Kafka container.
     *
     * <p>A new {@link Properties} instance is returned on every call, so callers may modify it.
     * Note that the boolean and integer settings are stored as {@code Boolean}/{@code Integer}
     * objects rather than strings.
     *
     * @return properties pointing at {@link #KAFKA_CONTAINER} with the test defaults applied
     */
    public static Properties getStandardProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", KAFKA_CONTAINER.getBootstrapServers());
        props.put("group.id", "flink-tests");
        props.put("enable.auto.commit", false);
        props.put("auto.offset.reset", "earliest");
        props.put("max.partition.fetch.bytes", 256);
        props.put("zookeeper.session.timeout.ms", zkTimeoutMills);
        props.put("zookeeper.connection.timeout.ms", zkTimeoutMills);
        return props;
    }

    /**
     * Returns the bootstrap server address reported by {@link #KAFKA_CONTAINER}.
     *
     * @return the value of {@code KAFKA_CONTAINER.getBootstrapServers()}
     */
    public static String getBootstrapServers() {
        return KAFKA_CONTAINER.getBootstrapServers();
    }

    /**
     * Tells whether a non-internal topic with the given name currently exists.
     *
     * @param topicName name of the topic to look up
     * @return {@code true} if the topic is among the described external topics
     */
    protected boolean topicExists(String topicName) {
        Map<String, TopicDescription> externalTopics = describeExternalTopics();
        return externalTopics.containsKey(topicName);
    }

    /**
     * Creates the topic unless the broker already lists it.
     *
     * <p>The replication factor is left unspecified ({@code Optional.empty()}). A failure caused
     * by {@link TopicExistsException} is ignored, which tolerates another client creating the
     * topic between the existence check and the create request.
     *
     * @param topicName name of the topic to create
     * @param numBucket number of partitions for the new topic
     * @throws RuntimeException if the topic cannot be listed or created for any other reason
     */
    public static void createTopicIfNotExists(String topicName, int numBucket) {
        try (AdminClient adminClient = AdminClient.create(getStandardProps())) {
            Set<String> existingTopics = adminClient.listTopics().names().get();
            if (!existingTopics.contains(topicName)) {
                NewTopic newTopic =
                        new NewTopic(topicName, Optional.of(numBucket), Optional.empty());
                adminClient.createTopics(Collections.singleton(newTopic)).all().get();
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to callers instead of silently consuming it.
                Thread.currentThread().interrupt();
            }
            if (e.getCause() instanceof TopicExistsException) {
                // Somebody else created the topic in the meantime; that is the desired state.
                return;
            }
            throw new RuntimeException(
                    String.format("Failed to create Kafka topic %s", topicName), e);
        }
    }

    /**
     * Deletes the topic if the broker currently lists it.
     *
     * <p>A failure caused by {@link UnknownTopicOrPartitionException} is ignored, which tolerates
     * the topic disappearing between the existence check and the delete request.
     *
     * @param topicName name of the topic to delete
     * @throws RuntimeException if the topic cannot be listed or deleted for any other reason
     */
    protected void deleteTopicIfExists(String topicName) {
        try (AdminClient adminClient = AdminClient.create(getStandardProps())) {
            Set<String> existingTopics = adminClient.listTopics().names().get();
            if (existingTopics.contains(topicName)) {
                adminClient.deleteTopics(Collections.singleton(topicName)).all().get();
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to callers instead of silently consuming it.
                Thread.currentThread().interrupt();
            }
            if (e.getCause() instanceof UnknownTopicOrPartitionException) {
                // The topic is already gone; that is the desired state.
                return;
            }
            throw new RuntimeException(
                    String.format("Failed to drop Kafka topic %s", topicName), e);
        }
    }

    /**
     * Deletes every topic returned by the broker's topic listing.
     *
     * <p>The admin client is closed once the deletion completes or fails, so no client is left
     * open after each test.
     *
     * @throws ExecutionException if listing or deleting the topics fails
     * @throws InterruptedException if the thread is interrupted while waiting for Kafka
     */
    private void deleteTopics() throws ExecutionException, InterruptedException {
        try (AdminClient adminClient = AdminClient.create(getStandardProps())) {
            Set<String> topicNames = adminClient.listTopics().names().get();
            adminClient.deleteTopics(topicNames).all().get();
        }
    }

    /**
     * Schedules the given action on {@link #loggingTimer}: it runs once immediately (zero
     * initial delay) and is then repeated with the given period.
     *
     * <p>Any exception thrown by the action is rethrown from the timer task wrapped in a
     * {@link RuntimeException}.
     *
     * @param period interval between consecutive runs
     * @param loggingAction action to execute on every run
     */
    private void scheduleTimeoutLogger(Duration period, final Runnable loggingAction) {
        long periodMillis = period.toMillis();
        loggingTimer.schedule(
                new TimerTask() {
                    @Override
                    public void run() {
                        try {
                            loggingAction.run();
                        } catch (Exception failure) {
                            throw new RuntimeException(
                                    "Failed to execute logging action", failure);
                        }
                    }
                },
                0L,
                periodMillis);
    }

    /** Cancels {@link #loggingTimer}, which stops the periodic status logging for this test. */
    private void cancelTimeoutLogger() {
        this.loggingTimer.cancel();
    }

    /**
     * Describes all topics that the broker does not flag as internal.
     *
     * @return the topic descriptions keyed by topic name
     * @throws RuntimeException if the topics cannot be listed or described
     */
    private Map<String, TopicDescription> describeExternalTopics() {
        try (AdminClient adminClient = AdminClient.create(getStandardProps())) {
            List<String> externalTopicNames =
                    adminClient.listTopics().listings().get().stream()
                            .filter(listing -> !listing.isInternal())
                            .map(TopicListing::name)
                            .collect(Collectors.toList());
            return adminClient.describeTopics(externalTopicNames).allTopicNames().get();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to callers instead of silently consuming it.
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException("Failed to list Kafka topics", e);
        }
    }

    /**
     * Logs the beginning and end offset of every partition of the given topics.
     *
     * <p>A short-lived consumer in the {@code flink-tests-debugging} group is used to look up
     * the offsets and is closed before the method returns, so repeated invocations from the
     * logging timer do not accumulate open consumers.
     *
     * @param topicDescriptions descriptions of the topics to report on, keyed by topic name
     */
    private void logTopicPartitionStatus(Map<String, TopicDescription> topicDescriptions) {
        Properties properties = getStandardProps();
        properties.setProperty("group.id", "flink-tests-debugging");
        properties.setProperty("key.deserializer", StringDeserializer.class.getCanonicalName());
        properties.setProperty("value.deserializer", StringDeserializer.class.getCanonicalName());

        // Flatten the topic descriptions into the full list of topic partitions.
        List<TopicPartition> partitions = new ArrayList<>();
        topicDescriptions.forEach(
                (topic, description) ->
                        description
                                .partitions()
                                .forEach(
                                        tpInfo ->
                                                partitions.add(
                                                        new TopicPartition(
                                                                topic, tpInfo.partition()))));

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(partitions);
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
            partitions.forEach(
                    partition ->
                            LOG.info(
                                    "TopicPartition \"{}\": starting offset: {}, stopping offset: {}",
                                    partition,
                                    beginningOffsets.get(partition),
                                    endOffsets.get(partition)));
        }
    }

    /**
     * Asserts that the topic exists with the expected partition count and that its first
     * partition has the expected number of replicas.
     *
     * <p>The topic description is awaited for at most 10 seconds. Any exception raised while
     * querying Kafka fails the test, with the original exception attached as the cause.
     *
     * @param topic name of the topic to verify
     * @param partition expected number of partitions
     * @param replicationFactor expected number of replicas of the first partition
     */
    protected void checkTopicExists(String topic, int partition, int replicationFactor) {
        try (AdminClient admin = createAdminClient()) {
            Map<String, TopicDescription> descriptions =
                    admin.describeTopics(Collections.singleton(topic))
                            .allTopicNames()
                            .get(10L, TimeUnit.SECONDS);
            TopicDescription description = descriptions.get(topic);
            Assertions.assertThat(description.partitions().size()).isEqualTo(partition);
            Assertions.assertThat(description.partitions().get(0).replicas().size())
                    .isEqualTo(replicationFactor);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to callers instead of silently consuming it.
                Thread.currentThread().interrupt();
            }
            // Pass the exception along so the failure report keeps its stack trace.
            Fail.fail(e.getMessage(), e);
        }
    }

    /**
     * Asserts that the broker's topic listing does not contain the given topic.
     *
     * <p>The check is made against the listed topic names; describing an empty collection of
     * topics always yields an empty result and therefore could never detect an existing topic.
     * Any exception raised while querying Kafka fails the test, with the original exception
     * attached as the cause.
     *
     * @param topic name of the topic that must be absent
     */
    protected void checkTopicNotExist(String topic) {
        try (AdminClient admin = createAdminClient()) {
            Set<String> existingTopics = admin.listTopics().names().get();
            Assertions.assertThat(existingTopics).doesNotContain(topic);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to callers instead of silently consuming it.
                Thread.currentThread().interrupt();
            }
            // Pass the exception along so the failure report keeps its stack trace.
            Fail.fail(e.getMessage(), e);
        }
    }

    /**
     * Creates an admin client configured only with the bootstrap servers of the test container.
     *
     * <p>The caller owns the returned client and is responsible for closing it.
     *
     * @return a new {@link AdminClient}
     */
    protected AdminClient createAdminClient() {
        Properties adminProps = new Properties();
        adminProps.put("bootstrap.servers", getBootstrapServers());
        return AdminClient.create(adminProps);
    }

    protected static class KafkaContainerExtension
    extends KafkaContainer
    implements BeforeAllCallback,
    AfterAllCallback {
        private KafkaContainerExtension(DockerImageName dockerImageName) {
            super(dockerImageName);
        }

        public void beforeAll(ExtensionContext extensionContext) throws Exception {
            this.doStart();
        }

        public void afterAll(ExtensionContext extensionContext) throws Exception {
            this.close();
        }
    }
}

