/*
 * Decompiled with CFR 0.152.
 */
package io.trino.tests.product.kafka;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.primitives.Ints;
import io.airlift.units.Duration;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.trino.tempto.ProductTest;
import io.trino.tempto.assertions.QueryAssert;
import io.trino.tempto.context.ThreadLocalTestContextHolder;
import io.trino.tempto.fulfillment.table.TableHandle;
import io.trino.tempto.fulfillment.table.TableManager;
import io.trino.tempto.fulfillment.table.kafka.KafkaDataSource;
import io.trino.tempto.fulfillment.table.kafka.KafkaMessage;
import io.trino.tempto.fulfillment.table.kafka.KafkaMessageContentsBuilder;
import io.trino.tempto.fulfillment.table.kafka.KafkaTableDefinition;
import io.trino.tempto.fulfillment.table.kafka.KafkaTableManager;
import io.trino.tempto.fulfillment.table.kafka.ListKafkaDataSource;
import io.trino.tempto.query.QueryExecutor;
import io.trino.tempto.query.QueryResult;
import io.trino.tests.product.utils.QueryAssertions;
import io.trino.tests.product.utils.QueryExecutors;
import io.trino.tests.product.utils.SchemaRegistryClientUtils;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(singleThreaded=true)
public class TestKafkaAvroReadsSmokeTest
extends ProductTest {
    // Schema name used both when registering tables with the table manager and when querying them.
    private static final String KAFKA_SCHEMA = "product_tests";
    // Base topic/table name for the primitive-datatype test; a per-catalog suffix is appended to form the topic.
    private static final String ALL_DATATYPES_AVRO_TOPIC_NAME = "read_all_datatypes_avro";
    // Location of the Avro schema file read by the primitive-datatype, null-value, and schema-reference tests.
    private static final String ALL_DATATYPE_SCHEMA_PATH = "/docker/presto-product-tests/conf/presto/etc/catalog/kafka/all_datatypes_avro_schema.avsc";
    // Base topic name for the null-value test; a per-catalog suffix is appended to form the topic.
    private static final String ALL_NULL_AVRO_TOPIC_NAME = "read_all_null_avro";
    // Base topic/table name for the array/map test; a per-catalog suffix is appended to form the topic.
    private static final String STRUCTURAL_AVRO_TOPIC_NAME = "read_structural_datatype_avro";
    // Location of the Avro schema file read by the array/map test.
    private static final String STRUCTURAL_SCHEMA_PATH = "/docker/presto-product-tests/conf/presto/etc/catalog/kafka/structural_datatype_avro_schema.avsc";
    // Topic queried by the schema-reference test.
    private static final String AVRO_SCHEMA_WITH_REFERENCES_TOPIC_NAME = "schema_with_references_avro";
    // Location of the Avro schema file that refers to another registered schema.
    private static final String AVRO_SCHEMA_WITH_REFERENCES_SCHEMA_PATH = "/docker/presto-product-tests/conf/presto/etc/catalog/kafka/schema_with_references.avsc";

    /**
     * Publishes one Avro record with a varchar, bigint, double and boolean field and
     * verifies that {@code select *} on the resulting table eventually returns exactly that row.
     *
     * @param kafkaCatalog catalog under test; supplies the catalog name and the topic-name suffix
     * @param messageSerializer serializer used to encode the record before it is published
     * @throws Exception if the table cannot be created
     */
    @Test(groups={"kafka", "profile_specific_tests"}, dataProvider="catalogs")
    public void testSelectPrimitiveDataType(KafkaCatalog kafkaCatalog, MessageSerializer messageSerializer) throws Exception {
        String topicName = ALL_DATATYPES_AVRO_TOPIC_NAME + kafkaCatalog.getTopicNameSuffix();
        Map<String, Object> record = ImmutableMap.of(
                "a_varchar", "foobar",
                "a_bigint", 127L,
                "a_double", 234.567,
                "a_boolean", true);
        createAvroTable(ALL_DATATYPE_SCHEMA_PATH, ALL_DATATYPES_AVRO_TOPIC_NAME, topicName, record, messageSerializer);

        // The statement text does not change between retries, so build it once.
        String sql = String.format("select * from %s.%s", kafkaCatalog.getCatalogName(), "product_tests." + topicName);
        QueryAssertions.assertEventually(new Duration(30.0, TimeUnit.SECONDS), () -> {
            QueryResult rows = QueryExecutors.onTrino().executeQuery(sql);
            QueryAssert.assertThat(rows).containsOnly(QueryAssert.Row.row("foobar", 127, 234.567, true));
        });
    }

    /**
     * Publishes one Avro record without setting any field explicitly and verifies that
     * {@code select *} on the resulting table eventually returns a single row of four nulls.
     *
     * @param kafkaCatalog catalog under test; supplies the catalog name and the topic-name suffix
     * @param messageSerializer serializer used to encode the record before it is published
     * @throws Exception if the table cannot be created
     */
    @Test(groups={"kafka", "profile_specific_tests"}, dataProvider="catalogs")
    public void testNullType(KafkaCatalog kafkaCatalog, MessageSerializer messageSerializer) throws Exception {
        String topicName = ALL_NULL_AVRO_TOPIC_NAME + kafkaCatalog.getTopicNameSuffix();
        Map<String, Object> noFields = ImmutableMap.of();
        createAvroTable(ALL_DATATYPE_SCHEMA_PATH, ALL_NULL_AVRO_TOPIC_NAME, topicName, noFields, messageSerializer);

        // The statement text does not change between retries, so build it once.
        String sql = String.format("select * from %s.%s", kafkaCatalog.getCatalogName(), "product_tests." + topicName);
        QueryAssertions.assertEventually(new Duration(30.0, TimeUnit.SECONDS), () -> {
            QueryResult rows = QueryExecutors.onTrino().executeQuery(sql);
            QueryAssert.assertThat(rows).containsOnly(QueryAssert.Row.row(null, null, null, null));
        });
    }

    /**
     * Publishes one Avro record holding an array of two longs and a single-entry map, then
     * verifies that both array elements and the map value can eventually be read back.
     *
     * <p>The queried column names depend on {@link KafkaCatalog#isColumnMappingSupported()}:
     * {@code c_array}/{@code c_map} when it returns true, {@code a_array}/{@code a_map} otherwise.
     *
     * @param kafkaCatalog catalog under test; supplies the catalog name, topic-name suffix and column naming
     * @param messageSerializer serializer used to encode the record before it is published
     * @throws Exception if the table cannot be created
     */
    @Test(groups={"kafka", "profile_specific_tests"}, dataProvider="catalogs")
    public void testSelectStructuralDataType(KafkaCatalog kafkaCatalog, MessageSerializer messageSerializer) throws Exception {
        String topicName = STRUCTURAL_AVRO_TOPIC_NAME + kafkaCatalog.getTopicNameSuffix();
        Map<String, Object> record = ImmutableMap.of(
                "a_array", ImmutableList.of(100L, 102L),
                "a_map", ImmutableMap.of("key1", "value1"));
        createAvroTable(STRUCTURAL_SCHEMA_PATH, STRUCTURAL_AVRO_TOPIC_NAME, topicName, record, messageSerializer);

        String arrayColumn;
        String mapColumn;
        if (kafkaCatalog.isColumnMappingSupported()) {
            arrayColumn = "c_array";
            mapColumn = "c_map";
        }
        else {
            arrayColumn = "a_array";
            mapColumn = "a_map";
        }
        // The statement text does not change between retries, so build it once.
        String sql = String.format(
                "SELECT a[1], a[2], m['key1'] FROM (SELECT %s as a, %s as m FROM %s.%s) t",
                arrayColumn,
                mapColumn,
                kafkaCatalog.getCatalogName(),
                "product_tests." + topicName);
        QueryAssertions.assertEventually(new Duration(30.0, TimeUnit.SECONDS), () -> {
            QueryResult rows = QueryExecutors.onTrino().executeQuery(sql);
            QueryAssert.assertThat(rows).containsOnly(QueryAssert.Row.row(100, 102, "value1"));
        });
    }

    /**
     * Supplies the (catalog, serializer) pairs the parameterized tests run against.
     *
     * @return one row for the {@code kafka} catalog paired with {@link AvroMessageSerializer}, and one row
     *         for the {@code kafka_schema_registry} catalog paired with {@link SchemaRegistryAvroMessageSerializer}
     */
    @DataProvider
    public static Object[][] catalogs() {
        Object[] plainKafka = {
                new KafkaCatalog("kafka", "", true),
                new AvroMessageSerializer()};
        Object[] schemaRegistryKafka = {
                new KafkaCatalog("kafka_schema_registry", "_schema_registry", false),
                new SchemaRegistryAvroMessageSerializer()};
        return new Object[][]{plainKafka, schemaRegistryKafka};
    }

    /**
     * Registers the all-datatypes schema, then publishes a record whose schema refers to it and
     * verifies that fields of the nested {@code reference} value can eventually be read through the
     * {@code kafka_schema_registry} catalog.
     *
     * @throws Exception if a schema file cannot be read, registration fails, or the table cannot be created
     */
    @Test(groups={"kafka", "profile_specific_tests"})
    public void testAvroWithSchemaReferences() throws Exception {
        String referredSubject = ALL_DATATYPES_AVRO_TOPIC_NAME + "-value";

        // Register the schema that will be referred to, before the referring schema is used.
        String referredSchemaText = Files.readString(new File(ALL_DATATYPE_SCHEMA_PATH).toPath());
        TestingAvroSchema referredSchema = new TestingAvroSchema(referredSchemaText, ImmutableList.of(), ImmutableList.of());
        SchemaRegistryClientUtils.getSchemaRegistryClient().register(referredSubject, referredSchema);

        // Build the nested record that conforms to the referred schema.
        Map<String, Object> nestedFields = ImmutableMap.of(
                "a_varchar", "foobar",
                "a_bigint", 127L,
                "a_double", 234.567,
                "a_boolean", true);
        GenericRecordBuilder nestedBuilder = new GenericRecordBuilder((Schema) referredSchema.rawSchema());
        for (Map.Entry<String, Object> field : nestedFields.entrySet()) {
            nestedBuilder.set(field.getKey(), field.getValue());
        }

        // The referring schema is parsed with the referred schema's text made available to the parser.
        String referringSchemaText = Files.readString(new File(AVRO_SCHEMA_WITH_REFERENCES_SCHEMA_PATH).toPath());
        SchemaReference reference = new SchemaReference(referredSchema.name(), referredSubject, 1);
        TestingAvroSchema actualSchema = new TestingAvroSchema(
                referringSchemaText,
                ImmutableList.of(reference),
                ImmutableList.of(referredSchema.canonicalString()));

        // NOTE(review): the table name passed here is the all-datatypes name while the topic is the
        // schema-with-references topic; the query below addresses the topic name - confirm this is intended.
        Map<String, Object> outerFields = ImmutableMap.of("reference", nestedBuilder.build());
        createAvroTable(actualSchema, ALL_DATATYPES_AVRO_TOPIC_NAME, AVRO_SCHEMA_WITH_REFERENCES_TOPIC_NAME, outerFields, new SchemaRegistryAvroMessageSerializer());

        // The statement text does not change between retries, so build it once.
        String sql = String.format("select reference.a_varchar, reference.a_double from kafka_schema_registry.%s.%s", KAFKA_SCHEMA, AVRO_SCHEMA_WITH_REFERENCES_TOPIC_NAME);
        QueryAssertions.assertEventually(new Duration(30.0, TimeUnit.SECONDS), () -> {
            QueryResult rows = QueryExecutors.onTrino().executeQuery(sql);
            QueryAssert.assertThat(rows).containsOnly(QueryAssert.Row.row("foobar", 234.567));
        });
    }

    /**
     * Convenience overload that reads the Avro schema text from a file and delegates to the
     * {@link TestingAvroSchema}-based overload with no schema references.
     *
     * @param schemaPath path of the Avro schema file to read
     * @param tableName table name passed on to the delegate
     * @param topicName topic name passed on to the delegate
     * @param record field values of the single record to publish
     * @param messageSerializer serializer used to encode the record
     * @throws Exception if the schema file cannot be read or the delegate fails
     */
    private static void createAvroTable(String schemaPath, String tableName, String topicName, Map<String, Object> record, MessageSerializer messageSerializer) throws Exception {
        String schemaText = Files.readString(new File(schemaPath).toPath());
        TestingAvroSchema standaloneSchema = new TestingAvroSchema(schemaText, ImmutableList.of(), ImmutableList.of());
        createAvroTable(standaloneSchema, tableName, topicName, record, messageSerializer);
    }

    /**
     * Serializes a single record and creates an immutable Kafka table definition containing it.
     *
     * @param schema schema handed to the serializer
     * @param tableName table name; qualified with {@code KAFKA_SCHEMA} for the table definition and handle
     * @param topicName Kafka topic the message is associated with
     * @param record field values of the single record to publish
     * @param messageSerializer serializer used to encode the record
     * @throws Exception if serialization or table creation fails
     */
    private static void createAvroTable(TestingAvroSchema schema, String tableName, String topicName, Map<String, Object> record, MessageSerializer messageSerializer) throws Exception {
        byte[] payload = messageSerializer.serialize(topicName, schema, record);

        // The data source holds exactly one message whose contents are the serialized record.
        KafkaMessage message = new KafkaMessage(KafkaMessageContentsBuilder.contentsBuilder().appendBytes(payload).build());
        ListKafkaDataSource dataSource = new ListKafkaDataSource(ImmutableList.of(message));
        KafkaTableDefinition definition = new KafkaTableDefinition(KAFKA_SCHEMA + "." + tableName, topicName, dataSource, 1, 1);

        TableHandle handle = TableHandle.tableHandle(tableName).inSchema(KAFKA_SCHEMA);
        KafkaTableManager tableManager = (KafkaTableManager) ThreadLocalTestContextHolder.testContext().getDependency(TableManager.class, "kafka");
        tableManager.createImmutable(definition, handle);
    }

    /**
     * Minimal {@link ParsedSchema} implementation backed by an Avro {@link Schema}, used to register
     * test schemas and to hand the parsed schema to the serializers.
     */
    private static class TestingAvroSchema
    implements ParsedSchema {
        private final String avroSchemaString;
        private final Schema avroSchema;
        private final List<SchemaReference> schemaReferences;

        /**
         * Parses {@code avroSchemaString} after first feeding every resolved reference to the same parser.
         *
         * @param avroSchemaString text of the schema to parse; also returned by {@link #canonicalString()}
         * @param schemaReferences references reported by {@link #references()}; copied defensively
         * @param resolvedReferences schema texts parsed, in order, before {@code avroSchemaString}
         * @throws NullPointerException if any argument is null
         */
        public TestingAvroSchema(String avroSchemaString, List<SchemaReference> schemaReferences, List<String> resolvedReferences) {
            this.avroSchemaString = Objects.requireNonNull(avroSchemaString, "avroSchemaString is null");
            this.schemaReferences = ImmutableList.copyOf(Objects.requireNonNull(schemaReferences, "schemaReferences is null"));
            Objects.requireNonNull(resolvedReferences, "resolvedReferences is null");

            // One parser instance is shared so the main schema is parsed after the referenced ones.
            Schema.Parser sharedParser = new Schema.Parser();
            for (String referencedSchemaText : resolvedReferences) {
                sharedParser.parse(referencedSchemaText);
            }
            this.avroSchema = sharedParser.parse(avroSchemaString);
        }

        /** Returns the constant schema type {@code "AVRO"}. */
        public String schemaType() {
            return "AVRO";
        }

        /** Returns the name of the parsed Avro schema. */
        public String name() {
            return avroSchema.getName();
        }

        /** Returns the schema text exactly as it was passed to the constructor. */
        public String canonicalString() {
            return avroSchemaString;
        }

        /** Returns the immutable copy of the references passed to the constructor. */
        public List<SchemaReference> references() {
            return schemaReferences;
        }

        /** Always returns {@code false}; no compatibility check is performed. */
        public boolean isBackwardCompatible(ParsedSchema parsedSchema) {
            return false;
        }

        /** Returns the parsed Avro {@link Schema}. */
        public Object rawSchema() {
            return avroSchema;
        }
    }

    /**
     * Serializer that registers the schema under the {@code <topic>-value} subject and prefixes the
     * binary-encoded Avro datum with a zero byte followed by the four bytes of the returned schema id.
     */
    private static final class SchemaRegistryAvroMessageSerializer
    implements MessageSerializer {
        private SchemaRegistryAvroMessageSerializer() {
        }

        /**
         * Registers {@code parsedSchema} and encodes {@code values} as a single Avro record.
         *
         * @param topic topic name; the schema is registered under {@code topic + "-value"}
         * @param parsedSchema schema whose {@code rawSchema()} must be an Avro {@link Schema}
         * @param values field name to field value pairs put into the record
         * @return zero byte, schema id bytes, then the binary-encoded record
         * @throws IOException if writing or registration fails with an I/O error
         * @throws RuntimeException wrapping a {@link RestClientException} raised during registration
         */
        @Override
        public byte[] serialize(String topic, ParsedSchema parsedSchema, Map<String, Object> values) throws IOException {
            try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
                Schema schema = (Schema) parsedSchema.rawSchema();

                // Header: a single zero byte followed by the registered schema id.
                out.write(0);
                int schemaId = SchemaRegistryClientUtils.getSchemaRegistryClient().register(topic + "-value", parsedSchema);
                out.write(Ints.toByteArray(schemaId));

                GenericData.Record record = new GenericData.Record(schema);
                values.forEach(record::put);

                BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
                GenericDatumWriter<Object> writer = new GenericDatumWriter<>(schema);
                writer.write(record, encoder);
                // Flush only after the datum has been written; flushing beforehand cannot cover it.
                encoder.flush();

                return out.toByteArray();
            }
            catch (RestClientException clientException) {
                throw new RuntimeException(clientException);
            }
        }
    }

    /**
     * Serializer that writes a single record through an Avro {@link DataFileWriter} into an
     * in-memory buffer. The {@code topic} argument is not used.
     */
    private static final class AvroMessageSerializer
    implements MessageSerializer {
        private AvroMessageSerializer() {
        }

        /**
         * Encodes {@code values} as one record written with {@link DataFileWriter}.
         *
         * @param topic ignored by this implementation
         * @param parsedSchema schema whose {@code rawSchema()} must be an Avro {@link Schema}
         * @param values field name to field value pairs put into the record
         * @return the bytes produced by the data file writer
         * @throws IOException if the writer fails
         */
        @Override
        public byte[] serialize(String topic, ParsedSchema parsedSchema, Map<String, Object> values) throws IOException {
            Schema avroSchema = (Schema) parsedSchema.rawSchema();
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();

            GenericData.Record datum = new GenericData.Record(avroSchema);
            for (Map.Entry<String, Object> field : values.entrySet()) {
                datum.put(field.getKey(), field.getValue());
            }

            // The writer is closed before the buffer is read so that all output has reached it.
            try (DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
                fileWriter.create(avroSchema, buffer);
                fileWriter.append(datum);
            }
            return buffer.toByteArray();
        }
    }

    @FunctionalInterface
    private static interface MessageSerializer {
        public byte[] serialize(String var1, ParsedSchema var2, Map<String, Object> var3) throws IOException;
    }

    /**
     * Immutable description of a catalog the parameterized tests run against: its name, the suffix
     * appended to topic names, and a flag the structural test uses to pick column names.
     */
    private static final class KafkaCatalog {
        private final String catalogName;
        private final String topicNameSuffix;
        private final boolean columnMappingSupported;

        /**
         * @param catalogName name of the catalog used in queries; must not be null
         * @param topicNameSuffix suffix appended to base topic names; must not be null
         * @param columnMappingSupported value returned by {@link #isColumnMappingSupported()}
         * @throws NullPointerException if {@code catalogName} or {@code topicNameSuffix} is null
         */
        private KafkaCatalog(String catalogName, String topicNameSuffix, boolean columnMappingSupported) {
            Objects.requireNonNull(catalogName, "catalogName is null");
            Objects.requireNonNull(topicNameSuffix, "topicNameSuffix is null");
            this.catalogName = catalogName;
            this.topicNameSuffix = topicNameSuffix;
            this.columnMappingSupported = columnMappingSupported;
        }

        /** Returns the catalog name. */
        public String getCatalogName() {
            return catalogName;
        }

        /** Returns the suffix appended to base topic names. */
        public String getTopicNameSuffix() {
            return topicNameSuffix;
        }

        /** Returns the column-mapping flag given at construction. */
        public boolean isColumnMappingSupported() {
            return columnMappingSupported;
        }

        /** Returns the catalog name as the string form of this object. */
        @Override
        public String toString() {
            return catalogName;
        }
    }
}

