/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.crosspartition;

import java.io.IOException;
import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.JoinedRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.SplitsParallelReadUtil;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.AbstractInnerTableScan;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
import org.apache.paimon.utils.TypeUtils;

/**
 * Reads the primary-key columns of a table's latest data and pairs every key with the partition
 * and bucket of the split it was read from.
 *
 * <p>NOTE(review): judging by the package name and the {@code crossPartitionUpsert*} options used
 * below, the result presumably seeds a cross-partition upsert key index - confirm against callers.
 */
public class IndexBootstrap implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Name of the bucket column appended by {@link #bootstrapType(TableSchema)}. */
    public static final String BUCKET_FIELD = "_BUCKET";

    private final Table table;

    /**
     * Creates a bootstrap helper for the given table.
     *
     * @param table table whose primary keys are read
     */
    public IndexBootstrap(Table table) {
        this.table = table;
    }

    /**
     * Reads the bootstrap rows selected for one assigner and hands each of them to {@code collector}.
     *
     * @param numAssigners total number of assigners the buckets are divided between
     * @param assignId id of the assigner to read for; a bucket is selected when
     *     {@code bucket % numAssigners == assignId}
     * @param collector receives every row produced by {@link #bootstrap(int, int)}
     * @throws IOException if creating or draining the reader fails
     */
    public void bootstrap(int numAssigners, int assignId, Consumer<InternalRow> collector) throws IOException {
        bootstrap(numAssigners, assignId).forEachRemaining(collector);
    }

    /**
     * Builds a reader over the bootstrap rows selected for one assigner.
     *
     * <p>Each returned row is a {@link JoinedRow} of the projected primary-key columns followed by
     * an extra row holding the split's partition fields and its bucket number.
     *
     * <p>When the cross-partition upsert index TTL option is set, splits for which
     * {@link #filterSplit(Split, long, long)} returns {@code false} are skipped.
     *
     * @param numAssigners total number of assigners the buckets are divided between
     * @param assignId id of the assigner to read for; a bucket is selected when
     *     {@code bucket % numAssigners == assignId}
     * @return reader over the joined key / partition / bucket rows
     * @throws IOException if the reader cannot be created
     */
    public RecordReader<InternalRow> bootstrap(int numAssigners, int assignId) throws IOException {
        RowType rowType = table.rowType();
        List<String> fieldNames = rowType.getFieldNames();
        // Positions of the primary-key columns inside the full row type.
        int[] keyProjection = table.primaryKeys().stream().mapToInt(fieldNames::indexOf).toArray();

        // Override the scan mode on a copy of the table so the scan always uses LATEST,
        // regardless of what the table itself is configured with.
        ReadBuilder readBuilder =
                table.copy(
                                Collections.singletonMap(
                                        CoreOptions.SCAN_MODE.key(),
                                        CoreOptions.StartupMode.LATEST.toString()))
                        .newReadBuilder()
                        .withProjection(keyProjection);

        AbstractInnerTableScan tableScan = (AbstractInnerTableScan) readBuilder.newScan();
        List<Split> splits =
                tableScan
                        .withBucketFilter(bucket -> bucket % numAssigners == assignId)
                        .plan()
                        .splits();

        CoreOptions options = CoreOptions.fromMap(table.options());
        Duration indexTtl = options.crossPartitionUpsertIndexTtl();
        if (indexTtl != null) {
            long indexTtlMillis = indexTtl.toMillis();
            long currentTime = System.currentTimeMillis();
            splits =
                    splits.stream()
                            .filter(split -> filterSplit(split, indexTtlMillis, currentTime))
                            .collect(Collectors.toList());
        }

        // Converter typed as (partition fields..., INT bucket); used to build the extra row that
        // is attached to every record of a split.
        RowDataToObjectArrayConverter partBucketConverter =
                new RowDataToObjectArrayConverter(
                        TypeUtils.concat(
                                TypeUtils.project(rowType, table.partitionKeys()),
                                RowType.of(DataTypes.INT())));

        // NOTE(review): presumably reads the splits concurrently using the configured
        // parallelism - confirm against SplitsParallelReadUtil.
        return SplitsParallelReadUtil.parallelExecute(
                TypeUtils.project(rowType, keyProjection),
                s -> readBuilder.newRead().createReader((Split) s),
                splits,
                options.pageSize(),
                options.crossPartitionUpsertBootstrapParallelism(),
                split -> {
                    DataSplit dataSplit = (DataSplit) split;
                    int bucket = dataSplit.bucket();
                    return partBucketConverter.toGenericRow(
                            new JoinedRow(dataSplit.partition(), GenericRow.of(bucket)));
                },
                (row, extra) -> new JoinedRow().replace((InternalRow) row, (InternalRow) extra));
    }

    /**
     * Decides whether a split must still be read given the index TTL.
     *
     * <p>A split is kept as soon as one of its data files was created no more than
     * {@code indexTtl} milliseconds before {@code currentTime}.
     *
     * @param split split to inspect; it is cast to {@link DataSplit}
     * @param indexTtl time-to-live in milliseconds
     * @param currentTime reference time in epoch milliseconds
     * @return {@code true} if at least one data file is within the TTL, {@code false} otherwise
     *     (including when the split has no data files)
     */
    @VisibleForTesting
    static boolean filterSplit(Split split, long indexTtl, long currentTime) {
        List<DataFileMeta> files = ((DataSplit) split).dataFiles();
        for (DataFileMeta file : files) {
            long fileTime = file.creationTimeEpochMillis();
            // Compare the file's age against the TTL rather than computing fileTime + indexTtl:
            // that sum silently wraps to a negative value for a very large TTL, which would make
            // every file look expired.
            if (currentTime - fileTime <= indexTtl) {
                return true;
            }
        }
        return false;
    }

    /**
     * Builds the row type of the bootstrap records for a schema.
     *
     * <p>The type contains the schema's fields projected in the order primary keys then partition
     * keys, followed by a non-null INT field named {@link #BUCKET_FIELD} whose id is one above
     * the highest id among the projected fields.
     *
     * @param schema table schema supplying the primary and partition keys
     * @return row type of the bootstrap records
     */
    public static RowType bootstrapType(TableSchema schema) {
        List<String> primaryKeys = schema.primaryKeys();
        List<String> partitionKeys = schema.partitionKeys();
        List<String> projectedNames =
                Stream.concat(primaryKeys.stream(), partitionKeys.stream())
                        .collect(Collectors.toList());

        // Copy into a mutable list so the bucket field can be appended.
        List<DataField> bootstrapFields =
                new ArrayList<>(schema.projectedLogicalRowType(projectedNames).getFields());
        bootstrapFields.add(
                new DataField(
                        RowType.currentHighestFieldId(bootstrapFields) + 1,
                        BUCKET_FIELD,
                        DataTypes.INT().notNull()));
        return new RowType(bootstrapFields);
    }
}

