/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.iceberg;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.DateType;
import com.facebook.presto.common.type.TimestampType;
import com.facebook.presto.common.type.TimestampWithTimeZoneType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeUtils;
import com.facebook.presto.common.type.VarbinaryType;
import com.facebook.presto.common.type.Varchars;
import com.facebook.presto.hive.BaseHiveColumnHandle;
import com.facebook.presto.hive.NodeVersion;
import com.facebook.presto.iceberg.ExpressionConverter;
import com.facebook.presto.iceberg.IcebergColumnHandle;
import com.facebook.presto.iceberg.IcebergErrorCode;
import com.facebook.presto.iceberg.IcebergSessionProperties;
import com.facebook.presto.iceberg.IcebergTableHandle;
import com.facebook.presto.iceberg.IcebergTableType;
import com.facebook.presto.iceberg.IcebergUtil;
import com.facebook.presto.iceberg.Partition;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.spi.ErrorCodeSupplier;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.statistics.ColumnStatisticMetadata;
import com.facebook.presto.spi.statistics.ColumnStatisticType;
import com.facebook.presto.spi.statistics.ColumnStatistics;
import com.facebook.presto.spi.statistics.ComputedStatistics;
import com.facebook.presto.spi.statistics.DoubleRange;
import com.facebook.presto.spi.statistics.Estimate;
import com.facebook.presto.spi.statistics.TableStatistics;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.theta.CompactSketch;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.ContentScanTask;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.GenericBlobMetadata;
import org.apache.iceberg.GenericStatisticsFile;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.puffin.Blob;
import org.apache.iceberg.puffin.Puffin;
import org.apache.iceberg.puffin.PuffinWriter;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

public class TableStatisticsMaker {
    private static final Logger log = Logger.get(TableStatisticsMaker.class);
    // Blob type passed as the first argument of every sketch blob written by this class.
    private static final String ICEBERG_THETA_SKETCH_BLOB_TYPE_ID = "apache-datasketches-theta-v1";
    // Blob property key under which the distinct-value estimate is written and later read back.
    private static final String ICEBERG_THETA_SKETCH_BLOB_PROPERTY_NDV_KEY = "ndv";
    private final Table icebergTable;
    private final ConnectorSession session;

    /**
     * Instances are only created through the static entry points of this class.
     *
     * @param icebergTable table whose metadata and statistics files are read or written
     * @param session session used for the query id and for session properties
     */
    private TableStatisticsMaker(Table icebergTable, ConnectorSession session) {
        this.session = session;
        this.icebergTable = icebergTable;
    }

    /**
     * Computes table statistics by delegating to a short-lived maker bound to the given table and session.
     *
     * @param session current connector session
     * @param constraint constraint whose summary is intersected with the table handle's predicate
     * @param tableHandle handle identifying the table and snapshot
     * @param icebergTable loaded Iceberg table
     * @param columns columns for which column-level statistics are requested
     * @return the computed table statistics
     */
    public static TableStatistics getTableStatistics(ConnectorSession session, Constraint constraint, IcebergTableHandle tableHandle, Table icebergTable, List<IcebergColumnHandle> columns) {
        TableStatisticsMaker maker = new TableStatisticsMaker(icebergTable, session);
        return maker.makeTableStatistics(tableHandle, constraint, columns);
    }

