/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.pulsar;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
import org.apache.paimon.flink.action.cdc.CdcActionITCaseBase;
import org.apache.paimon.flink.action.cdc.pulsar.PulsarSyncDatabaseAction;
import org.apache.paimon.flink.action.cdc.pulsar.PulsarSyncTableAction;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.PulsarContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerImageName;

public class PulsarActionITCaseBase
extends CdcActionITCaseBase {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarActionITCaseBase.class);
    private static final Network NETWORK = Network.newNetwork();
    private static final String INTER_CONTAINER_PULSAR_ALIAS = "pulsar";
    public static final String IMAGE = "apachepulsar/pulsar";
    private final ObjectMapper objectMapper = new ObjectMapper();
    private PulsarAdmin admin;
    private PulsarClient client;
    protected List<String> topics = new ArrayList<String>();
    @RegisterExtension
    public static final PulsarContainerExtension PULSAR_CONTAINER = PulsarActionITCaseBase.createPulsarContainerExtension();

    private static PulsarContainerExtension createPulsarContainerExtension() {
        PulsarContainerExtension container = new PulsarContainerExtension(DockerImageName.parse((String)"apachepulsar/pulsar:3.0.0")){

            protected void doStart() {
                super.doStart();
                if (LOG.isInfoEnabled()) {
                    this.followOutput((Consumer)new Slf4jLogConsumer(LOG));
                }
            }
        };
        container.withNetwork(NETWORK);
        container.withNetworkAliases(new String[]{INTER_CONTAINER_PULSAR_ALIAS});
        container.withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "true");
        container.withEnv("PULSAR_PREFIX_acknowledgmentAtBatchIndexLevelEnabled", "true");
        container.withEnv("PULSAR_PREFIX_systemTopicEnabled", "true");
        container.withEnv("PULSAR_PREFIX_brokerDeduplicationEnabled", "true");
        container.withEnv("PULSAR_PREFIX_defaultNumberOfNamespaceBundles", "1");
        container.withCommand(new String[]{"sh", "-c", "/pulsar/bin/apply-config-from-env.py /pulsar/conf/standalone.conf && /pulsar/bin/pulsar standalone --no-functions-worker -nss"});
        container.waitingFor(Wait.forHttp((String)"/admin/v2/persistent/pulsar/system/transaction_coordinator_assign/partitions").forPort(8080).forStatusCode(200).withStartupTimeout(Duration.ofMinutes(5L)));
        return container;
    }

    @BeforeEach
    public void setup() throws Exception {
        this.admin = PulsarAdmin.builder().serviceHttpUrl(PULSAR_CONTAINER.getHttpServiceUrl()).build();
        this.client = PulsarClient.builder().serviceUrl(PULSAR_CONTAINER.getPulsarBrokerUrl()).enableTransaction(true).build();
    }

    @AfterEach
    public void after() throws Exception {
        super.after();
        this.deleteTopics();
        this.admin.close();
        this.client.close();
    }

    protected Map<String, String> getBasicPulsarConfig() {
        HashMap<String, String> config = new HashMap<String, String>();
        config.put(PulsarOptions.PULSAR_SERVICE_URL.key(), PULSAR_CONTAINER.getPulsarBrokerUrl());
        config.put("pulsar.admin.adminUrl", PULSAR_CONTAINER.getHttpServiceUrl());
        config.put(PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME.key(), "paimon-tests");
        return config;
    }

    protected void createTopic(String topic) {
        this.createTopic(topic, 0);
    }

    protected void createTopic(String topic, int numberOfPartitions) {
        Preconditions.checkArgument((numberOfPartitions >= 0 ? 1 : 0) != 0);
        String topicName = TopicNameUtils.topicName((String)topic);
        if (numberOfPartitions == 0) {
            this.createNonPartitionedTopic(topicName);
        } else {
            this.createPartitionedTopic(topicName, numberOfPartitions);
        }
    }

    private void createNonPartitionedTopic(String topic) {
        try {
            this.admin.topics().createNonPartitionedTopic(topic);
        }
        catch (PulsarAdminException e) {
            throw new RuntimeException(e);
        }
    }

    private void createPartitionedTopic(String topic, int numberOfPartitions) {
        try {
            this.admin.topics().createPartitionedTopic(topic, numberOfPartitions);
        }
        catch (PulsarAdminException e) {
            throw new RuntimeException(e);
        }
    }

    private void deleteTopics() throws Exception {
        for (String topic : this.topics) {
            String topicName = TopicNameUtils.topicName((String)topic);
            PartitionedTopicMetadata metadata = this.admin.topics().getPartitionedTopicMetadata(topicName);
            if (metadata.partitions == 0) {
                this.admin.topics().delete(topicName, true);
                continue;
            }
            this.admin.topics().deletePartitionedTopic(topicName, true);
        }
    }

    protected List<String> getMessages(String resource) throws IOException {
        URL url = PulsarActionITCaseBase.class.getClassLoader().getResource(resource);
        Assertions.assertThat((URL)url).isNotNull();
        Path path = new File(url.getFile()).toPath();
        List<String> lines = Files.readAllLines(path);
        ArrayList<String> messages = new ArrayList<String>();
        for (String line : lines) {
            try {
                this.objectMapper.readTree(line);
                if (StringUtils.isEmpty((CharSequence)line)) continue;
                messages.add(line);
            }
            catch (Exception exception) {}
        }
        return messages;
    }

    protected void sendMessages(String topic, List<String> messages) {
        try (Producer<String> producer = this.createProducer(topic);){
            for (String message : messages) {
                producer.newMessage().value((Object)message).send();
            }
            producer.flush();
        }
        catch (PulsarClientException e) {
            throw new RuntimeException(e);
        }
    }

    private Producer<String> createProducer(String topic) {
        try {
            return this.client.newProducer(Schema.STRING).topic(topic).enableBatching(false).enableMultiSchema(true).accessMode(ProducerAccessMode.Shared).create();
        }
        catch (PulsarClientException e) {
            throw new RuntimeException(e);
        }
    }

    protected PulsarSyncTableActionBuilder syncTableActionBuilder(Map<String, String> pulsarConfig) {
        return new PulsarSyncTableActionBuilder(pulsarConfig);
    }

    protected PulsarSyncDatabaseActionBuilder syncDatabaseActionBuilder(Map<String, String> pulsarConfig) {
        return new PulsarSyncDatabaseActionBuilder(pulsarConfig);
    }

    private static class PulsarContainerExtension
    extends PulsarContainer
    implements BeforeAllCallback,
    AfterAllCallback {
        private PulsarContainerExtension(DockerImageName dockerImageName) {
            super(dockerImageName);
        }

        public void beforeAll(ExtensionContext extensionContext) {
            this.doStart();
        }

        public void afterAll(ExtensionContext extensionContext) {
            this.close();
        }
    }

    protected class PulsarSyncDatabaseActionBuilder
    extends CdcActionITCaseBase.SyncDatabaseActionBuilder<PulsarSyncDatabaseAction> {
        public PulsarSyncDatabaseActionBuilder(Map<String, String> pulsarConfig) {
            super(PulsarSyncDatabaseAction.class, pulsarConfig);
        }
    }

    protected class PulsarSyncTableActionBuilder
    extends CdcActionITCaseBase.SyncTableActionBuilder<PulsarSyncTableAction> {
        public PulsarSyncTableActionBuilder(Map<String, String> pulsarConfig) {
            super(PulsarSyncTableAction.class, pulsarConfig);
        }
    }
}

