/*
 * Decompiled with CFR 0.152.
 */
package io.prestodb.tempto.fulfillment.table.kafka;

import com.google.common.collect.ImmutableMap;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.name.Names;
import io.prestodb.tempto.fulfillment.table.MutableTableRequirement;
import io.prestodb.tempto.fulfillment.table.TableDefinition;
import io.prestodb.tempto.fulfillment.table.TableHandle;
import io.prestodb.tempto.fulfillment.table.TableInstance;
import io.prestodb.tempto.fulfillment.table.TableManager;
import io.prestodb.tempto.fulfillment.table.kafka.KafkaDataSource;
import io.prestodb.tempto.fulfillment.table.kafka.KafkaMessage;
import io.prestodb.tempto.fulfillment.table.kafka.KafkaTableDefinition;
import io.prestodb.tempto.fulfillment.table.kafka.KafkaTableInstance;
import io.prestodb.tempto.internal.fulfillment.table.TableName;
import io.prestodb.tempto.query.QueryExecutor;
import io.prestodb.tempto.query.QueryResult;
import java.lang.annotation.Annotation;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * {@link TableManager} for tables of type {@code KAFKA}.
 *
 * <p>Creating an immutable table verifies that the table is visible through the configured
 * Presto Kafka catalog, recreates the backing Kafka topic and publishes the messages of the
 * table definition's data source into it. Mutable tables and {@code dropTable} are not supported.
 */
