/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources;

import java.util.Arrays;
import java.util.Properties;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
import org.apache.hudi.utilities.exception.HoodieSourceTimeoutException;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.streamer.SourceProfile;
import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
import org.apache.hudi.utilities.streamer.StreamContext;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.kafka010.OffsetRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class KafkaSource<T>
extends Source<T> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
    protected static final String NATIVE_KAFKA_KEY_DESERIALIZER_PROP = "key.deserializer";
    protected static final String NATIVE_KAFKA_VALUE_DESERIALIZER_PROP = "value.deserializer";
    protected static final String METRIC_NAME_KAFKA_MESSAGE_IN_COUNT = "kafkaMessageInCount";
    protected final HoodieIngestionMetrics metrics;
    protected final SchemaProvider schemaProvider;
    protected KafkaOffsetGen offsetGen;
    protected final boolean shouldAddOffsets;

    protected KafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, Source.SourceType sourceType, HoodieIngestionMetrics metrics, StreamContext streamContext) {
        super(props, sparkContext, sparkSession, sourceType, streamContext);
        this.schemaProvider = streamContext.getSchemaProvider();
        this.metrics = metrics;
        this.shouldAddOffsets = KafkaOffsetPostProcessor.Config.shouldAddOffsets(props);
    }

    @Override
    protected final InputBatch<T> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
        throw new UnsupportedOperationException("KafkaSource#fetchNewData should not be called");
    }

    @Override
    protected InputBatch<T> readFromCheckpoint(Option<Checkpoint> lastCheckpoint, long sourceLimit) {
        try {
            return this.toInputBatch(KafkaSource.getOffsetRanges(this.props, (Option<SourceProfileSupplier>)this.sourceProfileSupplier, this.offsetGen, this.metrics, lastCheckpoint, sourceLimit));
        }
        catch (TimeoutException e) {
            throw new HoodieSourceTimeoutException("Kafka Source timed out " + e.getMessage());
        }
        catch (KafkaException ex) {
            if (this.hasConfigException(ex)) {
                throw new HoodieReadFromSourceException("kafka source config issue ", ex);
            }
            throw ex;
        }
    }

    public static OffsetRange[] getOffsetRanges(TypedProperties props, Option<SourceProfileSupplier> sourceProfileSupplier, KafkaOffsetGen offsetGen, HoodieIngestionMetrics metrics, Option<Checkpoint> lastCheckpoint, long sourceLimit) {
        Object[] offsetRanges;
        if (sourceProfileSupplier.isPresent() && ((SourceProfileSupplier)sourceProfileSupplier.get()).getSourceProfile() != null) {
            SourceProfile kafkaSourceProfile = ((SourceProfileSupplier)sourceProfileSupplier.get()).getSourceProfile();
            offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpoint, (Long)kafkaSourceProfile.getSourceSpecificContext(), kafkaSourceProfile.getSourcePartitions(), metrics);
            metrics.updateStreamerSourceParallelism(kafkaSourceProfile.getSourcePartitions());
            metrics.updateStreamerSourceBytesToBeIngestedInSyncRound(kafkaSourceProfile.getMaxSourceBytes());
            LOG.info("About to read maxEventsInSyncRound {} of size {} bytes in {} partitions from Kafka for topic {} with offsetRanges {}", new Object[]{kafkaSourceProfile.getSourceSpecificContext(), kafkaSourceProfile.getMaxSourceBytes(), kafkaSourceProfile.getSourcePartitions(), offsetGen.getTopicName(), offsetRanges});
        } else {
            int minPartitions = (int)ConfigUtils.getLongWithAltKeys((TypedProperties)props, KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS);
            metrics.updateStreamerSourceParallelism(minPartitions);
            offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpoint, sourceLimit, metrics);
            LOG.info("About to read sourceLimit {} in {} spark partitions from kafka for topic {} with offset ranges {}", new Object[]{sourceLimit, minPartitions, offsetGen.getTopicName(), Arrays.toString(offsetRanges)});
        }
        return offsetRanges;
    }

    private InputBatch<T> toInputBatch(OffsetRange[] offsetRanges) {
        long totalNewMsgs = KafkaOffsetGen.CheckpointUtils.totalNewMessages(offsetRanges);
        LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + this.offsetGen.getTopicName());
        if (totalNewMsgs <= 0L) {
            this.metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KAFKA_MESSAGE_IN_COUNT, 0L);
            return new InputBatch(Option.empty(), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
        }
        this.metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KAFKA_MESSAGE_IN_COUNT, totalNewMsgs);
        T newBatch = this.toBatch(offsetRanges);
        return new InputBatch(Option.of(newBatch), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
    }

    protected abstract T toBatch(OffsetRange[] var1);

    @Override
    public void onCommit(String lastCkptStr) {
        if (ConfigUtils.getBooleanWithAltKeys((Properties)this.props, KafkaSourceConfig.ENABLE_KAFKA_COMMIT_OFFSET)) {
            this.offsetGen.commitOffsetToKafka(lastCkptStr);
        }
    }

    private boolean hasConfigException(Throwable e) {
        if (e == null) {
            return false;
        }
        if (e instanceof ConfigException || e instanceof io.confluent.common.config.ConfigException) {
            return true;
        }
        return this.hasConfigException(e.getCause());
    }
}

