/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.source.kafka;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.IMRInput;
import org.apache.kylin.engine.spark.ISparkInput;
import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.IBuildable;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TableExtDesc;
import org.apache.kylin.metadata.streaming.StreamingConfig;
import org.apache.kylin.metadata.streaming.StreamingManager;
import org.apache.kylin.shaded.com.google.common.collect.Lists;
import org.apache.kylin.source.IReadableTable;
import org.apache.kylin.source.ISampleDataDeployer;
import org.apache.kylin.source.ISource;
import org.apache.kylin.source.ISourceMetadataExplorer;
import org.apache.kylin.source.SourcePartition;
import org.apache.kylin.source.hive.HiveMetadataExplorer;
import org.apache.kylin.source.kafka.KafkaConfigManager;
import org.apache.kylin.source.kafka.KafkaMRInput;
import org.apache.kylin.source.kafka.KafkaSparkInput;
import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.apache.kylin.source.kafka.config.KafkaConsumerProperties;
import org.apache.kylin.source.kafka.util.KafkaClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaSource
implements ISource {
    private static final Logger logger = LoggerFactory.getLogger(KafkaSource.class);

    public KafkaSource(KylinConfig config) {
    }

    public <I> I adaptToBuildEngine(Class<I> engineInterface) {
        if (engineInterface == IMRInput.class) {
            return (I)new KafkaMRInput();
        }
        if (engineInterface == ISparkInput.class) {
            return (I)new KafkaSparkInput();
        }
        throw new RuntimeException("Cannot adapt to " + engineInterface);
    }

    public IReadableTable createReadableTable(TableDesc tableDesc, String uuid) {
        throw new UnsupportedOperationException();
    }

    public SourcePartition enrichSourcePartitionBeforeBuild(IBuildable buildable, SourcePartition srcPartition) {
        this.checkSourceOffsets(srcPartition);
        SourcePartition result = SourcePartition.getCopyOf((SourcePartition)srcPartition);
        SegmentRange range = result.getSegRange();
        CubeInstance cube = (CubeInstance)buildable;
        if (range == null || range.start.v.equals(0L)) {
            CubeSegment last = cube.getLastSegment();
            if (last != null) {
                logger.debug("Last segment exists, continue from last segment " + last.getName() + "'s end position: " + last.getSourcePartitionOffsetEnd());
                result.setSourcePartitionOffsetStart(last.getSourcePartitionOffsetEnd());
            } else if (cube.getDescriptor().getPartitionOffsetStart() != null && cube.getDescriptor().getPartitionOffsetStart().size() > 0) {
                logger.debug("Last segment doesn't exist, use the start offset that be initiated previously: " + cube.getDescriptor().getPartitionOffsetStart());
                result.setSourcePartitionOffsetStart(cube.getDescriptor().getPartitionOffsetStart());
            } else {
                logger.debug("Last segment doesn't exist, and didn't initiate the start offset, will seek from topic's earliest offset.");
                result.setSourcePartitionOffsetStart(KafkaClient.getEarliestOffsets(cube));
            }
        }
        KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cube.getRootFactTable());
        String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
        String topic = kafkaConfig.getTopic();
        Properties property = KafkaConsumerProperties.getInstanceFromEnv().extractKafkaConfigToProperties();
        KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), property);
        Object object = null;
        try {
            List partitionInfos = consumer.partitionsFor(topic);
            logger.info("Get {} partitions for topic {} ", (Object)partitionInfos.size(), (Object)topic);
            for (PartitionInfo partitionInfo : partitionInfos) {
                if (result.getSourcePartitionOffsetStart().containsKey(partitionInfo.partition())) continue;
                long earliest = KafkaClient.getEarliestOffset(consumer, topic, partitionInfo.partition());
                logger.debug("New partition {} added, with start offset {}", (Object)partitionInfo.partition(), (Object)earliest);
                result.getSourcePartitionOffsetStart().put(partitionInfo.partition(), earliest);
            }
        }
        catch (Throwable partitionInfos) {
            object = partitionInfos;
            throw partitionInfos;
        }
        finally {
            if (consumer != null) {
                if (object != null) {
                    try {
                        consumer.close();
                    }
                    catch (Throwable partitionInfos) {
                        ((Throwable)object).addSuppressed(partitionInfos);
                    }
                } else {
                    consumer.close();
                }
            }
        }
        if (range == null || range.end.v.equals(Long.MAX_VALUE)) {
            logger.debug("Seek end offsets from topic {}", (Object)topic);
            Map<Integer, Long> latestOffsets = KafkaClient.getLatestOffsets(cube);
            logger.debug("The end offsets are " + latestOffsets);
            for (Integer partitionId : latestOffsets.keySet()) {
                if (result.getSourcePartitionOffsetStart().containsKey(partitionId)) {
                    if ((Long)result.getSourcePartitionOffsetStart().get(partitionId) <= latestOffsets.get(partitionId)) continue;
                    throw new IllegalArgumentException("Partition " + partitionId + " end offset (" + latestOffsets.get(partitionId) + ") is smaller than start offset ( " + result.getSourcePartitionOffsetStart().get(partitionId) + ")");
                }
                throw new IllegalStateException("New partition added in between, retry.");
            }
            result.setSourcePartitionOffsetEnd(latestOffsets);
        }
        long totalStartOffset = 0L;
        long totalEndOffset = 0L;
        for (Long v : result.getSourcePartitionOffsetStart().values()) {
            totalStartOffset += v.longValue();
        }
        for (Long v : result.getSourcePartitionOffsetEnd().values()) {
            totalEndOffset += v.longValue();
        }
        if (totalStartOffset > totalEndOffset) {
            throw new IllegalArgumentException("Illegal offset: start: " + totalStartOffset + ", end: " + totalEndOffset);
        }
        if (totalStartOffset == totalEndOffset) {
            throw new IllegalArgumentException("No new message comes, startOffset = endOffset:" + totalStartOffset);
        }
        result.setSegRange(new SegmentRange((Comparable)Long.valueOf(totalStartOffset), (Comparable)Long.valueOf(totalEndOffset)));
        return result;
    }

    private void checkSourceOffsets(SourcePartition src) {
        long totalOffset;
        if (src.getSegRange() == null) {
            return;
        }
        long startOffset = (Long)src.getSegRange().start.v;
        long endOffset = (Long)src.getSegRange().end.v;
        Map sourcePartitionOffsetStart = src.getSourcePartitionOffsetStart();
        Map sourcePartitionOffsetEnd = src.getSourcePartitionOffsetEnd();
        if (endOffset <= 0L || startOffset >= endOffset) {
            throw new IllegalArgumentException("'startOffset' need be smaller than 'endOffset'");
        }
        if (startOffset > 0L) {
            if (sourcePartitionOffsetStart == null || sourcePartitionOffsetStart.size() == 0) {
                throw new IllegalArgumentException("When 'startOffset' is > 0, need provide each partition's start offset");
            }
            totalOffset = 0L;
            for (Long v : sourcePartitionOffsetStart.values()) {
                totalOffset += v.longValue();
            }
            if (totalOffset != startOffset) {
                throw new IllegalArgumentException("Invalid 'sourcePartitionOffsetStart', doesn't match with 'startOffset'");
            }
        }
        if (endOffset > 0L && endOffset != Long.MAX_VALUE) {
            if (sourcePartitionOffsetEnd == null || sourcePartitionOffsetEnd.size() == 0) {
                throw new IllegalArgumentException("When 'endOffset' is not Long.MAX_VALUE, need provide each partition's start offset");
            }
            totalOffset = 0L;
            for (Long v : sourcePartitionOffsetEnd.values()) {
                totalOffset += v.longValue();
            }
            if (totalOffset != endOffset) {
                throw new IllegalArgumentException("Invalid 'sourcePartitionOffsetEnd', doesn't match with 'endOffset'");
            }
        }
    }

    public ISourceMetadataExplorer getSourceMetadataExplorer() {
        return new ISourceMetadataExplorer(){

            public List<String> listDatabases() throws Exception {
                throw new UnsupportedOperationException();
            }

            public List<String> listTables(String database) throws Exception {
                throw new UnsupportedOperationException();
            }

            public Pair<TableDesc, TableExtDesc> loadTableMetadata(String database, String table, String prj) throws Exception {
                throw new UnsupportedOperationException();
            }

            public List<String> getRelatedKylinResources(TableDesc table) {
                ArrayList dependentResources = Lists.newArrayList();
                dependentResources.add(KafkaConfig.concatResourcePath(table.getIdentity()));
                if (table.getSourceType() == 1) {
                    dependentResources.add(StreamingConfig.concatResourcePath((String)table.getIdentity()));
                }
                return dependentResources;
            }

            public ColumnDesc[] evalQueryMetadata(String query) {
                throw new UnsupportedOperationException();
            }

            public void validateSQL(String query) throws Exception {
                throw new UnsupportedOperationException();
            }
        };
    }

    public ISampleDataDeployer getSampleDataDeployer() {
        return new HiveMetadataExplorer();
    }

    public void unloadTable(String tableName, String project) throws IOException {
        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
        StreamingManager streamingManager = StreamingManager.getInstance((KylinConfig)kylinConfig);
        KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(kylinConfig);
        StreamingConfig config = streamingManager.getStreamingConfig(tableName);
        KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(tableName);
        streamingManager.removeStreamingConfig(config);
        kafkaConfigManager.removeKafkaConfig(kafkaConfig);
    }

    public void close() throws IOException {
    }
}