    /**
     * Builds table and per-column statistics from the metrics of the files that match the
     * effective predicate, merged with any distinct-value counts loaded from a statistics file.
     *
     * <p>A row count of zero is returned when the handle has no snapshot id, when the constraint
     * or its intersection with the handle's predicate is {@code none}, or when no file matched.
     *
     * @param tableHandle handle identifying the table, snapshot and table type
     * @param constraint constraint whose summary is intersected with the handle's predicate
     * @param selectedColumns columns for which column statistics are produced
     * @return the assembled statistics
     */
    private TableStatistics makeTableStatistics(IcebergTableHandle tableHandle, Constraint constraint, List<IcebergColumnHandle> selectedColumns) {
        if (!tableHandle.getIcebergTableName().getSnapshotId().isPresent() || constraint.getSummary().isNone()) {
            return emptyTableStatistics();
        }
        @SuppressWarnings("unchecked")
        TupleDomain<IcebergColumnHandle> intersection = constraint.getSummary().transform(IcebergColumnHandle.class::cast).intersect(tableHandle.getPredicate());
        if (intersection.isNone()) {
            return emptyTableStatistics();
        }

        List<Types.NestedField> columns = this.icebergTable.schema().columns();
        Map<Integer, Type.PrimitiveType> idToTypeMapping = columns.stream()
                .filter(column -> column.type().isPrimitiveType())
                .collect(Collectors.toMap(Types.NestedField::fieldId, column -> column.type().asPrimitiveType()));
        List<PartitionField> partitionFields = this.icebergTable.spec().fields();
        Set<Integer> identityPartitionIds = IcebergUtil.getIdentityPartitions(this.icebergTable.spec()).keySet().stream()
                .map(PartitionField::sourceId)
                .collect(Collectors.toSet());
        List<Types.NestedField> nonPartitionPrimitiveColumns = columns.stream()
                .filter(column -> !identityPartitionIds.contains(column.fieldId()) && column.type().isPrimitiveType())
                .collect(ImmutableList.toImmutableList());

        Partition summary;
        if (tableHandle.getIcebergTableName().getTableType() == IcebergTableType.EQUALITY_DELETES) {
            summary = this.getEqualityDeleteTableSummary(tableHandle, intersection, idToTypeMapping, nonPartitionPrimitiveColumns, partitionFields);
        }
        else {
            summary = this.getDataTableSummary(tableHandle, selectedColumns, intersection, idToTypeMapping, nonPartitionPrimitiveColumns, partitionFields);
        }
        if (summary == null) {
            // No file matched the predicate.
            return emptyTableStatistics();
        }

        double recordCount = summary.getRecordCount();
        TableStatistics.Builder result = TableStatistics.builder();
        result.setRowCount(Estimate.of(recordCount));
        result.setTotalSize(Estimate.of((double) summary.getSize()));

        Map<Integer, ColumnStatistics.Builder> tableStats = this.getClosestStatisticsFileForSnapshot(tableHandle)
                .map(TableStatisticsMaker::loadStatisticsFile)
                .orElseGet(Collections::emptyMap);
        for (IcebergColumnHandle columnHandle : selectedColumns) {
            int fieldId = columnHandle.getId();
            ColumnStatistics.Builder columnBuilder = tableStats.getOrDefault(fieldId, ColumnStatistics.builder());

            Long nullCount = summary.getNullCounts().get(fieldId);
            // Dividing by a zero record count would produce NaN or Infinity, which is not a usable
            // estimate; leave the nulls fraction unset in that case.
            if (nullCount != null && recordCount > 0) {
                columnBuilder.setNullsFraction(Estimate.of((double) nullCount.longValue() / recordCount));
            }

            if (summary.getColumnSizes() != null) {
                Long columnSize = summary.getColumnSizes().get(fieldId);
                if (columnSize != null) {
                    columnBuilder.setDataSize(Estimate.of((double) columnSize.longValue()));
                }
            }

            // A range is only reported when both bounds are numeric.
            Object min = summary.getMinValues().get(fieldId);
            Object max = summary.getMaxValues().get(fieldId);
            if (min instanceof Number && max instanceof Number) {
                columnBuilder.setRange(Optional.of(new DoubleRange(((Number) min).doubleValue(), ((Number) max).doubleValue())));
            }
            result.setColumnStatistics((ColumnHandle) columnHandle, columnBuilder.build());
        }
        return result.build();
    }

    /** Returns statistics that carry only a row count of zero. */
    private static TableStatistics emptyTableStatistics() {
        return TableStatistics.builder().setRowCount(Estimate.of(0.0)).build();
    }

    /**
     * Summarizes the data files planned by a scan that is filtered by {@code intersection},
     * projects the selected column names, is pinned to the handle's snapshot and includes
     * column stats.
     *
     * @return the aggregated summary, or {@code null} when the scan planned no files
     */
    private Partition getDataTableSummary(IcebergTableHandle tableHandle, List<IcebergColumnHandle> selectedColumns, TupleDomain<IcebergColumnHandle> intersection, Map<Integer, Type.PrimitiveType> idToTypeMapping, List<Types.NestedField> nonPartitionPrimitiveColumns, List<PartitionField> partitionFields) {
        // Configure the scan one step at a time, in the same order as a single fluent chain would.
        TableScan scan = this.icebergTable.newScan();
        scan = scan.filter(ExpressionConverter.toIcebergExpression(intersection));
        scan = scan.select(selectedColumns.stream().map(BaseHiveColumnHandle::getName).collect(Collectors.toList()));
        scan = scan.useSnapshot(tableHandle.getIcebergTableName().getSnapshotId().get().longValue());
        scan = scan.includeColumnStats();

        CloseableIterable<ContentFile<?>> plannedFiles = CloseableIterable.transform(scan.planFiles(), ContentScanTask::file);
        return this.getSummaryFromFiles(plannedFiles, idToTypeMapping, nonPartitionPrimitiveColumns, partitionFields);
    }

