/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.iceberg.BaseScan;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile;
import org.apache.iceberg.InternalData;
import org.apache.iceberg.ManifestEntry;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestReader;
import org.apache.iceberg.PartitionData;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.PartitionStats;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableUtil;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Queues;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PartitionMap;
import org.apache.iceberg.util.PartitionUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Computes, writes, and reads partition statistics files for partitioned tables.
 *
 * <p>Statistics are aggregated per partition from a snapshot's manifests. If a statistics file
 * registered on the table belongs to an ancestor of the target snapshot, only the manifests added
 * by the snapshots after that ancestor are scanned and merged into the previous statistics
 * (incremental compute). Otherwise, or if the previous file cannot be read, all manifests of the
 * target snapshot are scanned (full compute).
 */
public class PartitionStatsHandler {
    private static final Logger LOG = LoggerFactory.getLogger(PartitionStatsHandler.class);

    /** Highest table format version accepted by {@link #schema(Types.StructType, int)}. */
    private static final int MAX_SUPPORTED_FORMAT_VERSION = 4;

    /** Field id of the partition struct column in the statistics schema. */
    public static final int PARTITION_FIELD_ID = 1;
    /** Name of the partition struct column in the statistics schema. */
    public static final String PARTITION_FIELD_NAME = "partition";
    public static final Types.NestedField SPEC_ID =
        Types.NestedField.required(2, "spec_id", Types.IntegerType.get());
    public static final Types.NestedField DATA_RECORD_COUNT =
        Types.NestedField.required(3, "data_record_count", Types.LongType.get());
    public static final Types.NestedField DATA_FILE_COUNT =
        Types.NestedField.required(4, "data_file_count", Types.IntegerType.get());
    public static final Types.NestedField TOTAL_DATA_FILE_SIZE_IN_BYTES =
        Types.NestedField.required(5, "total_data_file_size_in_bytes", Types.LongType.get());
    // The four delete-count fields below are optional in the v2 schema; v3Schema re-declares them
    // as required with the same id, name and type.
    public static final Types.NestedField POSITION_DELETE_RECORD_COUNT =
        Types.NestedField.optional(6, "position_delete_record_count", Types.LongType.get());
    public static final Types.NestedField POSITION_DELETE_FILE_COUNT =
        Types.NestedField.optional(7, "position_delete_file_count", Types.IntegerType.get());
    public static final Types.NestedField EQUALITY_DELETE_RECORD_COUNT =
        Types.NestedField.optional(8, "equality_delete_record_count", Types.LongType.get());
    public static final Types.NestedField EQUALITY_DELETE_FILE_COUNT =
        Types.NestedField.optional(9, "equality_delete_file_count", Types.IntegerType.get());
    public static final Types.NestedField TOTAL_RECORD_COUNT =
        Types.NestedField.optional(10, "total_record_count", Types.LongType.get());
    public static final Types.NestedField LAST_UPDATED_AT =
        Types.NestedField.optional(11, "last_updated_at", Types.LongType.get());
    public static final Types.NestedField LAST_UPDATED_SNAPSHOT_ID =
        Types.NestedField.optional(12, "last_updated_snapshot_id", Types.LongType.get());
    /** Required v3 field whose initial and write defaults are both {@code 0}. */
    public static final Types.NestedField DV_COUNT = Types.NestedField.required("dv_count")
        .withId(13)
        .ofType(Types.IntegerType.get())
        .withInitialDefault(Literal.of(0))
        .withWriteDefault(Literal.of(0))
        .build();

    private PartitionStatsHandler() {
    }

    /**
     * Returns the format-version-2 statistics schema for the given unified partition type.
     *
     * @param unifiedPartitionType the unified partition type of the table; must have fields
     * @return the v2 statistics file schema
     * @throws IllegalStateException if {@code unifiedPartitionType} has no fields
     * @deprecated use {@link #schema(Types.StructType, int)}, which selects the schema by table
     *     format version
     */
    @Deprecated
    public static Schema schema(Types.StructType unifiedPartitionType) {
        Preconditions.checkState(
            !unifiedPartitionType.fields().isEmpty(), "Table must be partitioned");
        return v2Schema(unifiedPartitionType);
    }

