/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.sink;

import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetFacet;
import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
import org.apache.flink.connector.kafka.lineage.DefaultTypeDatasetFacet;
import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacet;
import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider;
import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider;
import org.apache.flink.connector.kafka.lineage.TypeDatasetFacet;
import org.apache.flink.connector.kafka.lineage.TypeDatasetFacetProvider;
import org.apache.flink.connector.kafka.sink.HeaderProvider;
import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSerializerWrapper;
import org.apache.flink.connector.kafka.sink.TopicSelector;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.Serializer;
import org.apache.flink.shaded.guava31.com.google.common.reflect.TypeToken;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PublicEvolving
public class KafkaRecordSerializationSchemaBuilder<IN> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
    @Nullable
    private Function<? super IN, String> topicSelector;
    @Nullable
    private SerializationSchema<? super IN> valueSerializationSchema;
    @Nullable
    private KafkaPartitioner<? super IN> partitioner;
    @Nullable
    private SerializationSchema<? super IN> keySerializationSchema;
    @Nullable
    private HeaderProvider<? super IN> headerProvider;

    public <T extends IN> KafkaRecordSerializationSchemaBuilder<T> setPartitioner(FlinkKafkaPartitioner<? super T> partitioner) {
        KafkaRecordSerializationSchemaBuilder<T> self = this.self();
        self.partitioner = (KafkaPartitioner)Preconditions.checkNotNull(partitioner);
        return self;
    }

    public <T extends IN> KafkaRecordSerializationSchemaBuilder<T> setPartitioner(KafkaPartitioner<? super T> partitioner) {
        KafkaRecordSerializationSchemaBuilder<T> self = this.self();
        self.partitioner = (KafkaPartitioner)Preconditions.checkNotNull(partitioner);
        return self;
    }

    public KafkaRecordSerializationSchemaBuilder<IN> setTopic(String topic) {
        Preconditions.checkState((this.topicSelector == null ? 1 : 0) != 0, (Object)"Topic selector already set.");
        Preconditions.checkNotNull((Object)topic);
        this.topicSelector = new ConstantTopicSelector<IN>(topic);
        return this;
    }

    public <T extends IN> KafkaRecordSerializationSchemaBuilder<T> setTopicSelector(TopicSelector<? super T> topicSelector) {
        Preconditions.checkState((this.topicSelector == null ? 1 : 0) != 0, (Object)"Topic selector already set.");
        KafkaRecordSerializationSchemaBuilder<T> self = this.self();
        self.topicSelector = new CachingTopicSelector<IN>((TopicSelector)Preconditions.checkNotNull(topicSelector));
        return self;
    }

    public <T extends IN> KafkaRecordSerializationSchemaBuilder<T> setKeySerializationSchema(SerializationSchema<? super T> keySerializationSchema) {
        this.checkKeySerializerNotSet();
        KafkaRecordSerializationSchemaBuilder<T> self = this.self();
        self.keySerializationSchema = (SerializationSchema)Preconditions.checkNotNull(keySerializationSchema);
        return self;
    }

    public <T extends IN> KafkaRecordSerializationSchemaBuilder<T> setKafkaKeySerializer(Class<? extends Serializer<? super T>> keySerializer) {
        this.checkKeySerializerNotSet();
        KafkaRecordSerializationSchemaBuilder<T> self = this.self();
        self.keySerializationSchema = new KafkaSerializerWrapper<IN>(keySerializer, true, this.topicSelector);
        return self;
    }

    public <T extends IN, S extends Serializer<? super T>> KafkaRecordSerializationSchemaBuilder<T> setKafkaKeySerializer(Class<S> keySerializer, Map<String, String> configuration) {
        this.checkKeySerializerNotSet();
        KafkaRecordSerializationSchemaBuilder<T> self = this.self();
        self.keySerializationSchema = new KafkaSerializerWrapper<IN>(keySerializer, true, configuration, this.topicSelector);
        return self;
    }

    public <T extends IN> KafkaRecordSerializationSchemaBuilder<T> setValueSerializationSchema(SerializationSchema<T> valueSerializationSchema) {
        this.checkValueSerializerNotSet();
        KafkaRecordSerializationSchemaBuilder<T> self = this.self();
        self.valueSerializationSchema = (SerializationSchema)Preconditions.checkNotNull(valueSerializationSchema);
        return self;
    }

    public <T extends IN> KafkaRecordSerializationSchemaBuilder<T> setHeaderProvider(HeaderProvider<? super T> headerProvider) {
        KafkaRecordSerializationSchemaBuilder<T> self = this.self();
        self.headerProvider = (HeaderProvider)Preconditions.checkNotNull(headerProvider);
        return self;
    }

    private <T extends IN> KafkaRecordSerializationSchemaBuilder<T> self() {
        return this;
    }

    public <T extends IN> KafkaRecordSerializationSchemaBuilder<T> setKafkaValueSerializer(Class<? extends Serializer<? super T>> valueSerializer) {
        this.checkValueSerializerNotSet();
        KafkaRecordSerializationSchemaBuilder<T> self = this.self();
        self.valueSerializationSchema = new KafkaSerializerWrapper<IN>(valueSerializer, false, this.topicSelector);
        return self;
    }

    public <T extends IN, S extends Serializer<? super T>> KafkaRecordSerializationSchemaBuilder<T> setKafkaValueSerializer(Class<S> valueSerializer, Map<String, String> configuration) {
        this.checkValueSerializerNotSet();
        KafkaRecordSerializationSchemaBuilder<T> self = this.self();
        self.valueSerializationSchema = new KafkaSerializerWrapper<IN>(valueSerializer, false, configuration, this.topicSelector);
        return self;
    }

    public KafkaRecordSerializationSchema<IN> build() {
        Preconditions.checkState((this.valueSerializationSchema != null ? 1 : 0) != 0, (Object)"No value serializer is configured.");
        Preconditions.checkState((this.topicSelector != null ? 1 : 0) != 0, (Object)"No topic selector is configured.");
        return new KafkaRecordSerializationSchemaWrapper<IN>(this.topicSelector, this.valueSerializationSchema, this.keySerializationSchema, this.partitioner, this.headerProvider);
    }

    private void checkValueSerializerNotSet() {
        Preconditions.checkState((this.valueSerializationSchema == null ? 1 : 0) != 0, (Object)"Value serializer already set.");
    }

    private void checkKeySerializerNotSet() {
        Preconditions.checkState((this.keySerializationSchema == null ? 1 : 0) != 0, (Object)"Key serializer already set.");
    }

    private static class KafkaRecordSerializationSchemaWrapper<IN>
    implements KafkaDatasetFacetProvider,
    KafkaRecordSerializationSchema<IN>,
    TypeDatasetFacetProvider {
        private final SerializationSchema<? super IN> valueSerializationSchema;
        private final Function<? super IN, String> topicSelector;
        private final KafkaPartitioner<? super IN> partitioner;
        private final SerializationSchema<? super IN> keySerializationSchema;
        private final HeaderProvider<? super IN> headerProvider;

        KafkaRecordSerializationSchemaWrapper(Function<? super IN, String> topicSelector, SerializationSchema<? super IN> valueSerializationSchema, @Nullable SerializationSchema<? super IN> keySerializationSchema, @Nullable KafkaPartitioner<? super IN> partitioner, @Nullable HeaderProvider<? super IN> headerProvider) {
            this.topicSelector = (Function)Preconditions.checkNotNull(topicSelector);
            this.valueSerializationSchema = (SerializationSchema)Preconditions.checkNotNull(valueSerializationSchema);
            this.partitioner = partitioner;
            this.keySerializationSchema = keySerializationSchema;
            this.headerProvider = headerProvider;
        }

        @Override
        public void open(SerializationSchema.InitializationContext context, KafkaRecordSerializationSchema.KafkaSinkContext sinkContext) throws Exception {
            this.valueSerializationSchema.open(context);
            if (this.keySerializationSchema != null) {
                this.keySerializationSchema.open(context);
            }
            if (this.partitioner != null) {
                this.partitioner.open(sinkContext.getParallelInstanceId(), sinkContext.getNumberOfParallelInstances());
            }
        }

        @Override
        public ProducerRecord<byte[], byte[]> serialize(IN element, KafkaRecordSerializationSchema.KafkaSinkContext context, Long timestamp) {
            String targetTopic = this.topicSelector.apply(element);
            byte[] value = this.valueSerializationSchema.serialize(element);
            byte[] key = null;
            if (this.keySerializationSchema != null) {
                key = this.keySerializationSchema.serialize(element);
            }
            OptionalInt partition = this.partitioner != null ? OptionalInt.of(this.partitioner.partition(element, key, value, targetTopic, context.getPartitionsForTopic(targetTopic))) : OptionalInt.empty();
            return new ProducerRecord<byte[], byte[]>(targetTopic, partition.isPresent() ? Integer.valueOf(partition.getAsInt()) : null, timestamp == null || timestamp < 0L ? null : timestamp, key, value, this.headerProvider != null ? this.headerProvider.getHeaders(element) : null);
        }

        @Override
        public Optional<KafkaDatasetFacet> getKafkaDatasetFacet() {
            if (!(this.topicSelector instanceof KafkaDatasetIdentifierProvider)) {
                LOG.info("Cannot identify topics. Not an TopicsIdentifierProvider");
                return Optional.empty();
            }
            Optional<DefaultKafkaDatasetIdentifier> topicsIdentifier = ((KafkaDatasetIdentifierProvider)((Object)this.topicSelector)).getDatasetIdentifier();
            if (!topicsIdentifier.isPresent()) {
                LOG.info("No topics' identifiers provided");
                return Optional.empty();
            }
            return Optional.of(new DefaultKafkaDatasetFacet(topicsIdentifier.get()));
        }

        @Override
        public Optional<TypeDatasetFacet> getTypeDatasetFacet() {
            if (this.valueSerializationSchema instanceof ResultTypeQueryable) {
                return Optional.of(new DefaultTypeDatasetFacet(((ResultTypeQueryable)this.valueSerializationSchema).getProducedType()));
            }
            TypeToken serializationSchemaType = TypeToken.of(this.valueSerializationSchema.getClass());
            Class parameterType = serializationSchemaType.resolveType(SerializationSchema.class.getTypeParameters()[0]).getRawType();
            if (parameterType != Object.class) {
                return Optional.of(new DefaultTypeDatasetFacet(TypeInformation.of((Class)parameterType)));
            }
            return Optional.empty();
        }
    }

    private static class CachingTopicSelector<IN>
    implements Function<IN, String>,
    KafkaDatasetIdentifierProvider,
    Serializable {
        private static final int CACHE_RESET_SIZE = 5;
        private final Map<IN, String> cache;
        private final TopicSelector<IN> topicSelector;

        CachingTopicSelector(TopicSelector<IN> topicSelector) {
            this.topicSelector = topicSelector;
            this.cache = new HashMap<IN, String>();
        }

        @Override
        public String apply(IN in) {
            String topic = this.cache.getOrDefault(in, (String)this.topicSelector.apply(in));
            this.cache.put(in, topic);
            if (this.cache.size() >= 5) {
                this.cache.clear();
            }
            return topic;
        }

        @Override
        public Optional<DefaultKafkaDatasetIdentifier> getDatasetIdentifier() {
            if (this.topicSelector instanceof KafkaDatasetIdentifierProvider) {
                return ((KafkaDatasetIdentifierProvider)((Object)this.topicSelector)).getDatasetIdentifier();
            }
            return Optional.empty();
        }
    }

    private static class ConstantTopicSelector<IN>
    implements Function<IN, String>,
    Serializable,
    KafkaDatasetIdentifierProvider {
        private String topic;

        ConstantTopicSelector(String topic) {
            this.topic = topic;
        }

        @Override
        public String apply(IN in) {
            return this.topic;
        }

        @Override
        public Optional<DefaultKafkaDatasetIdentifier> getDatasetIdentifier() {
            return Optional.of(DefaultKafkaDatasetIdentifier.ofTopics(Collections.singletonList(this.topic)));
        }
    }
}

