/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.kafka;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AdminClient;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.NewTopic;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.kafka.KafkaLogOptions;
import org.apache.paimon.flink.kafka.KafkaLogStoreFactory;
import org.apache.paimon.flink.log.LogStoreRegister;
import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
import org.apache.paimon.utils.Preconditions;

public class KafkaLogStoreRegister
implements LogStoreRegister {
    private final String bootstrapServers;
    private final String topic;
    private final int partition;
    private final int replicationFactor;
    private final Duration timeout;
    private final Properties properties;
    private final Identifier identifier;

    public KafkaLogStoreRegister(LogStoreTableFactory.RegisterContext context) {
        this.bootstrapServers = context.getOptions().get(KafkaLogOptions.BOOTSTRAP_SERVERS);
        this.identifier = context.getIdentifier();
        this.topic = context.getOptions().getOptional(KafkaLogOptions.TOPIC).isPresent() ? context.getOptions().get(KafkaLogOptions.TOPIC) : String.format("%s_%s_%s", this.identifier.getDatabaseName(), this.identifier.getObjectName(), UUID.randomUUID().toString().replace("-", ""));
        Preconditions.checkNotNull(context.getOptions().get(KafkaLogOptions.BOOTSTRAP_SERVERS));
        Preconditions.checkNotNull(this.topic);
        Preconditions.checkNotNull(this.identifier);
        this.timeout = context.getOptions().get(FlinkCatalogOptions.REGISTER_TIMEOUT.key()) == null ? FlinkCatalogOptions.REGISTER_TIMEOUT.defaultValue() : Duration.parse(context.getOptions().get(FlinkCatalogOptions.REGISTER_TIMEOUT.key()));
        int bucketNum = context.getOptions().get(CoreOptions.BUCKET) == -1 ? 1 : context.getOptions().get(CoreOptions.BUCKET);
        this.partition = context.getOptions().getOptional(FlinkConnectorOptions.LOG_SYSTEM_PARTITIONS).isPresent() ? context.getOptions().get(FlinkConnectorOptions.LOG_SYSTEM_PARTITIONS) : bucketNum;
        this.replicationFactor = context.getOptions().get(FlinkConnectorOptions.LOG_SYSTEM_REPLICATION);
        this.properties = KafkaLogStoreFactory.toKafkaProperties(context.getOptions());
    }

    @Override
    public Map<String, String> registerTopic() {
        try (AdminClient admin = AdminClient.create((Properties)this.properties);){
            NewTopic newTopic = new NewTopic(this.topic, this.partition, (short)this.replicationFactor);
            admin.createTopics(Collections.singleton(newTopic)).all().get(this.timeout.getSeconds(), TimeUnit.SECONDS);
        }
        catch (TimeoutException e) {
            throw new IllegalStateException(String.format("Register topic for table %s timeout with properties %s", this.identifier.getFullName(), this.properties), e);
        }
        catch (Exception e) {
            throw new IllegalStateException(String.format("Register topic for table %s failed with properties %s", this.identifier.getFullName(), this.properties), e);
        }
        return ImmutableMap.of(KafkaLogOptions.TOPIC.key(), this.topic, FlinkConnectorOptions.LOG_SYSTEM_PARTITIONS.key(), String.valueOf(this.partition), FlinkConnectorOptions.LOG_SYSTEM_REPLICATION.key(), String.valueOf(this.replicationFactor));
    }

    @Override
    public void unRegisterTopic() {
        try (AdminClient admin = AdminClient.create((Properties)this.properties);){
            admin.deleteTopics(Collections.singleton(this.topic)).all().get(this.timeout.getSeconds(), TimeUnit.SECONDS);
        }
        catch (ExecutionException e) {
            if (e.getCause() instanceof UnknownTopicOrPartitionException) {
                return;
            }
            throw new IllegalStateException(String.format("Unregister topic for table %s failed with properties %s", this.identifier.getFullName(), this.properties), e);
        }
        catch (TimeoutException e) {
            throw new RuntimeException(String.format("Unregister topic for table %s timeout with properties %s", this.identifier.getFullName(), this.properties), e);
        }
        catch (Exception e) {
            throw new RuntimeException(String.format("Unregister topic for table %s failed with properties %s", this.identifier.getFullName(), this.properties), e);
        }
    }
}

