/*
 * Decompiled with CFR 0.152.
 */
package io.trino.tests.product.kafka;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Resources;
import com.google.common.primitives.Ints;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import io.airlift.units.Duration;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.trino.tempto.ProductTest;
import io.trino.tempto.assertions.QueryAssert;
import io.trino.tempto.context.ThreadLocalTestContextHolder;
import io.trino.tempto.fulfillment.table.TableHandle;
import io.trino.tempto.fulfillment.table.TableManager;
import io.trino.tempto.fulfillment.table.kafka.KafkaDataSource;
import io.trino.tempto.fulfillment.table.kafka.KafkaMessage;
import io.trino.tempto.fulfillment.table.kafka.KafkaMessageContentsBuilder;
import io.trino.tempto.fulfillment.table.kafka.KafkaTableDefinition;
import io.trino.tempto.fulfillment.table.kafka.KafkaTableManager;
import io.trino.tempto.fulfillment.table.kafka.ListKafkaDataSource;
import io.trino.tempto.query.QueryExecutor;
import io.trino.tempto.query.QueryResult;
import io.trino.tests.product.utils.QueryAssertions;
import io.trino.tests.product.utils.QueryExecutors;
import io.trino.tests.product.utils.SchemaRegistryClientUtils;
import java.io.ByteArrayOutputStream;
import java.net.URL;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.assertj.core.api.AssertProvider;
import org.assertj.core.api.Assertions;
import org.testng.annotations.Test;

