/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.kafka;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.paimon.flink.action.cdc.CdcActionITCaseBase;
import org.apache.paimon.flink.action.cdc.kafka.KafkaCanalSyncTableActionITCase;
import org.apache.paimon.flink.action.cdc.kafka.KafkaSyncDatabaseAction;
import org.apache.paimon.flink.action.cdc.kafka.KafkaSyncTableAction;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.paimon.utils.StringUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;

public abstract class KafkaActionITCaseBase
extends CdcActionITCaseBase {
    private final ObjectMapper objectMapper = new ObjectMapper();
    private static final Logger LOG = LoggerFactory.getLogger(KafkaActionITCaseBase.class);
    private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
    private static final Network NETWORK = Network.newNetwork();
    private static final int zkTimeoutMills = 30000;
    private final Timer loggingTimer = new Timer("Debug Logging Timer");
    @RegisterExtension
    public static final KafkaContainerExtension KAFKA_CONTAINER = (KafkaContainerExtension)((KafkaContainer)((KafkaContainer)((KafkaContainer)new KafkaContainerExtension(DockerImageName.parse((String)"confluentinc/cp-kafka:7.2.2")){

        protected void doStart() {
            super.doStart();
            if (LOG.isInfoEnabled()) {
                this.followOutput((Consumer)new Slf4jLogConsumer(LOG));
            }
        }
    }.withEmbeddedZookeeper().withNetwork(NETWORK)).withNetworkAliases(new String[]{"kafka"})).withEnv("KAFKA_TRANSACTION_MAX_TIMEOUT_MS", String.valueOf(Duration.ofHours(2L).toMillis()))).withEnv("KAFKA_LOG_RETENTION_MS", "-1");

    @BeforeEach
    public void setup() {
        this.scheduleTimeoutLogger(Duration.ofSeconds(30L), () -> {
            Map<String, TopicDescription> topicDescriptions = this.describeExternalTopics();
            LOG.info("Current existing topics: {}", topicDescriptions.keySet());
            this.logTopicPartitionStatus(topicDescriptions);
        });
    }

    @AfterEach
    public void after() throws Exception {
        super.after();
        this.cancelTimeoutLogger();
        this.deleteTopics();
    }

    private void deleteTopics() throws ExecutionException, InterruptedException {
        AdminClient adminClient = AdminClient.create((Properties)KafkaActionITCaseBase.getStandardProps());
        adminClient.deleteTopics((Collection)adminClient.listTopics().names().get()).all().get();
    }

    private void scheduleTimeoutLogger(Duration period, final Runnable loggingAction) {
        TimerTask timeoutLoggerTask = new TimerTask(){

            @Override
            public void run() {
                try {
                    loggingAction.run();
                }
                catch (Exception e) {
                    throw new RuntimeException("Failed to execute logging action", e);
                }
            }
        };
        this.loggingTimer.schedule(timeoutLoggerTask, 0L, period.toMillis());
    }

    private void cancelTimeoutLogger() {
        this.loggingTimer.cancel();
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private Map<String, TopicDescription> describeExternalTopics() {
        try (AdminClient adminClient = AdminClient.create((Properties)KafkaActionITCaseBase.getStandardProps());){
            List topics = ((Collection)adminClient.listTopics().listings().get()).stream().filter(listing -> !listing.isInternal()).map(TopicListing::name).collect(Collectors.toList());
            Map map = (Map)adminClient.describeTopics(topics).all().get();
            return map;
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to list Kafka topics", e);
        }
    }

    private void logTopicPartitionStatus(Map<String, TopicDescription> topicDescriptions) {
        Properties properties = KafkaActionITCaseBase.getStandardProps();
        properties.setProperty("group.id", "flink-tests-debugging");
        properties.setProperty("key.deserializer", StringDeserializer.class.getCanonicalName());
        properties.setProperty("value.deserializer", StringDeserializer.class.getCanonicalName());
        KafkaConsumer consumer = new KafkaConsumer(properties);
        ArrayList partitions = new ArrayList();
        topicDescriptions.forEach((topic, description) -> description.partitions().forEach(tpInfo -> partitions.add(new TopicPartition(topic, tpInfo.partition()))));
        Map beginningOffsets = consumer.beginningOffsets(partitions);
        Map endOffsets = consumer.endOffsets(partitions);
        partitions.forEach(partition -> LOG.info("TopicPartition \"{}\": starting offset: {}, stopping offset: {}", new Object[]{partition, beginningOffsets.get(partition), endOffsets.get(partition)}));
    }

    public static Properties getStandardProps() {
        Properties standardProps = new Properties();
        standardProps.put("bootstrap.servers", KAFKA_CONTAINER.getBootstrapServers());
        standardProps.put("group.id", "paimon-tests");
        standardProps.put("enable.auto.commit", (Object)false);
        standardProps.put("auto.offset.reset", "earliest");
        standardProps.put("max.partition.fetch.bytes", (Object)256);
        standardProps.put("zookeeper.session.timeout.ms", (Object)30000);
        standardProps.put("zookeeper.connection.timeout.ms", (Object)30000);
        standardProps.put("default.api.timeout.ms", "120000");
        return standardProps;
    }

    public String getBootstrapServers() {
        return KAFKA_CONTAINER.getBootstrapServers();
    }

    protected Map<String, String> getBasicKafkaConfig() {
        HashMap<String, String> config = new HashMap<String, String>();
        config.put("properties.bootstrap.servers", KAFKA_CONTAINER.getBootstrapServers());
        config.put("properties.group.id", "paimon-tests");
        config.put("properties.enable.auto.commit", "false");
        config.put("properties.auto.offset.reset", "earliest");
        return config;
    }

    protected KafkaSyncTableActionBuilder syncTableActionBuilder(Map<String, String> kafkaConfig) {
        return new KafkaSyncTableActionBuilder(kafkaConfig);
    }

    protected KafkaSyncDatabaseActionBuilder syncDatabaseActionBuilder(Map<String, String> kafkaConfig) {
        return new KafkaSyncDatabaseActionBuilder(kafkaConfig);
    }

    public void createTestTopic(String topic, int numPartitions, int replicationFactor) {
        HashMap<String, String> properties = new HashMap<String, String>();
        properties.put("bootstrap.servers", this.getBootstrapServers());
        try (AdminClient admin = AdminClient.create(properties);){
            admin.createTopics(Collections.singletonList(new NewTopic(topic, numPartitions, (short)replicationFactor))).all().get();
        }
        catch (Exception e) {
            throw new IllegalStateException(String.format("Fail to create topic [%s partitions: %d replication factor: %d].", topic, numPartitions, replicationFactor), e);
        }
    }

    public static List<String> readLines(String resource) throws IOException {
        URL url = KafkaCanalSyncTableActionITCase.class.getClassLoader().getResource(resource);
        Assertions.assertThat((URL)url).isNotNull();
        Path path = new File(url.getFile()).toPath();
        return Files.readAllLines(path);
    }

    void writeRecordsToKafka(String topic, List<String> lines) throws Exception {
        Properties producerProperties = KafkaActionITCaseBase.getStandardProps();
        producerProperties.setProperty("retries", "0");
        producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer kafkaProducer = new KafkaProducer(producerProperties);
        for (int i = 0; i < lines.size(); ++i) {
            try {
                JsonNode jsonNode = this.objectMapper.readTree(lines.get(i));
                if (StringUtils.isEmpty((CharSequence)lines.get(i))) continue;
                kafkaProducer.send(new ProducerRecord(topic, (Object)lines.get(i)));
                continue;
            }
            catch (Exception exception) {
                // empty catch block
            }
        }
        kafkaProducer.close();
    }

    private static class KafkaContainerExtension
    extends KafkaContainer
    implements BeforeAllCallback,
    AfterAllCallback {
        private KafkaContainerExtension(DockerImageName dockerImageName) {
            super(dockerImageName);
        }

        public void beforeAll(ExtensionContext extensionContext) {
            this.doStart();
        }

        public void afterAll(ExtensionContext extensionContext) {
            this.close();
        }
    }

    protected class KafkaSyncDatabaseActionBuilder
    extends CdcActionITCaseBase.SyncDatabaseActionBuilder<KafkaSyncDatabaseAction> {
        public KafkaSyncDatabaseActionBuilder(Map<String, String> kafkaConfig) {
            super(kafkaConfig);
        }

        public KafkaSyncDatabaseActionBuilder ignoreIncompatible(boolean ignoreIncompatible) {
            throw new UnsupportedOperationException();
        }

        public KafkaSyncDatabaseActionBuilder mergeShards(boolean mergeShards) {
            throw new UnsupportedOperationException();
        }

        public KafkaSyncDatabaseActionBuilder withMode(String mode) {
            throw new UnsupportedOperationException();
        }

        @Override
        public KafkaSyncDatabaseAction build() {
            ArrayList<String> args = new ArrayList<String>(Arrays.asList("kafka_sync_database", "--warehouse", KafkaActionITCaseBase.this.warehouse, "--database", KafkaActionITCaseBase.this.database));
            args.addAll(KafkaActionITCaseBase.this.mapToArgs("--kafka-conf", this.sourceConfig));
            args.addAll(KafkaActionITCaseBase.this.mapToArgs("--catalog-conf", this.catalogConfig));
            args.addAll(KafkaActionITCaseBase.this.mapToArgs("--table-conf", this.tableConfig));
            args.addAll(KafkaActionITCaseBase.this.nullableToArgs("--table-prefix", this.tablePrefix));
            args.addAll(KafkaActionITCaseBase.this.nullableToArgs("--table-suffix", this.tableSuffix));
            args.addAll(KafkaActionITCaseBase.this.nullableToArgs("--including-tables", this.includingTables));
            args.addAll(KafkaActionITCaseBase.this.nullableToArgs("--excluding-tables", this.excludingTables));
            args.addAll(KafkaActionITCaseBase.this.listToArgs("--type-mapping", this.typeMappingModes));
            return (KafkaSyncDatabaseAction)KafkaActionITCaseBase.this.createAction(KafkaSyncDatabaseAction.class, args);
        }
    }

    protected class KafkaSyncTableActionBuilder
    extends CdcActionITCaseBase.SyncTableActionBuilder<KafkaSyncTableAction> {
        public KafkaSyncTableActionBuilder(Map<String, String> kafkaConfig) {
            super(kafkaConfig);
        }

        @Override
        public KafkaSyncTableAction build() {
            ArrayList<String> args = new ArrayList<String>(Arrays.asList("kafka_sync_table", "--warehouse", KafkaActionITCaseBase.this.warehouse, "--database", KafkaActionITCaseBase.this.database, "--table", KafkaActionITCaseBase.this.tableName));
            args.addAll(KafkaActionITCaseBase.this.mapToArgs("--kafka-conf", this.sourceConfig));
            args.addAll(KafkaActionITCaseBase.this.mapToArgs("--catalog-conf", this.catalogConfig));
            args.addAll(KafkaActionITCaseBase.this.mapToArgs("--table-conf", this.tableConfig));
            args.addAll(KafkaActionITCaseBase.this.listToArgs("--partition-keys", this.partitionKeys));
            args.addAll(KafkaActionITCaseBase.this.listToArgs("--primary-keys", this.primaryKeys));
            args.addAll(KafkaActionITCaseBase.this.listToArgs("--type-mapping", this.typeMappingModes));
            args.addAll(KafkaActionITCaseBase.this.listToMultiArgs("--computed-column", this.computedColumnArgs));
            return (KafkaSyncTableAction)KafkaActionITCaseBase.this.createAction(KafkaSyncTableAction.class, args);
        }
    }
}

