/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.schema.compatibility;

import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.schema.Schemas;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

public class SchemaTypeCompatibilityCheckTest
extends MockedPulsarServiceBaseTest {
    private static final String CLUSTER_NAME = "test";
    private static final String PUBLIC_TENANT = "public";
    private static final String namespace = "test-namespace";
    private static final String namespaceName = "public/test-namespace";

    @Override
    @BeforeClass
    public void setup() throws Exception {
        super.internalSetup();
        this.admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().serviceUrl(this.pulsar.getWebServiceAddress()).build());
        TenantInfo tenantInfo = TenantInfo.builder().allowedClusters(Collections.singleton(CLUSTER_NAME)).build();
        this.admin.tenants().createTenant(PUBLIC_TENANT, tenantInfo);
        this.admin.namespaces().createNamespace(namespaceName, (Set)Sets.newHashSet((Object[])new String[]{CLUSTER_NAME}));
    }

    @Override
    @AfterClass(alwaysRun=true)
    public void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testSchemaCompatibilityStrategyInBrokerLevel() throws PulsarClientException {
        this.conf.setSchemaCompatibilityStrategy(SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE);
        String topicName = TopicName.get((String)TopicDomain.persistent.value(), (String)PUBLIC_TENANT, (String)namespace, (String)"testSchemaCompatibilityStrategyInBrokerLevel").toString();
        this.pulsarClient.newProducer(Schema.AVRO((SchemaDefinition)SchemaDefinition.builder().withAlwaysAllowNull(true).withPojo(Schemas.PersonOne.class).build())).topic(topicName).create();
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer(Schema.AVRO((SchemaDefinition)SchemaDefinition.builder().withAlwaysAllowNull(true).withPojo(Schemas.PersonThree.class).build())).topic(topicName);
        Throwable t = Assert.expectThrows(PulsarClientException.IncompatibleSchemaException.class, () -> ((ProducerBuilder)producerBuilder).create());
        Assert.assertTrue((boolean)t.getMessage().contains("org.apache.avro.SchemaValidationException: Unable to read schema"));
    }

    @Test
    public void structTypeProducerProducerUndefinedCompatible() throws Exception {
        this.admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.UNDEFINED);
        String topicName = TopicName.get((String)TopicDomain.persistent.value(), (String)PUBLIC_TENANT, (String)namespace, (String)"structTypeProducerProducerUndefinedCompatible").toString();
        this.pulsarClient.newProducer(Schema.JSON(Schemas.PersonOne.class)).topic(topicName).create();
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class)).topic(topicName);
        Throwable t = Assert.expectThrows(PulsarClientException.IncompatibleSchemaException.class, () -> ((ProducerBuilder)producerBuilder).create());
        Assert.assertTrue((boolean)t.getMessage().endsWith("Incompatible schema: exists schema type JSON, new schema type AVRO"));
    }

    @Test
    public void structTypeProducerConsumerUndefinedCompatible() throws Exception {
        this.admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.UNDEFINED);
        String subName = "my-sub";
        String topicName = TopicName.get((String)TopicDomain.persistent.value(), (String)PUBLIC_TENANT, (String)namespace, (String)"structTypeProducerConsumerUndefinedCompatible").toString();
        this.pulsarClient.newProducer(Schema.JSON(Schemas.PersonOne.class)).topic(topicName).create();
        ConsumerBuilder consumerBuilder = this.pulsarClient.newConsumer(Schema.AVRO(Schemas.PersonOne.class)).topic(new String[]{topicName}).subscriptionType(SubscriptionType.Shared).ackTimeout(1L, TimeUnit.SECONDS).subscriptionName("my-sub");
        Throwable t = Assert.expectThrows(PulsarClientException.IncompatibleSchemaException.class, () -> ((ConsumerBuilder)consumerBuilder).subscribe());
        Assert.assertTrue((boolean)t.getMessage().endsWith("Incompatible schema: exists schema type JSON, new schema type AVRO"));
    }

    @Test
    public void structTypeConsumerProducerUndefinedCompatible() throws Exception {
        this.admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.UNDEFINED);
        String subName = "my-sub";
        String topicName = TopicName.get((String)TopicDomain.persistent.value(), (String)PUBLIC_TENANT, (String)namespace, (String)"structTypeConsumerProducerUndefinedCompatible").toString();
        this.pulsarClient.newConsumer(Schema.JSON(Schemas.PersonOne.class)).topic(new String[]{topicName}).subscriptionType(SubscriptionType.Shared).ackTimeout(1L, TimeUnit.SECONDS).subscriptionName("my-sub").subscribe();
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class)).topic(topicName);
        Throwable t = Assert.expectThrows(PulsarClientException.IncompatibleSchemaException.class, () -> ((ProducerBuilder)producerBuilder).create());
        Assert.assertTrue((boolean)t.getMessage().endsWith("Incompatible schema: exists schema type JSON, new schema type AVRO"));
    }

    @Test
    public void structTypeConsumerConsumerUndefinedCompatible() throws Exception {
        this.admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.UNDEFINED);
        String subName = "my-sub";
        String topicName = TopicName.get((String)TopicDomain.persistent.value(), (String)PUBLIC_TENANT, (String)namespace, (String)"structTypeConsumerConsumerUndefinedCompatible").toString();
        this.pulsarClient.newConsumer(Schema.JSON(Schemas.PersonOne.class)).topic(new String[]{topicName}).subscriptionType(SubscriptionType.Shared).ackTimeout(1L, TimeUnit.SECONDS).subscriptionName("my-sub1").subscribe();
        ConsumerBuilder consumerBuilder = this.pulsarClient.newConsumer(Schema.AVRO(Schemas.PersonOne.class)).topic(new String[]{topicName}).subscriptionType(SubscriptionType.Shared).ackTimeout(1L, TimeUnit.SECONDS).subscriptionName("my-sub2");
        Throwable t = Assert.expectThrows(PulsarClientException.IncompatibleSchemaException.class, () -> ((ConsumerBuilder)consumerBuilder).subscribe());
        Assert.assertTrue((boolean)t.getMessage().endsWith("Incompatible schema: exists schema type JSON, new schema type AVRO"));
    }

    @Test
    public void primitiveTypeProducerProducerUndefinedCompatible() throws Exception {
        this.admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.UNDEFINED);
        String topicName = TopicName.get((String)TopicDomain.persistent.value(), (String)PUBLIC_TENANT, (String)namespace, (String)"primitiveTypeProducerProducerUndefinedCompatible").toString();
        this.pulsarClient.newProducer(Schema.INT32).topic(topicName).create();
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer(Schema.STRING).topic(topicName);
        Throwable t = Assert.expectThrows(PulsarClientException.IncompatibleSchemaException.class, () -> ((ProducerBuilder)producerBuilder).create());
        Assert.assertTrue((boolean)t.getMessage().endsWith("Incompatible schema: exists schema type INT32, new schema type STRING"));
    }

    @Test
    public void primitiveTypeProducerConsumerUndefinedCompatible() throws Exception {
        this.admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.UNDEFINED);
        String subName = "my-sub";
        String topicName = TopicName.get((String)TopicDomain.persistent.value(), (String)PUBLIC_TENANT, (String)namespace, (String)"primitiveTypeProducerConsumerUndefinedCompatible").toString();
        this.pulsarClient.newProducer(Schema.INT32).topic(topicName).create();
        ConsumerBuilder consumerBuilder = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{topicName}).subscriptionType(SubscriptionType.Shared).ackTimeout(1L, TimeUnit.SECONDS).subscriptionName("my-sub");
        Throwable t = Assert.expectThrows(PulsarClientException.IncompatibleSchemaException.class, () -> ((ConsumerBuilder)consumerBuilder).subscribe());
        Assert.assertTrue((boolean)t.getMessage().endsWith("Incompatible schema: exists schema type INT32, new schema type STRING"));
    }

    @Test
    public void primitiveTypeConsumerProducerUndefinedCompatible() throws Exception {
        this.admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.UNDEFINED);
        String subName = "my-sub";
        String topicName = TopicName.get((String)TopicDomain.persistent.value(), (String)PUBLIC_TENANT, (String)namespace, (String)"primitiveTypeConsumerProducerUndefinedCompatible").toString();
        this.pulsarClient.newConsumer(Schema.INT32).topic(new String[]{topicName}).subscriptionType(SubscriptionType.Shared).ackTimeout(1L, TimeUnit.SECONDS).subscriptionName("my-sub").subscribe();
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer(Schema.STRING).topic(topicName);
        Throwable t = Assert.expectThrows(PulsarClientException.IncompatibleSchemaException.class, () -> ((ProducerBuilder)producerBuilder).create());
        Assert.assertTrue((boolean)t.getMessage().endsWith("Incompatible schema: exists schema type INT32, new schema type STRING"));
    }

    @Test
    public void primitiveTypeConsumerConsumerUndefinedCompatible() throws Exception {
        this.admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.UNDEFINED);
        String subName = "my-sub";
        String topicName = TopicName.get((String)TopicDomain.persistent.value(), (String)PUBLIC_TENANT, (String)namespace, (String)"primitiveTypeConsumerConsumerUndefinedCompatible").toString();
        this.pulsarClient.newConsumer(Schema.INT32).topic(new String[]{topicName}).subscriptionType(SubscriptionType.Shared).ackTimeout(1L, TimeUnit.SECONDS).subscriptionName("my-sub1").subscribe();
        ConsumerBuilder consumerBuilder = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{topicName}).subscriptionType(SubscriptionType.Shared).ackTimeout(1L, TimeUnit.SECONDS).subscriptionName("my-sub2");
        Throwable t = Assert.expectThrows(PulsarClientException.IncompatibleSchemaException.class, () -> ((ConsumerBuilder)consumerBuilder).subscribe());
        Assert.assertTrue((boolean)t.getMessage().endsWith("Incompatible schema: exists schema type INT32, new schema type STRING"));
    }

    @Test
    public void testAlwaysCompatible() throws Exception {
        Schema[] schemas;
        this.admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
        String topicName = TopicName.get((String)TopicDomain.persistent.value(), (String)PUBLIC_TENANT, (String)namespace, (String)("testAlwaysCompatible" + UUID.randomUUID().toString())).toString();
        for (Schema schema : schemas = new Schema[]{Schema.AVRO(Schemas.PersonOne.class), Schema.AVRO(Schemas.PersonFour.class), Schema.JSON(Schemas.PersonOne.class), Schema.JSON(Schemas.PersonFour.class), Schema.INT8, Schema.INT16, Schema.INT32, Schema.INT64, Schema.DATE, Schema.BOOL, Schema.DOUBLE, Schema.STRING, Schema.BYTES, Schema.FLOAT, Schema.INSTANT, Schema.BYTEBUFFER, Schema.TIME, Schema.TIMESTAMP, Schema.LOCAL_DATE, Schema.LOCAL_DATE_TIME, Schema.LOCAL_TIME}) {
            Producer p = this.pulsarClient.newProducer(schema).topic(topicName).create();
            p.close();
        }
        for (Schema schema : schemas) {
            Consumer c = this.pulsarClient.newConsumer(schema).topic(new String[]{topicName}).subscriptionName(UUID.randomUUID().toString()).subscribe();
            c.close();
        }
        List schemasOfTopic = this.admin.schemas().getAllSchemas(topicName);
        Assert.assertEquals((int)schemasOfTopic.size(), (int)(schemas.length - 2));
        for (Schema schema : schemas) {
            Producer p = this.pulsarClient.newProducer(schema).topic(topicName).create();
            p.close();
        }
        Assert.assertEquals((int)schemasOfTopic.size(), (int)(schemas.length - 2));
    }
}