    /**
     * Returns the statistics file schema for the given unified partition type and format version.
     *
     * <p>Format versions 1 and 2 use the v2 schema. Versions 3 and 4 use the v3 schema, in which
     * the delete counts are required and {@link #DV_COUNT} is added.
     *
     * @param unifiedPartitionType the unified partition type of the table; must have fields
     * @param formatVersion the table format version, in the range 1 to 4 inclusive
     * @return the statistics file schema
     * @throws IllegalStateException if the partition type is empty or the version is out of range
     */
    public static Schema schema(Types.StructType unifiedPartitionType, int formatVersion) {
        Preconditions.checkState(
            !unifiedPartitionType.fields().isEmpty(), "Table must be partitioned");
        // Preconditions substitutes only "%s" placeholders; "%d" would be left in the message.
        Preconditions.checkState(
            formatVersion > 0 && formatVersion <= MAX_SUPPORTED_FORMAT_VERSION,
            "Invalid format version: %s",
            formatVersion);
        return formatVersion <= 2
            ? v2Schema(unifiedPartitionType)
            : v3Schema(unifiedPartitionType);
    }

    private static Schema v2Schema(Types.StructType unifiedPartitionType) {
        return new Schema(
            Types.NestedField.required(PARTITION_FIELD_ID, PARTITION_FIELD_NAME, unifiedPartitionType),
            SPEC_ID,
            DATA_RECORD_COUNT,
            DATA_FILE_COUNT,
            TOTAL_DATA_FILE_SIZE_IN_BYTES,
            POSITION_DELETE_RECORD_COUNT,
            POSITION_DELETE_FILE_COUNT,
            EQUALITY_DELETE_RECORD_COUNT,
            EQUALITY_DELETE_FILE_COUNT,
            TOTAL_RECORD_COUNT,
            LAST_UPDATED_AT,
            LAST_UPDATED_SNAPSHOT_ID);
    }

    private static Schema v3Schema(Types.StructType unifiedPartitionType) {
        return new Schema(
            Types.NestedField.required(PARTITION_FIELD_ID, PARTITION_FIELD_NAME, unifiedPartitionType),
            SPEC_ID,
            DATA_RECORD_COUNT,
            DATA_FILE_COUNT,
            TOTAL_DATA_FILE_SIZE_IN_BYTES,
            requiredCopyOf(POSITION_DELETE_RECORD_COUNT),
            requiredCopyOf(POSITION_DELETE_FILE_COUNT),
            requiredCopyOf(EQUALITY_DELETE_RECORD_COUNT),
            requiredCopyOf(EQUALITY_DELETE_FILE_COUNT),
            TOTAL_RECORD_COUNT,
            LAST_UPDATED_AT,
            LAST_UPDATED_SNAPSHOT_ID,
            DV_COUNT);
    }

    /** Returns a required field with the same id, name and type as {@code field}. */
    private static Types.NestedField requiredCopyOf(Types.NestedField field) {
        return Types.NestedField.required(field.fieldId(), field.name(), field.type());
    }

    /**
     * Computes and writes partition statistics for the table's current snapshot.
     *
     * @param table the table; must not be null and must be partitioned
     * @return the statistics file, or {@code null} when the table has no current snapshot or no
     *     statistics were produced
     * @throws IllegalArgumentException if {@code table} is null or not partitioned
     * @throws IOException if writing the statistics file fails
     */
    public static PartitionStatisticsFile computeAndWriteStatsFile(Table table) throws IOException {
        Preconditions.checkArgument(table != null, "Invalid table: null");
        Snapshot current = table.currentSnapshot();
        if (current == null) {
            return null;
        }
        return computeAndWriteStatsFile(table, current.snapshotId());
    }

