/*
 * Decompiled with CFR 0.152.
 */
package io.prestosql.tempto.fulfillment.table.kafka;

import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Shorts;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.name.Names;
import io.prestosql.tempto.configuration.Configuration;
import io.prestosql.tempto.fulfillment.table.MutableTableRequirement;
import io.prestosql.tempto.fulfillment.table.TableDefinition;
import io.prestosql.tempto.fulfillment.table.TableHandle;
import io.prestosql.tempto.fulfillment.table.TableInstance;
import io.prestosql.tempto.fulfillment.table.TableManager;
import io.prestosql.tempto.fulfillment.table.kafka.KafkaDataSource;
import io.prestosql.tempto.fulfillment.table.kafka.KafkaMessage;
import io.prestosql.tempto.fulfillment.table.kafka.KafkaTableDefinition;
import io.prestosql.tempto.fulfillment.table.kafka.KafkaTableInstance;
import io.prestosql.tempto.internal.fulfillment.table.TableName;
import io.prestosql.tempto.query.QueryExecutor;
import io.prestosql.tempto.query.QueryResult;
import java.lang.annotation.Annotation;
import java.util.Collection;
import java.util.Iterator;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * {@link TableManager} for immutable Kafka-backed tables.
 *
 * <p>Creating an immutable table checks that the table is visible through the
 * configured Presto Kafka catalog, recreates the backing topic on the broker
 * and publishes the messages supplied by the table definition's data source.
 * Mutable tables and {@link #dropTable(TableName)} are not supported.
 */
@TableManager.Descriptor(tableDefinitionClass=KafkaTableDefinition.class, type="KAFKA")
@Singleton
public class KafkaTableManager
        implements TableManager<KafkaTableDefinition> {
    private final String databaseName;
    private final QueryExecutor prestoQueryExecutor;
    private final Configuration brokerConfiguration;
    private final String prestoKafkaCatalog;

    /**
     * Creates the manager.
     *
     * @param databaseName name reported by {@link #getDatabaseName()}
     * @param brokerConfiguration broker settings; {@code host} and {@code port} are mandatory,
     *        every other key is passed through to the Kafka clients as a property
     * @param prestoDatabaseName name under which the Presto {@link QueryExecutor} is bound in the injector
     * @param prestoKafkaCatalog Presto catalog whose {@code information_schema} is queried for the table
     * @param injector injector used to look up the named {@link QueryExecutor}
     * @throws NullPointerException if any argument is {@code null}
     */
    @Inject
    public KafkaTableManager(@Named(value="databaseName") String databaseName, @Named(value="broker") Configuration brokerConfiguration, @Named(value="presto_database_name") String prestoDatabaseName, @Named(value="presto_kafka_catalog") String prestoKafkaCatalog, Injector injector) {
        this.databaseName = Objects.requireNonNull(databaseName, "databaseName is null");
        this.brokerConfiguration = Objects.requireNonNull(brokerConfiguration, "brokerConfiguration is null");
        Objects.requireNonNull(injector, "injector is null");
        Objects.requireNonNull(prestoDatabaseName, "prestoDatabaseName is null");
        this.prestoQueryExecutor = injector.getInstance(Key.get(QueryExecutor.class, Names.named(prestoDatabaseName)));
        this.prestoKafkaCatalog = Objects.requireNonNull(prestoKafkaCatalog, "prestoKafkaCatalog is null");
    }

    /**
     * Recreates the topic of {@code tableDefinition} and fills it with the definition's messages.
     *
     * <p>Any existing topic with the same name is deleted first.
     *
     * @param tableDefinition definition providing topic name, partition count, replication level and data
     * @param tableHandle handle of the table; its schema must be present
     * @return the instance describing the created table
     * @throws IllegalArgumentException if {@code tableHandle} has no schema
     * @throws RuntimeException if the table is not visible in Presto or a broker operation fails
     */
    public TableInstance<KafkaTableDefinition> createImmutable(KafkaTableDefinition tableDefinition, TableHandle tableHandle) {
        String schema = tableHandle.getSchema().orElseThrow(() -> new IllegalArgumentException("Schema required for Kafka tables"));
        this.verifyTableExistsInPresto(schema, tableHandle.getName());

        String topic = tableDefinition.getTopic();
        this.deleteTopic(topic);
        this.createTopic(topic, tableDefinition.getPartitionsCount(), tableDefinition.getReplicationLevel());
        this.insertDataIntoTopic(topic, tableDefinition.getDataSource());

        TableName createdTableName = new TableName(tableHandle.getDatabase().orElse(this.getDatabaseName()), tableHandle.getSchema(), tableHandle.getName(), tableHandle.getName());
        return new KafkaTableInstance(createdTableName, tableDefinition);
    }

    /**
     * Fails unless exactly one matching row exists in the Kafka catalog's
     * {@code information_schema.tables}.
     */
    private void verifyTableExistsInPresto(String schema, String name) {
        String sql = String.format("select count(1) from %s.information_schema.tables where table_schema='%s' and table_name='%s'", this.prestoKafkaCatalog, schema, name);
        QueryResult queryResult = this.prestoQueryExecutor.executeQuery(sql, new QueryExecutor.QueryParam[0]);
        if ((Long)queryResult.row(0).get(0) != 1L) {
            throw new RuntimeException(String.format("Table %s.%s not defined in kafka catalog (%s)", schema, name, this.prestoKafkaCatalog));
        }
    }

    /** Deletes {@code topic} if the broker currently lists it; does nothing otherwise. */
    private void deleteTopic(String topic) {
        try (AdminClient kafkaAdminClient = this.getAdminClient()) {
            ListTopicsResult topics = kafkaAdminClient.listTopics();
            Set<String> names = topics.names().get();
            if (names.contains(topic)) {
                kafkaAdminClient.deleteTopics(ImmutableList.of(topic)).all().get();
            }
        }
        catch (Exception e) {
            throw KafkaTableManager.failure("Could not delete topic " + topic, e);
        }
    }

    /**
     * Creates {@code topic} and waits for the broker to confirm.
     *
     * @throws IllegalArgumentException if {@code replicationLevel} does not fit in a {@code short}
     */
    private void createTopic(String topic, int partitionsCount, int replicationLevel) {
        // Validate the replication factor up front so a bad value surfaces as an
        // argument error before an admin client is opened.
        short replicationFactor = Shorts.checkedCast((long)replicationLevel);
        try (AdminClient kafkaAdminClient = this.getAdminClient()) {
            kafkaAdminClient.createTopics(ImmutableList.of(new NewTopic(topic, partitionsCount, replicationFactor))).all().get();
        }
        catch (Exception e) {
            throw KafkaTableManager.failure("Could not create topic " + topic, e);
        }
    }

    /**
     * Publishes every message of {@code dataSource} to {@code topic}, waiting for each
     * send to be acknowledged before sending the next one.
     */
    private void insertDataIntoTopic(String topic, KafkaDataSource dataSource) {
        // Key/value are typed as Object because the serializers are selected by
        // class name in the properties, not by the compiler.
        // The producer owns threads and buffers, so it must be closed when done.
        try (KafkaProducer<Object, Object> producer = new KafkaProducer<>(this.getKafkaProperties())) {
            Iterator<KafkaMessage> messages = dataSource.getMessages();
            while (messages.hasNext()) {
                KafkaMessage message = messages.next();
                // A null partition lets the producer choose the partition itself.
                Integer partition = message.getPartition().isPresent() ? Integer.valueOf(message.getPartition().getAsInt()) : null;
                try {
                    producer.send(new ProducerRecord<Object, Object>(topic, partition, message.getKey().orElse(null), message.getValue())).get();
                }
                catch (Exception e) {
                    throw KafkaTableManager.failure("could not send message to topic " + topic, e);
                }
            }
        }
    }

    /**
     * Wraps {@code cause} in a {@link RuntimeException}, restoring the thread's interrupt
     * flag first when the cause is an {@link InterruptedException}.
     */
    private static RuntimeException failure(String message, Exception cause) {
        if (cause instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
        return new RuntimeException(message, cause);
    }

    /**
     * Builds the client properties shared by the producer and the admin client.
     *
     * <p>Defaults are set first; every broker configuration key other than {@code host}
     * and {@code port} is then copied over and may override a default.
     */
    private Properties getKafkaProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", this.brokerConfiguration.getStringMandatory("host") + ":" + this.brokerConfiguration.getIntMandatory("port"));
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        for (String key : this.brokerConfiguration.listKeys()) {
            if (key.equals("host") || key.equals("port")) {
                continue;
            }
            props.put(key, this.brokerConfiguration.getStringMandatory(key));
        }
        return props;
    }

    /**
     * Always fails: mutable tables are not supported.
     *
     * @throws IllegalArgumentException always
     */
    public TableInstance<KafkaTableDefinition> createMutable(KafkaTableDefinition tableDefinition, MutableTableRequirement.State state, TableHandle tableHandle) {
        throw new IllegalArgumentException("Mutable tables are not supported by KafkaTableManager");
    }

    /**
     * Always fails: dropping tables is not supported.
     *
     * @throws IllegalArgumentException always
     */
    public void dropTable(TableName tableName) {
        throw new IllegalArgumentException("dropTable not supported by KafkaTableManager");
    }

    /** No-op: this manager never creates mutable tables, so there is nothing to clean up. */
    public void dropStaleMutableTables() {
    }

    /** Returns the database name supplied at construction. */
    public String getDatabaseName() {
        return this.databaseName;
    }

    /** Returns {@code KafkaTableDefinition.class}. */
    public Class<? extends TableDefinition> getTableDefinitionClass() {
        return KafkaTableDefinition.class;
    }

    /** Opens a new admin client; the caller is responsible for closing it. */
    private AdminClient getAdminClient() {
        return KafkaAdminClient.create(this.getKafkaProperties());
    }
}