@Test(singleThreaded=true)
public class TestKafkaProtobufReadsSmokeTest
extends ProductTest {
    // Schema name used when registering the test tables and when querying them through Trino.
    private static final String KAFKA_SCHEMA = "product_tests";
    // Table name (and topic-name prefix, see the catalog suffix below) for the scalar-types test.
    private static final String BASIC_DATATYPES_PROTOBUF_TOPIC_NAME = "read_basic_datatypes_protobuf";
    // Path of the .proto definition read with Files.readString for the scalar-types test.
    private static final String BASIC_DATATYPES_SCHEMA_PATH = "/docker/presto-product-tests/conf/presto/etc/catalog/kafka/basic_datatypes.proto";
    // Table name (and topic-name prefix) for the array/map test.
    private static final String BASIC_STRUCTURAL_PROTOBUF_TOPIC_NAME = "read_basic_structural_datatypes_protobuf";
    // Path of the .proto definition read with Files.readString for the array/map test.
    private static final String BASIC_STRUCTURAL_SCHEMA_PATH = "/docker/presto-product-tests/conf/presto/etc/catalog/kafka/basic_structural_datatypes.proto";
    // Topic queried by the schema-reference test.
    private static final String ALL_DATATYPES_PROTOBUF_TOPIC_SCHEMA_REGISTRY = "all_datatypes_protobuf_schema_registry";
    // Path of the .proto definition used by the schema-reference test.
    private static final String ALL_DATATYPES_SCHEMA_PATH = "/docker/presto-product-tests/conf/presto/etc/catalog/kafka/all_datatypes.proto";
    // Catalog "kafka": no topic suffix, column mapping supported, messages serialized as bare protobuf bytes.
    private static final KafkaCatalog KAFKA_CATALOG = new KafkaCatalog("kafka", "", true, new ProtobufMessageSerializer());
    // Catalog "kafka_schema_registry": topics get the "_schema_registry" suffix, no column mapping,
    // messages serialized through SchemaRegistryProtobufMessageSerializer.
    private static final KafkaCatalog KAFKA_SCHEMA_REGISTRY_CATALOG = new KafkaCatalog("kafka_schema_registry", "_schema_registry", false, new SchemaRegistryProtobufMessageSerializer());

    /**
     * Runs the scalar-types read check against {@code KAFKA_CATALOG}.
     */
    @Test(groups = {"kafka", "profile_specific_tests"})
    public void testSelectPrimitiveDataType() throws Exception {
        selectPrimitiveDataType(KAFKA_CATALOG);
    }

    /**
     * Runs the scalar-types read check against {@code KAFKA_SCHEMA_REGISTRY_CATALOG}.
     */
    @Test(groups = {"kafka_confluent_license", "profile_specific_tests"})
    public void testSelectPrimitiveDataTypeWithSchemaRegistry() throws Exception {
        selectPrimitiveDataType(KAFKA_SCHEMA_REGISTRY_CATALOG);
    }

    /**
     * Publishes a single protobuf message carrying one value per scalar field and asserts that
     * {@code select *} on the resulting table returns exactly that row.
     *
     * <p>The topic name is the base topic name plus the catalog's suffix; the message bytes are
     * produced by the catalog's own serializer. The assertion is handed to
     * {@code QueryAssertions.assertEventually} with a 30 second duration.
     *
     * @param kafkaCatalog catalog (name, topic suffix, serializer) to run the check against
     * @throws Exception if the schema file cannot be read or the message cannot be serialized
     */
    private void selectPrimitiveDataType(KafkaCatalog kafkaCatalog) throws Exception {
        Map<String, Object> message = ImmutableMap.<String, Object>builder()
                .put("a_varchar", "foobar")
                .put("b_integer", 314)
                .put("c_bigint", Long.MAX_VALUE)
                .put("d_double", 1.2345678901234567E9)
                .put("e_float", 3.14f)
                .put("f_boolean", true)
                .buildOrThrow();
        String topicName = BASIC_DATATYPES_PROTOBUF_TOPIC_NAME + kafkaCatalog.topicNameSuffix();
        createProtobufTable(BASIC_DATATYPES_SCHEMA_PATH, BASIC_DATATYPES_PROTOBUF_TOPIC_NAME, topicName, message, kafkaCatalog.messageSerializer());

        // The statement does not change between attempts, so it is built once up front.
        String sql = String.format("select * from %s.%s", kafkaCatalog.catalogName(), KAFKA_SCHEMA + "." + topicName);
        QueryAssertions.assertEventually(new Duration(30, TimeUnit.SECONDS), () -> {
            QueryResult queryResult = QueryExecutors.onTrino().executeQuery(sql);
            Assertions.assertThat(queryResult)
                    .containsOnly(QueryAssert.Row.row("foobar", 314, Long.MAX_VALUE, 1.2345678901234567E9, 3.14f, true));
        });
    }

    /**
     * Runs the array/map read check against {@code KAFKA_CATALOG}.
     */
    @Test(groups = {"kafka", "profile_specific_tests"})
    public void testSelectStructuralDataType() throws Exception {
        selectStructuralDataType(KAFKA_CATALOG);
    }

    /**
     * Runs the array/map read check against {@code KAFKA_SCHEMA_REGISTRY_CATALOG}.
     */
    @Test(groups = {"kafka_confluent_license", "profile_specific_tests"})
    public void testSelectStructuralDataTypeWithSchemaRegistry() throws Exception {
        selectStructuralDataType(KAFKA_SCHEMA_REGISTRY_CATALOG);
    }

    /**
     * Publishes a single protobuf message with an array field and a map field, then asserts that
     * the two array elements and the map value for {@code 'key1'} can be read back.
     *
     * <p>The queried column names depend on the catalog: {@code c_array}/{@code c_map} when the
     * catalog reports column mapping support, otherwise the protobuf field names
     * {@code a_array}/{@code a_map}. The assertion is handed to
     * {@code QueryAssertions.assertEventually} with a 30 second duration.
     *
     * @param kafkaCatalog catalog (name, topic suffix, serializer) to run the check against
     * @throws Exception if the schema file cannot be read or the message cannot be serialized
     */
    private void selectStructuralDataType(KafkaCatalog kafkaCatalog) throws Exception {
        Map<String, Object> message = ImmutableMap.<String, Object>of(
                "a_array", ImmutableList.of(100L, 101L),
                "a_map", ImmutableMap.<String, Object>of(
                        "key", "key1",
                        "value", 1.2345678901234567E9));
        String topicName = BASIC_STRUCTURAL_PROTOBUF_TOPIC_NAME + kafkaCatalog.topicNameSuffix();
        createProtobufTable(BASIC_STRUCTURAL_SCHEMA_PATH, BASIC_STRUCTURAL_PROTOBUF_TOPIC_NAME, topicName, message, kafkaCatalog.messageSerializer());

        // None of the statement's inputs change between attempts, so it is built once up front.
        String arrayColumn = kafkaCatalog.columnMappingSupported() ? "c_array" : "a_array";
        String mapColumn = kafkaCatalog.columnMappingSupported() ? "c_map" : "a_map";
        String sql = String.format(
                "SELECT a[1], a[2], m['key1'] FROM (SELECT %s as a, %s as m FROM %s.%s) t",
                arrayColumn,
                mapColumn,
                kafkaCatalog.catalogName(),
                KAFKA_SCHEMA + "." + topicName);
        QueryAssertions.assertEventually(new Duration(30, TimeUnit.SECONDS), () -> {
            QueryResult queryResult = QueryExecutors.onTrino().executeQuery(sql);
            Assertions.assertThat(queryResult)
                    .containsOnly(QueryAssert.Row.row(100L, 101L, 1.2345678901234567E9));
        });
    }

    /**
     * Verifies that a protobuf schema which references another registered schema can be read
     * through the schema-registry catalog.
     *
     * <p>Steps: register {@code google/protobuf/timestamp.proto} (loaded from the classpath) under
     * the subject {@code timestamp}; build the schema at {@code ALL_DATATYPES_SCHEMA_PATH} with a
     * reference to version 1 of that subject; publish one message; then assert the row returned by
     * {@code select *}. The expected row has one more column ({@code "ZERO"}) than the published
     * record has entries.
     */
    @Test(groups = {"kafka_confluent_license", "profile_specific_tests"})
    public void testProtobufWithSchemaReferences() throws Exception {
        String timestampTopic = "timestamp";
        String timestampProtoFile = "google/protobuf/timestamp.proto";
        URL timestampProtoResource = Resources.getResource(TestKafkaProtobufReadsSmokeTest.class, "/" + timestampProtoFile);
        ProtobufSchema baseSchema = new ProtobufSchema(
                Resources.toString(timestampProtoResource, StandardCharsets.UTF_8),
                ImmutableList.of(),
                ImmutableMap.of(),
                null,
                timestampProtoFile);
        SchemaRegistryClientUtils.getSchemaRegistryClient().register(timestampTopic, baseSchema);

        ProtobufSchema actualSchema = new ProtobufSchema(
                Files.readString(Path.of(ALL_DATATYPES_SCHEMA_PATH)),
                ImmutableList.of(new SchemaReference(baseSchema.name(), timestampTopic, 1)),
                ImmutableMap.of(timestampProtoFile, baseSchema.canonicalString()),
                null,
                null);

        LocalDateTime timestamp = LocalDateTime.parse("2020-12-12T15:35:45.923");
        com.google.protobuf.Timestamp timestampProto = com.google.protobuf.Timestamp.newBuilder()
                .setSeconds(timestamp.toEpochSecond(ZoneOffset.UTC))
                .setNanos(timestamp.getNano())
                .build();
        Map<String, Object> message = ImmutableMap.<String, Object>builder()
                .put("a_varchar", "foobar")
                .put("b_integer", 2)
                .put("c_bigint", Long.MAX_VALUE)
                .put("d_double", 1.2345678901234567E9)
                .put("e_float", 3.14f)
                .put("f_boolean", true)
                .put("h_timestamp", timestampProto)
                .buildOrThrow();

        // NOTE(review): the table is registered under the basic-datatypes table name while the
        // query below targets the all-datatypes topic; kept as in the original - confirm intent.
        createProtobufTable(actualSchema, BASIC_DATATYPES_PROTOBUF_TOPIC_NAME, ALL_DATATYPES_PROTOBUF_TOPIC_SCHEMA_REGISTRY, message, new SchemaRegistryProtobufMessageSerializer());

        String sql = String.format("select * from %s.%s.%s", KAFKA_SCHEMA_REGISTRY_CATALOG.catalogName(), KAFKA_SCHEMA, ALL_DATATYPES_PROTOBUF_TOPIC_SCHEMA_REGISTRY);
        QueryAssertions.assertEventually(new Duration(30, TimeUnit.SECONDS), () -> {
            QueryResult queryResult = QueryExecutors.onTrino().executeQuery(sql);
            Assertions.assertThat(queryResult)
                    .containsOnly(QueryAssert.Row.row("foobar", 2, Long.MAX_VALUE, 1.2345678901234567E9, 3.14f, true, "ZERO", Timestamp.valueOf(timestamp)));
        });
    }

    /**
     * Convenience overload: reads the protobuf schema text from {@code schemaPath}, parses it into
     * a {@link ProtobufSchema} and delegates to the schema-based overload.
     *
     * @param schemaPath file-system path of the {@code .proto} file
     * @param tableName table name to register
     * @param topicName Kafka topic the message is published to
     * @param record field name to value mapping for the single message
     * @param messageSerializer turns the record into the bytes written to the topic
     * @throws Exception if the file cannot be read or serialization fails
     */
    private static void createProtobufTable(String schemaPath, String tableName, String topicName, Map<String, Object> record, MessageSerializer messageSerializer) throws Exception {
        String schemaText = Files.readString(Path.of(schemaPath));
        ProtobufSchema parsedSchema = new ProtobufSchema(schemaText);
        createProtobufTable(parsedSchema, tableName, topicName, record, messageSerializer);
    }

    /**
     * Serializes {@code record} with the given serializer and registers an immutable Kafka table
     * whose data source contains exactly that one message.
     *
     * <p>The table definition is named {@code <KAFKA_SCHEMA>.<tableName>} and is created with the
     * literal arguments {@code 1, 1}. The table manager is looked up from the thread-local test
     * context under the name {@code "kafka"}.
     *
     * @param protobufSchema schema describing the message
     * @param tableName table name to register
     * @param topicName Kafka topic the message is published to
     * @param record field name to value mapping for the single message
     * @param messageSerializer turns the record into the bytes written to the topic
     * @throws Exception if serialization fails
     */
    private static void createProtobufTable(ProtobufSchema protobufSchema, String tableName, String topicName, Map<String, Object> record, MessageSerializer messageSerializer) throws Exception {
        byte[] protobufData = messageSerializer.serialize(topicName, protobufSchema, record);
        KafkaMessage message = new KafkaMessage(KafkaMessageContentsBuilder.contentsBuilder().appendBytes(protobufData).build());
        KafkaTableDefinition tableDefinition = new KafkaTableDefinition(
                KAFKA_SCHEMA + "." + tableName,
                topicName,
                new ListKafkaDataSource(ImmutableList.of(message)),
                1,
                1);
        KafkaTableManager kafkaTableManager = (KafkaTableManager) ThreadLocalTestContextHolder.testContext().getDependency(TableManager.class, "kafka");
        kafkaTableManager.createImmutable(tableDefinition, TableHandle.tableHandle(tableName).inSchema(KAFKA_SCHEMA));
    }

    /**
     * Builds a {@link DynamicMessage} of the given type from a field-name to value mapping.
     *
     * <p>A value that is itself a {@link Map} is converted recursively into a message of the
     * field's message type and set as a single-element list; every other value is passed to
     * {@code setField} unchanged.
     *
     * @param descriptor message type to build
     * @param data field name to value mapping; every key must name a field of {@code descriptor}
     * @return the built message
     * @throws NullPointerException if a key in {@code data} does not name a field of
     *         {@code descriptor} (the message identifies the offending field)
     */
    private static DynamicMessage buildDynamicMessage(Descriptors.Descriptor descriptor, Map<String, Object> data) {
        DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
        for (Map.Entry<String, Object> entry : data.entrySet()) {
            String fieldName = entry.getKey();
            // findFieldByName returns null for unknown names; fail here with the field name
            // instead of with an anonymous NPE from inside the protobuf builder.
            Descriptors.FieldDescriptor fieldDescriptor = Objects.requireNonNull(
                    descriptor.findFieldByName(fieldName),
                    () -> "Field '" + fieldName + "' not found in message type " + descriptor.getFullName());
            Object value = entry.getValue();
            if (value instanceof Map<?, ?>) {
                // Nested records are always built by this test as Map<String, Object>.
                @SuppressWarnings("unchecked")
                Map<String, Object> nestedData = (Map<String, Object>) value;
                DynamicMessage nestedMessage = buildDynamicMessage(fieldDescriptor.getMessageType(), nestedData);
                builder.setField(fieldDescriptor, ImmutableList.of(nestedMessage));
            }
            else {
                builder.setField(fieldDescriptor, value);
            }
        }
        return builder.build();
    }

    /**
     * Describes one Kafka catalog configuration the tests run against.
     *
     * @param catalogName Trino catalog name used in queries
     * @param topicNameSuffix suffix appended to the base topic name for this catalog
     * @param columnMappingSupported selects which column names the structural test queries
     * @param messageSerializer serializer used to produce the message bytes for this catalog
     */
    private record KafkaCatalog(String catalogName, String topicNameSuffix, boolean columnMappingSupported, MessageSerializer messageSerializer) {
        // Compact canonical constructor: rejects null components before the implicit field assignment.
        private KafkaCatalog {
            Objects.requireNonNull(catalogName, "catalogName is null");
            Objects.requireNonNull(topicNameSuffix, "topicNameSuffix is null");
            Objects.requireNonNull(messageSerializer, "messageSerializer is null");
        }

        /** Returns just the catalog name rather than the default record representation. */
        @Override
        public String toString() {
            return catalogName;
        }
    }

    @FunctionalInterface
    private static interface MessageSerializer {
        public byte[] serialize(String var1, ProtobufSchema var2, Map<String, Object> var3) throws Exception;
    }

    /**
     * Serializer that registers the schema under the subject {@code <topic>-value} and prefixes
     * the protobuf payload with a small header containing the returned schema id.
     */
    private static final class SchemaRegistryProtobufMessageSerializer
            implements MessageSerializer {
        /**
         * Produces {@code 0x00}, the 4-byte schema id, {@code 0x00}, then the protobuf bytes.
         *
         * @throws Exception if schema registration or writing to the buffer fails
         */
        @Override
        public byte[] serialize(String topic, ProtobufSchema protobufSchema, Map<String, Object> values) throws Exception {
            try (ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
                // Leading zero byte - presumably the Confluent wire-format magic byte; confirm against Confluent docs.
                buffer.write(0);
                int schemaId = SchemaRegistryClientUtils.getSchemaRegistryClient().register(topic + "-value", protobufSchema);
                byte[] schemaIdBytes = Ints.toByteArray(schemaId);
                buffer.write(schemaIdBytes);
                // Single zero byte before the payload - presumably the protobuf message-index
                // marker for the first message type in the schema; confirm against Confluent docs.
                buffer.write(0);
                DynamicMessage message = buildDynamicMessage(protobufSchema.toDescriptor(), values);
                buffer.write(message.toByteArray());
                return buffer.toByteArray();
            }
        }
    }

    /**
     * Serializer that emits the bare protobuf encoding of the message, with no header and no
     * schema registration. The {@code topic} argument is not used.
     */
    private static final class ProtobufMessageSerializer
            implements MessageSerializer {
        @Override
        public byte[] serialize(String topic, ProtobufSchema protobufSchema, Map<String, Object> values) {
            DynamicMessage message = buildDynamicMessage(protobufSchema.toDescriptor(), values);
            return message.toByteArray();
        }
    }
}