@TableManager.Descriptor(tableDefinitionClass=KafkaTableDefinition.class, type="KAFKA")
@Singleton
public class KafkaTableManager
implements TableManager<KafkaTableDefinition> {
    /** Number of times topic deletion is polled before giving up. */
    private static final int TOPIC_DELETION_CHECKS = 5;
    /** Pause between two topic deletion polls. */
    private static final long TOPIC_DELETION_CHECK_INTERVAL_MS = 1000L;
    /** Value used for the admin client's {@code request.timeout.ms}. */
    private static final int ADMIN_REQUEST_TIMEOUT_MS = 15000;
    /** Value used for the admin client's {@code connections.max.idle.ms}. */
    private static final int ADMIN_CONNECTIONS_MAX_IDLE_MS = 10000;

    private final String databaseName;
    private final QueryExecutor prestoQueryExecutor;
    private final String brokerHost;
    private final Integer brokerPort;
    private final String prestoKafkaCatalog;

    /**
     * Creates the manager from injected configuration values.
     *
     * @param databaseName name reported by {@link #getDatabaseName()}; must not be null
     * @param brokerHost host of the Kafka broker; must not be null
     * @param brokerPort port of the Kafka broker
     * @param prestoDatabaseName name under which the Presto {@link QueryExecutor} is bound in
     *        the injector; must not be null
     * @param prestoKafkaCatalog Presto catalog queried to verify that a table exists; must not be null
     * @param injector injector used to look up the Presto {@link QueryExecutor}; must not be null
     * @throws NullPointerException if any reference argument is null
     */
    @Inject
    public KafkaTableManager(@Named(value="databaseName") String databaseName, @Named(value="broker.host") String brokerHost, @Named(value="broker.port") int brokerPort, @Named(value="presto_database_name") String prestoDatabaseName, @Named(value="presto_kafka_catalog") String prestoKafkaCatalog, Injector injector) {
        this.databaseName = Objects.requireNonNull(databaseName, "databaseName is null");
        this.brokerHost = Objects.requireNonNull(brokerHost, "brokerHost is null");
        this.brokerPort = brokerPort;
        Objects.requireNonNull(injector, "injector is null");
        Objects.requireNonNull(prestoDatabaseName, "prestoDatabaseName is null");
        this.prestoQueryExecutor = injector.getInstance(Key.get(QueryExecutor.class, Names.named(prestoDatabaseName)));
        this.prestoKafkaCatalog = Objects.requireNonNull(prestoKafkaCatalog, "prestoKafkaCatalog is null");
    }

    /**
     * Recreates the Kafka topic of {@code tableDefinition} and fills it with the definition's messages.
     *
     * @param tableDefinition definition providing topic name, partition count, replication level and data
     * @param tableHandle handle of the table; its schema is required
     * @return instance describing the created table
     * @throws IllegalArgumentException if {@code tableHandle} has no schema
     * @throws RuntimeException if the table is not found in the Presto Kafka catalog, the old topic
     *         cannot be deleted, or a message cannot be sent
     */
    public TableInstance<KafkaTableDefinition> createImmutable(KafkaTableDefinition tableDefinition, TableHandle tableHandle) {
        String schema = tableHandle.getSchema().orElseThrow(() -> new IllegalArgumentException("Schema required for Kafka tables"));
        this.verifyTableExistsInPresto(schema, tableHandle.getName());

        String topic = tableDefinition.getTopic();
        this.deleteTopic(topic);
        this.createTopic(topic, tableDefinition.getPartitionsCount(), tableDefinition.getReplicationLevel());
        this.insertDataIntoTopic(topic, tableDefinition.getDataSource());

        TableName createdTableName = new TableName(tableHandle.getDatabase().orElse(this.getDatabaseName()), tableHandle.getSchema(), tableHandle.getName(), tableHandle.getName());
        return new KafkaTableInstance(createdTableName, tableDefinition);
    }

    /**
     * Checks via {@code information_schema.tables} of the Kafka catalog that exactly one table
     * matches the given schema and name.
     *
     * @throws RuntimeException if the count returned by Presto is not 1
     */
    private void verifyTableExistsInPresto(String schema, String name) {
        String sql = String.format("select count(1) from %s.information_schema.tables where table_schema='%s' and table_name='%s'", this.prestoKafkaCatalog, schema, name);
        QueryResult queryResult = this.prestoQueryExecutor.executeQuery(sql, new QueryExecutor.QueryParam[0]);
        long matchingTables = (Long)queryResult.row(0).get(0);
        if (matchingTables != 1L) {
            throw new RuntimeException(String.format("Table %s.%s not defined in kafka catalog (%s)", schema, name, this.prestoKafkaCatalog));
        }
    }

    /**
     * Requests deletion of {@code topic} if it exists and polls until it is no longer listed.
     *
     * @throws RuntimeException if the topic is still listed after all polls, if listing topics
     *         fails, or if the thread is interrupted while waiting
     */
    private void deleteTopic(String topic) {
        this.withAdminClient(adminClient -> {
            if (KafkaTableManager.topicExists(adminClient, topic)) {
                adminClient.deleteTopics(Collections.singletonList(topic));
            }
            for (int attempt = 0; attempt < TOPIC_DELETION_CHECKS; ++attempt) {
                if (!KafkaTableManager.topicExists(adminClient, topic)) {
                    return;
                }
                try {
                    Thread.sleep(TOPIC_DELETION_CHECK_INTERVAL_MS);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("could not delete topic " + topic, e);
                }
            }
            // Check once more after the final sleep; failing here avoids silently reusing a
            // topic that still holds the previous data.
            if (KafkaTableManager.topicExists(adminClient, topic)) {
                throw new RuntimeException("could not delete topic " + topic);
            }
        });
    }

    /**
     * Returns whether {@code topic} is among the topic names listed by {@code adminClient}.
     *
     * @throws RuntimeException wrapping any failure of the listing call
     */
    private static boolean topicExists(AdminClient adminClient, String topic) {
        try {
            return adminClient.listTopics().names().get().contains(topic);
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Requests creation of {@code topic} with the given partition count and replication level
     * and no additional topic configuration.
     */
    private void createTopic(String topic, int partitionsCount, int replicationLevel) {
        this.withAdminClient(adminClient -> {
            NewTopic newTopic = new NewTopic(topic, partitionsCount, (short)replicationLevel).configs(Collections.emptyMap());
            // NOTE(review): the returned result is not awaited, so a failed creation is not
            // reported here - confirm whether that is intended.
            adminClient.createTopics(Collections.singletonList(newTopic));
        });
    }

    /**
     * Sends every message of {@code dataSource} to {@code topic}, waiting for each send to complete.
     *
     * @throws RuntimeException if sending a message fails or the thread is interrupted
     */
    private void insertDataIntoTopic(String topic, KafkaDataSource dataSource) {
        Properties props = new Properties();
        props.put("bootstrap.servers", this.brokerHost + ":" + this.brokerPort);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        // try-with-resources closes the producer on both the success and the failure path.
        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            Iterator<KafkaMessage> messages = dataSource.getMessages();
            while (messages.hasNext()) {
                KafkaMessage message = messages.next();
                Integer partition = message.getPartition().isPresent() ? Integer.valueOf(message.getPartition().getAsInt()) : null;
                byte[] key = message.getKey().orElse(null);
                try {
                    // get() blocks until the send has completed.
                    producer.send(new ProducerRecord<>(topic, partition, key, message.getValue())).get();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("could not send message to topic " + topic, e);
                }
                catch (Exception e) {
                    throw new RuntimeException("could not send message to topic " + topic, e);
                }
            }
        }
    }

    /**
     * Runs {@code routine} with a freshly created {@link AdminClient} connected to the configured
     * broker; the client is closed when the routine returns or throws.
     */
    private void withAdminClient(Consumer<AdminClient> routine) {
        Properties clientConfig = new Properties();
        clientConfig.put("bootstrap.servers", this.brokerHost + ":" + this.brokerPort);
        clientConfig.put("request.timeout.ms", String.valueOf(ADMIN_REQUEST_TIMEOUT_MS));
        clientConfig.put("connections.max.idle.ms", String.valueOf(ADMIN_CONNECTIONS_MAX_IDLE_MS));
        try (AdminClient adminClient = AdminClient.create(clientConfig)) {
            routine.accept(adminClient);
        }
    }

    /**
     * Not supported for Kafka tables.
     *
     * @throws IllegalArgumentException always
     */
    public TableInstance<KafkaTableDefinition> createMutable(KafkaTableDefinition tableDefinition, MutableTableRequirement.State state, TableHandle tableHandle) {
        throw new IllegalArgumentException("Mutable tables are not supported by KafkaTableManager");
    }

    /**
     * Not supported for Kafka tables.
     *
     * @throws IllegalArgumentException always
     */
    public void dropTable(TableName tableName) {
        throw new IllegalArgumentException("dropTable not supported by KafkaTableManager");
    }

    /** Does nothing: this manager never creates mutable tables. */
    public void dropStaleMutableTables() {
    }

    /** Returns the database name this manager was configured with. */
    public String getDatabaseName() {
        return this.databaseName;
    }

    /** Returns {@link KafkaTableDefinition}{@code .class}. */
    public Class<? extends TableDefinition> getTableDefinitionClass() {
        return KafkaTableDefinition.class;
    }
}