    /**
     * Computes and writes partition statistics for the given snapshot.
     *
     * <p>If a statistics file already exists for {@code snapshotId}, that file is returned
     * unchanged. If a file exists for an ancestor snapshot, the statistics are computed
     * incrementally from it. Otherwise, and when the previous file cannot be read, they are
     * computed from all manifests of the snapshot.
     *
     * @param table the table; must not be null and must be partitioned
     * @param snapshotId the id of the snapshot to compute statistics for
     * @return the statistics file, or {@code null} if no statistics were produced
     * @throws IllegalArgumentException if the table is null or unpartitioned, or the snapshot is
     *     not found
     * @throws IOException if writing the statistics file fails
     */
    public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, long snapshotId)
        throws IOException {
        Preconditions.checkArgument(table != null, "Invalid table: null");
        Preconditions.checkArgument(Partitioning.isPartitioned(table), "Table must be partitioned");
        Snapshot snapshot = table.snapshot(snapshotId);
        Preconditions.checkArgument(snapshot != null, "Snapshot not found: %s", snapshotId);

        Types.StructType partitionType = Partitioning.partitionType(table);
        PartitionStatisticsFile statisticsFile = latestStatsFile(table, snapshot.snapshotId());

        Collection<PartitionStats> stats;
        if (statisticsFile == null) {
            LOG.info("Using full compute as previous statistics file is not present for incremental compute.");
            stats = computeStats(table, snapshot.allManifests(table.io()), false).values();
        } else if (statisticsFile.snapshotId() == snapshotId) {
            LOG.info("Returning existing statistics file for snapshot {}", snapshotId);
            return statisticsFile;
        } else {
            try {
                stats = computeAndMergeStatsIncremental(table, snapshot, partitionType, statisticsFile);
            } catch (InvalidStatsFileException exception) {
                // Keep the cause in the log so an unreadable previous file can be diagnosed.
                LOG.warn(
                    "Using full compute as previous statistics file is corrupted for incremental compute.",
                    exception);
                stats = computeStats(table, snapshot.allManifests(table.io()), false).values();
            }
        }

        if (stats.isEmpty()) {
            return null;
        }

        List<PartitionStats> sortedStats = sortStatsByPartition(stats, partitionType);
        return writePartitionStatsFile(
            table,
            snapshot.snapshotId(),
            schema(partitionType, TableUtil.formatVersion(table)),
            sortedStats);
    }

    /**
     * Writes {@code records} to a new statistics file in the table's metadata location.
     *
     * <p>The file format comes from the table property {@code write.format.default}, which
     * defaults to {@code parquet}.
     *
     * @return a statistics file descriptor with the snapshot id, path and file size
     * @throws IOException if creating or writing the file fails
     */
    @VisibleForTesting
    static PartitionStatisticsFile writePartitionStatsFile(
        Table table, long snapshotId, Schema dataSchema, Iterable<PartitionStats> records)
        throws IOException {
        FileFormat fileFormat =
            FileFormat.fromString(table.properties().getOrDefault("write.format.default", "parquet"));
        OutputFile outputFile = newPartitionStatsFile(table, fileFormat, snapshotId);
        try (FileAppender<StructLike> writer =
            InternalData.write(fileFormat, outputFile).schema(dataSchema).build()) {
            records.forEach(writer::add);
        }

        return ImmutableGenericPartitionStatisticsFile.builder()
            .snapshotId(snapshotId)
            .path(outputFile.location())
            .fileSizeInBytes(outputFile.toInputFile().getLength())
            .build();
    }

    /**
     * Reads a partition statistics file using {@code schema} as the projection.
     *
     * <p>The file format is derived from the file name. The caller is responsible for closing the
     * returned iterable.
     *
     * @param schema the projection schema; must not be null
     * @param inputFile the statistics file; must not be null
     * @return the statistics records converted to {@link PartitionStats}
     * @throws IllegalArgumentException if an argument is null or the format cannot be determined
     */
    public static CloseableIterable<PartitionStats> readPartitionStatsFile(
        Schema schema, InputFile inputFile) {
        Preconditions.checkArgument(schema != null, "Invalid schema: null");
        Preconditions.checkArgument(inputFile != null, "Invalid input file: null");
        FileFormat fileFormat = FileFormat.fromFileName(inputFile.location());
        Preconditions.checkArgument(
            fileFormat != null, "Unable to determine format of file: %s", inputFile.location());

        CloseableIterable<StructLike> records =
            InternalData.read(fileFormat, inputFile).project(schema).build();
        return CloseableIterable.transform(records, PartitionStatsHandler::recordToPartitionStats);
    }

    /** Creates a uniquely named output file in the table's metadata location. */
    private static OutputFile newPartitionStatsFile(
        Table table, FileFormat fileFormat, long snapshotId) {
        Preconditions.checkArgument(
            table instanceof HasTableOperations,
            "Table must have operations to retrieve metadata location");
        String fileName = fileFormat.addExtension(
            String.format(Locale.ROOT, "partition-stats-%d-%s", snapshotId, UUID.randomUUID()));
        return table.io().newOutputFile(
            ((HasTableOperations) table).operations().metadataFileLocation(fileName));
    }

    /**
     * Converts a record read from a statistics file into {@link PartitionStats}.
     *
     * <p>Position 0 is the partition and position 1 is the spec id. The remaining positions are
     * copied by index.
     */
    private static PartitionStats recordToPartitionStats(StructLike record) {
        int pos = 0;
        PartitionStats stats = new PartitionStats(
            record.get(pos++, StructLike.class), // partition
            record.get(pos++, Integer.class)); // spec id
        for (; pos < record.size(); pos++) {
            stats.set(pos, record.get(pos, Object.class));
        }
        return stats;
    }

    /**
     * Loads the previous statistics file and merges the stats of the snapshots added since then.
     *
     * @throws InvalidStatsFileException if the previous statistics file cannot be read
     */
    private static Collection<PartitionStats> computeAndMergeStatsIncremental(
        Table table,
        Snapshot snapshot,
        Types.StructType partitionType,
        PartitionStatisticsFile previousStatsFile) {
        PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
        try (CloseableIterable<PartitionStats> oldStats = readPartitionStatsFile(
            schema(partitionType, TableUtil.formatVersion(table)),
            table.io().newInputFile(previousStatsFile.path()))) {
            oldStats.forEach(partitionStats ->
                statsMap.put(partitionStats.specId(), partitionStats.partition(), partitionStats));
        } catch (Exception exception) {
            // Any read failure (including close) triggers the caller's full-compute fallback.
            throw new InvalidStatsFileException(exception);
        }

        PartitionMap<PartitionStats> incrementalStatsMap =
            computeStatsDiff(table, table.snapshot(previousStatsFile.snapshotId()), snapshot);

        // Incremental keys hold PartitionData. They are converted to GenericRecord before merging
        // into the map populated from the previous file.
        incrementalStatsMap.forEach((key, value) -> statsMap.merge(
            Pair.of(key.first(), partitionDataToRecord((PartitionData) key.second())),
            value,
            (existingEntry, newEntry) -> {
                existingEntry.appendStats(newEntry);
                return existingEntry;
            }));

        return statsMap.values();
    }

    /** Copies each position of {@code data} into a new record of the same partition type. */
    private static GenericRecord partitionDataToRecord(PartitionData data) {
        GenericRecord record = GenericRecord.create(data.getPartitionType());
        for (int index = 0; index < record.size(); ++index) {
            record.set(index, data.get(index));
        }
        return record;
    }

    /**
     * Returns the statistics file of the nearest snapshot in the ancestry of {@code snapshotId}
     * (starting with the snapshot itself), or {@code null} if no ancestor has one.
     */
    @VisibleForTesting
    static PartitionStatisticsFile latestStatsFile(Table table, long snapshotId) {
        List<PartitionStatisticsFile> partitionStatisticsFiles = table.partitionStatisticsFiles();
        if (partitionStatisticsFiles.isEmpty()) {
            return null;
        }

        Map<Long, PartitionStatisticsFile> stats = partitionStatisticsFiles.stream()
            .collect(Collectors.toMap(PartitionStatisticsFile::snapshotId, file -> file));
        for (Snapshot snapshot : SnapshotUtil.ancestorsOf(snapshotId, table::snapshot)) {
            if (stats.containsKey(snapshot.snapshotId())) {
                return stats.get(snapshot.snapshotId());
            }
        }

        // A statistics file may exist that is not reachable from this snapshot's ancestry.
        return null;
    }

    /**
     * Computes stats only for the manifests added by the snapshots between {@code fromSnapshot}
     * and {@code toSnapshot}.
     */
    private static PartitionMap<PartitionStats> computeStatsDiff(
        Table table, Snapshot fromSnapshot, Snapshot toSnapshot) {
        Iterable<Snapshot> snapshots = SnapshotUtil.ancestorsBetween(
            toSnapshot.snapshotId(), fromSnapshot.snapshotId(), table::snapshot);
        // For each snapshot, keep only the manifests that the snapshot itself added.
        List<ManifestFile> manifests = StreamSupport.stream(snapshots.spliterator(), false)
            .flatMap(snapshot -> snapshot.allManifests(table.io()).stream()
                .filter(file -> file.snapshotId().equals(snapshot.snapshotId())))
            .collect(Collectors.toList());
        return computeStats(table, manifests, true);
    }

    /** Collects stats per manifest on the worker pool and merges them into a single map. */
    private static PartitionMap<PartitionStats> computeStats(
        Table table, List<ManifestFile> manifests, boolean incremental) {
        Types.StructType partitionType = Partitioning.partitionType(table);
        ConcurrentLinkedQueue<PartitionMap<PartitionStats>> statsByManifest =
            Queues.newConcurrentLinkedQueue();
        Tasks.foreach(manifests)
            .stopOnFailure()
            .throwFailureWhenFinished()
            .executeWith(ThreadPools.getWorkerPool())
            .run(manifest -> statsByManifest.add(
                collectStatsForManifest(table, manifest, partitionType, incremental)));

        PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
        for (PartitionMap<PartitionStats> stats : statsByManifest) {
            mergePartitionMap(stats, statsMap);
        }
        return statsMap;
    }

    /**
     * Aggregates the entries of one manifest into per-partition stats.
     *
     * @throws UncheckedIOException if closing the manifest reader fails
     */
    private static PartitionMap<PartitionStats> collectStatsForManifest(
        Table table, ManifestFile manifest, Types.StructType partitionType, boolean incremental) {
        List<String> projection = BaseScan.scanColumns(manifest.content());
        try (ManifestReader<?> reader = ManifestFiles.open(manifest, table.io()).select(projection)) {
            PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
            int specId = manifest.partitionSpecId();
            PartitionSpec spec = table.specs().get(specId);
            PartitionData keyTemplate = new PartitionData(partitionType);

            for (ManifestEntry<?> entry : reader.entries()) {
                ContentFile<?> file = entry.file();
                StructLike coercedPartition =
                    PartitionUtil.coercePartition(partitionType, spec, file.partition());
                PartitionData key = keyTemplate.copyFor(coercedPartition);
                Snapshot snapshot = table.snapshot(entry.snapshotId());
                PartitionStats stats = statsMap.computeIfAbsent(
                    specId,
                    ((PartitionData) file.partition()).copy(),
                    () -> new PartitionStats(key, specId));

                if (entry.isLive()) {
                    // In incremental mode, only ADDED live entries are counted.
                    if (!incremental || entry.status() == ManifestEntry.Status.ADDED) {
                        stats.liveEntry(file, snapshot);
                    }
                } else if (incremental) {
                    stats.deletedEntryForIncrementalCompute(file, snapshot);
                } else {
                    stats.deletedEntry(snapshot);
                }
            }

            return statsMap;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Merges every entry of {@code fromMap} into {@code toMap}, appending stats on key collision. */
    private static void mergePartitionMap(
        PartitionMap<PartitionStats> fromMap, PartitionMap<PartitionStats> toMap) {
        fromMap.forEach((key, value) -> toMap.merge(key, value, (existingEntry, newEntry) -> {
            existingEntry.appendStats(newEntry);
            return existingEntry;
        }));
    }

    /** Returns a new list of {@code stats} sorted by partition using the partition type's order. */
    private static List<PartitionStats> sortStatsByPartition(
        Collection<PartitionStats> stats, Types.StructType partitionType) {
        List<PartitionStats> entries = Lists.newArrayList(stats);
        entries.sort(Comparator.comparing(PartitionStats::partition, Comparators.forType(partitionType)));
        return entries;
    }

    /**
     * Signals that the previous statistics file could not be read, so the caller falls back to
     * full compute.
     */
    private static class InvalidStatsFileException extends RuntimeException {
        InvalidStatsFileException(Throwable cause) {
            super(cause);
        }
    }
}