    /**
     * Summarizes the delete files returned by {@code IcebergUtil.getDeleteFiles} for the handle's
     * snapshot, predicate, partition spec id and equality field ids.
     *
     * @return the aggregated summary, or {@code null} when there are no such delete files
     */
    private Partition getEqualityDeleteTableSummary(IcebergTableHandle tableHandle, TupleDomain<IcebergColumnHandle> intersection, Map<Integer, Type.PrimitiveType> idToTypeMapping, List<Types.NestedField> nonPartitionPrimitiveColumns, List<PartitionField> partitionFields) {
        CloseableIterable<DeleteFile> equalityDeletes = IcebergUtil.getDeleteFiles(
                this.icebergTable,
                tableHandle.getIcebergTableName().getSnapshotId().get(),
                intersection,
                tableHandle.getPartitionSpecId(),
                tableHandle.getEqualityFieldIds());
        // Identity transform: only widens the element type to what getSummaryFromFiles accepts.
        CloseableIterable<ContentFile<?>> asContentFiles = CloseableIterable.transform(equalityDeletes, file -> file);
        return this.getSummaryFromFiles(asContentFiles, idToTypeMapping, nonPartitionPrimitiveColumns, partitionFields);
    }

    /**
     * Folds the metrics of every file into a single {@link Partition} summary and closes the
     * iterable afterwards.
     *
     * @param files files to aggregate; closed by this method
     * @return the summary, or {@code null} when {@code files} is empty
     * @throws UncheckedIOException if closing or iterating {@code files} raises an {@link IOException}
     */
    private Partition getSummaryFromFiles(CloseableIterable<ContentFile<?>> files, Map<Integer, Type.PrimitiveType> idToTypeMapping, List<Types.NestedField> nonPartitionPrimitiveColumns, List<PartitionField> partitionFields) {
        Partition aggregate = null;
        try (CloseableIterable<ContentFile<?>> closeableFiles = files) {
            for (ContentFile<?> file : closeableFiles) {
                if (aggregate != null) {
                    // Every file after the first is merged into the existing summary.
                    aggregate.incrementFileCount();
                    aggregate.incrementRecordCount(file.recordCount());
                    aggregate.incrementSize(file.fileSizeInBytes());
                    this.updateSummaryMin(aggregate, partitionFields, Partition.toMap(idToTypeMapping, file.lowerBounds()), file.nullValueCounts(), file.recordCount());
                    this.updateSummaryMax(aggregate, partitionFields, Partition.toMap(idToTypeMapping, file.upperBounds()), file.nullValueCounts(), file.recordCount());
                    aggregate.updateNullCount(file.nullValueCounts());
                    this.updateColumnSizes(aggregate, file.columnSizes());
                }
                else {
                    // The first file seeds the summary with its own metrics.
                    aggregate = new Partition(
                            idToTypeMapping,
                            nonPartitionPrimitiveColumns,
                            file.partition(),
                            file.recordCount(),
                            file.fileSizeInBytes(),
                            Partition.toMap(idToTypeMapping, file.lowerBounds()),
                            Partition.toMap(idToTypeMapping, file.upperBounds()),
                            file.nullValueCounts(),
                            file.columnSizes());
                }
            }
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return aggregate;
    }

    /**
     * Writes the computed statistics for the table by delegating to a short-lived maker bound to
     * the given table and session.
     *
     * @param nodeVersion version included in the statistics file's "created by" tag
     * @param tableHandle handle identifying the table and, optionally, the snapshot
     * @param icebergTable loaded Iceberg table
     * @param session current connector session
     * @param computedStatistics statistics produced by the engine
     */
    public static void writeTableStatistics(NodeVersion nodeVersion, IcebergTableHandle tableHandle, Table icebergTable, ConnectorSession session, Collection<ComputedStatistics> computedStatistics) {
        TableStatisticsMaker maker = new TableStatisticsMaker(icebergTable, session);
        maker.writeTableStatistics(nodeVersion, tableHandle, computedStatistics);
    }

    /**
     * Writes one theta-sketch blob per {@code NUMBER_OF_DISTINCT_VALUES} statistic into a new
     * Puffin file under the table's metadata location and registers that file as the statistics
     * file of the resolved snapshot.
     *
     * <p>Does nothing when neither the handle's snapshot id nor the table's current snapshot
     * resolves to a snapshot.
     *
     * @throws PrestoException with {@code ICEBERG_INVALID_METADATA} when a statistic names a column
     *         that is not in the table schema, or with {@code ICEBERG_FILESYSTEM_ERROR} when the
     *         statistics file cannot be written
     */
    private void writeTableStatistics(NodeVersion nodeVersion, IcebergTableHandle tableHandle, Collection<ComputedStatistics> computedStatistics) {
        Snapshot snapshot = tableHandle.getIcebergTableName().getSnapshotId()
                .map(snapshotId -> this.icebergTable.snapshot(snapshotId))
                .orElseGet(() -> this.icebergTable.currentSnapshot());
        if (snapshot == null) {
            return;
        }
        try (FileIO io = this.icebergTable.io()) {
            String statsFileName = String.format("%s-%s.stats", this.session.getQueryId(), UUID.randomUUID());
            String path = ((HasTableOperations) this.icebergTable).operations().metadataFileLocation(statsFileName);
            OutputFile outputFile = io.newOutputFile(path);
            try (PuffinWriter writer = Puffin.write(outputFile).createdBy("presto-" + nodeVersion).build()) {
                computedStatistics.stream()
                        .map(ComputedStatistics::getColumnStatistics)
                        .filter(Objects::nonNull)
                        .flatMap(statsByColumn -> statsByColumn.entrySet().stream())
                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
                        .forEach((statistic, block) -> {
                            // Only distinct-value sketches are persisted.
                            if (!statistic.getStatisticType().equals(ColumnStatisticType.NUMBER_OF_DISTINCT_VALUES)) {
                                return;
                            }
                            Types.NestedField column = this.icebergTable.schema().findField(statistic.getColumnName());
                            if (column == null) {
                                log.warn("failed to find column name %s in schema of table %s when writing distinct value statistics", statistic.getColumnName(), this.icebergTable.name());
                                throw new PrestoException((ErrorCodeSupplier) IcebergErrorCode.ICEBERG_INVALID_METADATA, String.format("failed to find column name %s in schema of table %s when writing distinct value statistics", statistic.getColumnName(), this.icebergTable.name()));
                            }
                            int fieldId = column.fieldId();
                            ByteBuffer sketchBytes = VarbinaryType.VARBINARY.getSlice(block, 0).toByteBuffer();
                            // The sketch is wrapped only to read its estimate; the raw bytes are what gets stored.
                            CompactSketch sketch = CompactSketch.wrap(Memory.wrap(sketchBytes, ByteOrder.nativeOrder()));
                            String ndv = Long.toString((long) sketch.getEstimate());
                            writer.add(new Blob(
                                    ICEBERG_THETA_SKETCH_BLOB_TYPE_ID,
                                    ImmutableList.of(fieldId),
                                    snapshot.snapshotId(),
                                    snapshot.sequenceNumber(),
                                    sketchBytes,
                                    null,
                                    ImmutableMap.of(ICEBERG_THETA_SKETCH_BLOB_PROPERTY_NDV_KEY, ndv)));
                        });
                writer.finish();
                StatisticsFile statisticsFile = new GenericStatisticsFile(
                        snapshot.snapshotId(),
                        path,
                        writer.fileSize(),
                        writer.footerSize(),
                        writer.writtenBlobsMetadata().stream().map(GenericBlobMetadata::from).collect(ImmutableList.toImmutableList()));
                this.icebergTable.updateStatistics().setStatistics(snapshot.snapshotId(), statisticsFile).commit();
            }
            catch (IOException e) {
                log.warn((Throwable) e, "failed to write table statistics file");
                throw new PrestoException((ErrorCodeSupplier) IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR, "failed to write statistics file", (Throwable) e);
            }
        }
    }

    /**
     * Adds the per-column sizes of one file to the running totals held by {@code summary}.
     *
     * <p>Nothing is changed when the summary reports invalid column metrics or when either size
     * map is {@code null}. Only the summary's non-partition primitive columns are considered.
     *
     * @param summary summary whose column-size map is updated in place
     * @param addedColumnSizes column sizes of the file being merged, keyed by field id; may be null
     */
    public void updateColumnSizes(Partition summary, Map<Integer, Long> addedColumnSizes) {
        Map<Integer, Long> totals = summary.getColumnSizes();
        boolean canAccumulate = summary.hasValidColumnMetrics() && totals != null && addedColumnSizes != null;
        if (!canAccumulate) {
            return;
        }
        for (Types.NestedField column : summary.getNonPartitionPrimitiveColumns()) {
            int fieldId = column.fieldId();
            Long delta = addedColumnSizes.get(fieldId);
            if (delta != null) {
                long previous = totals.getOrDefault(fieldId, 0L);
                totals.put(fieldId, delta + previous);
            }
        }
    }

    /**
     * Merges a file's lower bounds into the summary's minimum values, first through
     * {@code Partition.updateStats} and then for the partition source columns.
     * The predicate is true when the comparison result is positive.
     */
    private void updateSummaryMin(Partition summary, List<PartitionField> partitionFields, Map<Integer, Object> lowerBounds, Map<Integer, Long> nullCounts, long recordCount) {
        summary.updateStats(summary.getMinValues(), lowerBounds, nullCounts, recordCount, comparison -> comparison > 0);
        Map<Integer, Object> currentMinimums = summary.getMinValues();
        this.updatePartitionedStats(summary, partitionFields, currentMinimums, lowerBounds, comparison -> comparison > 0);
    }

    /**
     * Merges a file's upper bounds into the summary's maximum values, first through
     * {@code Partition.updateStats} and then for the partition source columns.
     * The predicate is true when the comparison result is negative.
     */
    private void updateSummaryMax(Partition summary, List<PartitionField> partitionFields, Map<Integer, Object> upperBounds, Map<Integer, Long> nullCounts, long recordCount) {
        summary.updateStats(summary.getMaxValues(), upperBounds, nullCounts, recordCount, comparison -> comparison < 0);
        Map<Integer, Object> currentMaximums = summary.getMaxValues();
        this.updatePartitionedStats(summary, partitionFields, currentMaximums, upperBounds, comparison -> comparison < 0);
    }

    /**
     * Updates {@code current} with values from {@code newStats} for the source column of every
     * partition field.
     *
     * <p>A column is skipped when the summary lists it as corrupted or when {@code newStats} has
     * no value for it. A missing entry in {@code current} is filled with the new value. Otherwise
     * the new value replaces the existing one only when {@code predicate} accepts the result of
     * comparing the existing value to the new value.
     *
     * @param predicate decides, from {@code compare(existing, candidate)}, whether to replace
     */
    private void updatePartitionedStats(Partition summary, List<PartitionField> partitionFields, Map<Integer, Object> current, Map<Integer, Object> newStats, Predicate<Integer> predicate) {
        for (PartitionField partitionField : partitionFields) {
            int sourceId = partitionField.sourceId();
            if (summary.getCorruptedStats().contains(sourceId)) {
                continue;
            }
            Object candidate = newStats.get(sourceId);
            if (candidate == null) {
                continue;
            }
            // putIfAbsent stores the candidate and returns null when nothing was recorded yet.
            Object existing = current.putIfAbsent(sourceId, candidate);
            if (existing == null) {
                continue;
            }
            Comparator<Object> comparator = Comparators.forType((Type.PrimitiveType) summary.getIdToTypeMapping().get(sourceId));
            if (predicate.test(comparator.compare(existing, candidate))) {
                current.put(sourceId, candidate);
            }
        }
    }

    /**
     * Picks the statistics file whose snapshot is closest to the target snapshot.
     *
     * <p>The target is the snapshot named by the handle, or the table's current snapshot when the
     * handle has no snapshot id. Distance is the absolute difference of snapshot timestamps in
     * milliseconds. When the target and both compared snapshots carry a {@code total-records}
     * summary entry, the absolute record-count difference, scaled by the session's
     * record-difference weight, is added to each distance.
     *
     * <p>Statistics files whose snapshot can no longer be resolved sort after those whose snapshot
     * exists; two such files compare as equal.
     *
     * @param handle table handle that may name the target snapshot
     * @return the closest statistics file, or empty when the table has no statistics files or the
     *         target snapshot cannot be resolved
     */
    private Optional<StatisticsFile> getClosestStatisticsFileForSnapshot(IcebergTableHandle handle) {
        Snapshot target = handle.getIcebergTableName().getSnapshotId()
                .map(snapshotId -> this.icebergTable.snapshot(snapshotId))
                .orElseGet(() -> this.icebergTable.currentSnapshot());
        if (target == null) {
            // Without a target there is nothing to measure distance against; the comparator
            // below would otherwise dereference null.
            return Optional.empty();
        }
        return this.icebergTable.statisticsFiles().stream().min((first, second) -> {
            if (first == second) {
                return 0;
            }
            // Resolve each snapshot once per comparison.
            Snapshot firstSnap = this.icebergTable.snapshot(first.snapshotId());
            Snapshot secondSnap = this.icebergTable.snapshot(second.snapshotId());
            if (firstSnap == null) {
                // Keep the ordering symmetric when both snapshots are missing.
                return secondSnap == null ? 0 : 1;
            }
            if (secondSnap == null) {
                return -1;
            }
            long firstDiff = Math.abs(target.timestampMillis() - firstSnap.timestampMillis());
            long secondDiff = Math.abs(target.timestampMillis() - secondSnap.timestampMillis());
            Optional<Long> targetTotalRecords = Optional.ofNullable(target.summary().get("total-records")).map(Long::parseLong);
            Optional<Long> firstTotalRecords = Optional.ofNullable(firstSnap.summary().get("total-records")).map(Long::parseLong);
            Optional<Long> secondTotalRecords = Optional.ofNullable(secondSnap.summary().get("total-records")).map(Long::parseLong);
            if (targetTotalRecords.isPresent() && firstTotalRecords.isPresent() && secondTotalRecords.isPresent()) {
                long targetTotal = targetTotalRecords.get();
                double weight = IcebergSessionProperties.getStatisticSnapshotRecordDifferenceWeight(this.session);
                firstDiff += (long) (weight * (double) Math.abs(firstTotalRecords.get() - targetTotal));
                secondDiff += (long) (weight * (double) Math.abs(secondTotalRecords.get() - targetTotal));
            }
            return Long.compare(firstDiff, secondDiff);
        });
    }

    /**
     * Reads the blob metadata of a statistics file into per-field column statistics builders.
     *
     * <p>Each blob must reference exactly one field ({@code Iterables.getOnlyElement} throws
     * otherwise). When the blob carries an {@code ndv} property, its value becomes the
     * distinct-values count; a value that does not parse as a long is logged and recorded as
     * unknown.
     *
     * @param file statistics file whose blob metadata is inspected
     * @return immutable map from field id to a builder pre-populated with the distinct-values count
     */
    private static Map<Integer, ColumnStatistics.Builder> loadStatisticsFile(StatisticsFile file) {
        ImmutableMap.Builder<Integer, ColumnStatistics.Builder> result = ImmutableMap.builder();
        file.blobMetadata().forEach(blob -> {
            Integer field = Iterables.getOnlyElement(blob.fields());
            ColumnStatistics.Builder colStats = ColumnStatistics.builder();
            String ndvProp = blob.properties().get(ICEBERG_THETA_SKETCH_BLOB_PROPERTY_NDV_KEY);
            if (ndvProp != null) {
                try {
                    long ndv = Long.parseLong(ndvProp);
                    colStats.setDistinctValuesCount(Estimate.of((double) ndv));
                }
                catch (NumberFormatException e) {
                    colStats.setDistinctValuesCount(Estimate.unknown());
                    // ndvProp is a String, so it must be formatted with %s; %d cannot format a String.
                    log.warn("bad long value when parsing statistics file %s, bad value: %s", file.path(), ndvProp);
                }
            }
            result.put(field, colStats);
        });
        return result.build();
    }

    /**
     * Lists the column statistics that can be collected for a column of the given type.
     *
     * <p>Numeric, date, varchar, timestamp and timestamp-with-time-zone columns support a
     * distinct-values statistic computed with the {@code sketch_theta} function; every other type
     * yields an empty list.
     *
     * @param columnName name of the column the statistic metadata refers to
     * @param type type of the column
     * @return immutable list with zero or one statistic metadata entries
     */
    public static List<ColumnStatisticMetadata> getSupportedColumnStatistics(String columnName, Type type) {
        boolean supportsDistinctCount = TypeUtils.isNumericType(type)
                || type.equals(DateType.DATE)
                || Varchars.isVarcharType(type)
                || type.equals(TimestampType.TIMESTAMP)
                || type.equals(TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE);
        if (!supportsDistinctCount) {
            return ImmutableList.of();
        }
        return ImmutableList.of(ColumnStatisticType.NUMBER_OF_DISTINCT_VALUES.getColumnStatisticMetadataWithCustomFunction(columnName, "sketch_theta"));
    }
}

