/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.data.input.kafkainput;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.druid.data.input.kafkainput.KafkaHeaderReader;
import org.apache.druid.indexing.seekablestream.SettableByteEntity;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.ParseException;

/**
 * {@link InputEntityReader} that turns one Kafka record into input rows by blending
 * the record's headers, its timestamp, its key and its value payload into a single event.
 *
 * <p>Precedence when column names collide: value payload columns overwrite everything
 * else; header columns win over the record timestamp column and over the key column
 * (both of which are only added when the name is not already present).
 */
public class KafkaInputReader implements InputEntityReader {
    private static final Logger log = new Logger(KafkaInputReader.class);

    // Column name that is always dropped from the discovered dimension set.
    // NOTE(review): presumably the default name of the record-timestamp column — confirm
    // against the input format that constructs this reader.
    private static final String AUTO_TIMESTAMP_COLUMN = "__kif_auto_timestamp";

    private final InputRowSchema inputRowSchema;
    private final SettableByteEntity<KafkaRecordEntity> source;
    private final Function<KafkaRecordEntity, KafkaHeaderReader> headerParserSupplier;
    private final Function<KafkaRecordEntity, InputEntityReader> keyParserSupplier;
    private final InputEntityReader valueParser;
    private final String keyColumnName;
    private final String timestampColumnName;

    /**
     * @param inputRowSchema       schema supplying the timestamp spec and dimensions spec
     * @param source               holder of the Kafka record currently being read
     * @param headerParserSupplier builds a header reader for a record; {@code null} skips headers
     * @param keyParserSupplier    builds a key reader for a record; {@code null} skips the key
     * @param valueParser          reader used for the record's value payload
     * @param keyColumnName        column under which the parsed key is stored
     * @param timestampColumnName  column under which the record timestamp is stored
     */
    public KafkaInputReader(InputRowSchema inputRowSchema, SettableByteEntity<KafkaRecordEntity> source, @Nullable Function<KafkaRecordEntity, KafkaHeaderReader> headerParserSupplier, @Nullable Function<KafkaRecordEntity, InputEntityReader> keyParserSupplier, InputEntityReader valueParser, String keyColumnName, String timestampColumnName) {
        this.inputRowSchema = inputRowSchema;
        this.source = source;
        this.headerParserSupplier = headerParserSupplier;
        this.keyParserSupplier = keyParserSupplier;
        this.valueParser = valueParser;
        this.keyColumnName = keyColumnName;
        this.timestampColumnName = timestampColumnName;
    }

    /**
     * Picks the dimension list for an output row: the schema's explicit dimensions when
     * there are any, otherwise the discovered dimensions minus the schema's exclusions.
     */
    private List<String> getFinalDimensionList(Set<String> newDimensions) {
        final List<String> schemaDimensions = this.inputRowSchema.getDimensionsSpec().getDimensionNames();
        if (!schemaDimensions.isEmpty()) {
            return schemaDimensions;
        }
        final Set<String> exclusions = this.inputRowSchema.getDimensionsSpec().getDimensionExclusions();
        return Lists.newArrayList(Sets.difference(newDimensions, exclusions));
    }

    /**
     * Maps every row produced by the value parser to a row whose event is the header/key
     * map overlaid with the value row's event (value columns win on collision).
     *
     * @throws ParseException (lazily, while iterating) when the value parser yields a row
     *                        that is not a {@link MapBasedInputRow}
     */
    private CloseableIterator<InputRow> buildBlendedRows(InputEntityReader valueParser, Map<String, Object> headerKeyList) throws IOException {
        return valueParser.read().map(r -> {
            if (!(r instanceof MapBasedInputRow)) {
                throw new ParseException(null, "Unsupported input format in valueFormat. KafkaInputFormat only supports input format that return MapBasedInputRow rows", new Object[0]);
            }
            final MapBasedInputRow valueRow = (MapBasedInputRow) r;

            // Start from the header/key columns, then let the payload overwrite them.
            final Map<String, Object> event = new HashMap<>(headerKeyList);
            event.putAll(valueRow.getEvent());

            final Set<String> newDimensions = new HashSet<>(valueRow.getDimensions());
            newDimensions.addAll(headerKeyList.keySet());
            newDimensions.remove(AUTO_TIMESTAMP_COLUMN);

            return new MapBasedInputRow(this.inputRowSchema.getTimestampSpec().extractTimestamp(event), this.getFinalDimensionList(newDimensions), event);
        });
    }

    /**
     * Builds the single row emitted for a record whose value is {@code null}: the event
     * consists of the header/key map only.
     */
    private CloseableIterator<InputRow> buildRowsWithoutValuePayload(Map<String, Object> headerKeyList) {
        final Set<String> newDimensions = new HashSet<>(headerKeyList.keySet());
        final InputRow row = new MapBasedInputRow(this.inputRowSchema.getTimestampSpec().extractTimestamp(headerKeyList), this.getFinalDimensionList(newDimensions), headerKeyList);
        final List<InputRow> rows = Collections.singletonList(row);
        return CloseableIterators.withEmptyBaggage(rows.iterator());
    }

    /**
     * Parses the record's key, if a key parser is configured, and stores the value of the
     * first entry of the first key row under {@code keyColumnName} unless that column is
     * already present. Any further key rows are ignored; an empty key event adds nothing.
     *
     * @throws IOException when the key parser yields a row that is not a {@link MapBasedInputRow},
     *                     or when reading/closing the key iterator fails
     */
    private void mergeKeyColumn(KafkaRecordEntity record, Map<String, Object> mergedColumns) throws IOException {
        if (this.keyParserSupplier == null) {
            return;
        }
        final InputEntityReader keyParser = this.keyParserSupplier.apply(record);
        if (keyParser == null) {
            return;
        }
        try (CloseableIterator<InputRow> keyRows = keyParser.read()) {
            if (!keyRows.hasNext()) {
                return;
            }
            final InputRow firstKeyRow = keyRows.next();
            if (!(firstKeyRow instanceof MapBasedInputRow)) {
                throw new IOException("Unsupported input format in keyFormat. KafkaInputformat only supports input format that return MapBasedInputRow rows");
            }
            // An empty event has no first entry; skip the key column instead of failing.
            ((MapBasedInputRow) firstKeyRow).getEvent()
                .entrySet()
                .stream()
                .findFirst()
                .ifPresent(entry -> mergedColumns.putIfAbsent(this.keyColumnName, entry.getValue()));
        }
    }

    /**
     * Reads the current record from {@code source} and returns its rows.
     *
     * <p>Headers are collected first, then the record timestamp and the key are added
     * only where their column names are still free. A record with a non-null value is
     * handed to the value parser and blended with those columns; a record with a null
     * value yields exactly one row built from those columns alone.
     *
     * @throws IOException when the key or value parser fails to read, or the key parser
     *                     returns an unsupported row type
     */
    @Override
    public CloseableIterator<InputRow> read() throws IOException {
        final KafkaRecordEntity record = this.source.getEntity();
        final Map<String, Object> mergedColumns = new HashMap<>();

        if (this.headerParserSupplier != null) {
            final KafkaHeaderReader headerParser = this.headerParserSupplier.apply(record);
            for (Pair<String, Object> header : headerParser.read()) {
                mergedColumns.put(header.lhs, header.rhs);
            }
        }

        // A header with the same name takes precedence over the record timestamp.
        mergedColumns.putIfAbsent(this.timestampColumnName, record.getRecord().timestamp());

        this.mergeKeyColumn(record, mergedColumns);

        if (record.getRecord().value() != null) {
            return this.buildBlendedRows(this.valueParser, mergedColumns);
        }
        return this.buildRowsWithoutValuePayload(mergedColumns);
    }

    /**
     * Returns the rows of {@link #read()} paired with their own event map as the raw values.
     */
    @Override
    public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException {
        return this.read().map(row -> InputRowListPlusRawValues.of(row, ((MapBasedInputRow) row).getEvent()));
    }
}

