/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.crosspartition;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiConsumer;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.crosspartition.BucketAssigner;
import org.apache.paimon.crosspartition.ExistingProcessor;
import org.apache.paimon.crosspartition.IndexBootstrap;
import org.apache.paimon.crosspartition.KeyPartPartitionKeyExtractor;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.JoinedRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.data.serializer.RowCompactedSerializer;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.disk.RowBuffer;
import org.apache.paimon.lookup.RocksDBOptions;
import org.apache.paimon.lookup.RocksDBStateFactory;
import org.apache.paimon.lookup.RocksDBValueState;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.sort.BinaryExternalSortBuffer;
import org.apache.paimon.table.AbstractFileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.PartitionKeyExtractor;
import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileIOUtils;
import org.apache.paimon.utils.IDMapping;
import org.apache.paimon.utils.KeyValueIterator;
import org.apache.paimon.utils.MutableObjectIterator;
import org.apache.paimon.utils.OffsetRow;
import org.apache.paimon.utils.PositiveIntInt;
import org.apache.paimon.utils.PositiveIntIntSerializer;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.ProjectToRowFunction;
import org.apache.paimon.utils.RowIterator;
import org.apache.paimon.utils.TypeUtils;

public class GlobalIndexAssigner
implements Serializable,
Closeable {
    private static final long serialVersionUID = 1L;
    private static final String INDEX_NAME = "keyIndex";
    private final AbstractFileStoreTable table;
    private transient IOManager ioManager;
    private transient int bucketIndex;
    private transient ProjectToRowFunction setPartition;
    private transient boolean bootstrap;
    private transient BinaryExternalSortBuffer bootstrapKeys;
    private transient RowBuffer bootstrapRecords;
    private transient int targetBucketRowNumber;
    private transient int assignId;
    private transient BiConsumer<InternalRow, Integer> collector;
    private transient int numAssigners;
    private transient PartitionKeyExtractor<InternalRow> extractor;
    private transient PartitionKeyExtractor<InternalRow> keyPartExtractor;
    private transient File path;
    private transient RocksDBStateFactory stateFactory;
    private transient RocksDBValueState<InternalRow, PositiveIntInt> keyIndex;
    private transient IDMapping<BinaryRow> partMapping;
    private transient BucketAssigner bucketAssigner;
    private transient ExistingProcessor existingProcessor;

    public GlobalIndexAssigner(Table table) {
        this.table = (AbstractFileStoreTable)table;
    }

    public void open(long offHeapMemory, IOManager ioManager, int numAssigners, int assignId, BiConsumer<InternalRow, Integer> collector) throws Exception {
        this.ioManager = ioManager;
        this.numAssigners = numAssigners;
        this.assignId = assignId;
        this.collector = collector;
        RowType bootstrapType = IndexBootstrap.bootstrapType(this.table.schema());
        this.bucketIndex = bootstrapType.getFieldCount() - 1;
        this.setPartition = new ProjectToRowFunction(this.table.rowType(), this.table.partitionKeys());
        CoreOptions coreOptions = this.table.coreOptions();
        this.targetBucketRowNumber = (int)coreOptions.dynamicBucketTargetRowNum();
        this.extractor = new RowPartitionKeyExtractor(this.table.schema());
        this.keyPartExtractor = new KeyPartPartitionKeyExtractor(this.table.schema());
        Options options = coreOptions.toConfiguration();
        String rocksDBDir = ioManager.tempDirs()[ThreadLocalRandom.current().nextInt(ioManager.tempDirs().length)];
        this.path = new File(rocksDBDir, "rocksdb-" + UUID.randomUUID());
        Options rocksdbOptions = Options.fromMap(new HashMap<String, String>(options.toMap()));
        long blockCache = Math.max(offHeapMemory, rocksdbOptions.get(RocksDBOptions.BLOCK_CACHE_SIZE).getBytes());
        rocksdbOptions.set(RocksDBOptions.BLOCK_CACHE_SIZE, new MemorySize(blockCache));
        this.stateFactory = new RocksDBStateFactory(this.path.toString(), rocksdbOptions, coreOptions.crossPartitionUpsertIndexTtl());
        RowType keyType = this.table.schema().logicalTrimmedPrimaryKeysType();
        this.keyIndex = this.stateFactory.valueState(INDEX_NAME, new RowCompactedSerializer(keyType), new PositiveIntIntSerializer(), options.get(RocksDBOptions.LOOKUP_CACHE_ROWS));
        this.partMapping = new IDMapping<BinaryRow>(BinaryRow::copy);
        this.bucketAssigner = new BucketAssigner();
        this.existingProcessor = ExistingProcessor.create(coreOptions.mergeEngine(), this.setPartition, this.bucketAssigner, this::collect);
        this.bootstrap = true;
        this.bootstrapKeys = BinaryExternalSortBuffer.create(ioManager, RowType.of(DataTypes.BYTES()), RowType.of(DataTypes.BYTES(), DataTypes.BYTES()), coreOptions.writeBufferSize() / 2L, coreOptions.pageSize(), coreOptions.localSortMaxNumFileHandles());
        this.bootstrapRecords = RowBuffer.getBuffer(ioManager, new HeapMemorySegmentPool(coreOptions.writeBufferSize() / 2L, coreOptions.pageSize()), new InternalRowSerializer(this.table.rowType()), true);
    }

    public void bootstrapKey(InternalRow value) throws IOException {
        Preconditions.checkArgument(this.inBoostrap());
        BinaryRow partition = this.keyPartExtractor.partition(value);
        BinaryRow key = this.keyPartExtractor.trimmedPrimaryKey(value);
        int partId = this.partMapping.index(partition);
        int bucket = value.getInt(this.bucketIndex);
        this.bucketAssigner.bootstrapBucket(partition, bucket);
        PositiveIntInt partAndBucket = new PositiveIntInt(partId, bucket);
        this.bootstrapKeys.write(GenericRow.of(this.keyIndex.serializeKey(key), this.keyIndex.serializeValue(partAndBucket)));
    }

    public boolean inBoostrap() {
        return this.bootstrap;
    }

    public void endBoostrap(boolean isEndInput) throws Exception {
        this.bootstrap = false;
        this.bootstrapRecords.complete();
        boolean isEmpty = true;
        if (this.bootstrapKeys.size() > 0) {
            final MutableObjectIterator<BinaryRow> keyIterator = this.bootstrapKeys.sortedIterator();
            final BinaryRow row = new BinaryRow(2);
            KeyValueIterator<byte[], byte[]> kvIter = new KeyValueIterator<byte[], byte[]>(){
                private BinaryRow current;

                @Override
                public boolean advanceNext() {
                    try {
                        this.current = keyIterator.next(row);
                    }
                    catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                    return this.current != null;
                }

                @Override
                public byte[] getKey() {
                    return this.current.getBinary(0);
                }

                @Override
                public byte[] getValue() {
                    return this.current.getBinary(1);
                }
            };
            this.stateFactory.bulkLoad(this.keyIndex, kvIter);
            isEmpty = false;
        }
        this.bootstrapKeys.clear();
        this.bootstrapKeys = null;
        if (isEmpty && isEndInput) {
            this.bulkLoadBootstrapRecords();
        } else {
            this.loopBootstrapRecords();
        }
    }

    public void processInput(InternalRow value) throws Exception {
        if (this.inBoostrap()) {
            this.bootstrapRecords.put(value);
            return;
        }
        BinaryRow partition = this.extractor.partition(value);
        BinaryRow key = this.extractor.trimmedPrimaryKey(value);
        int partId = this.partMapping.index(partition);
        PositiveIntInt partitionBucket = this.keyIndex.get(key);
        if (partitionBucket != null) {
            int previousPartId = partitionBucket.i1();
            int previousBucket = partitionBucket.i2();
            if (previousPartId == partId) {
                this.collect(value, previousBucket);
            } else {
                BinaryRow previousPart = this.partMapping.get(previousPartId);
                boolean processNewRecord = this.existingProcessor.processExists(value, previousPart, previousBucket);
                if (processNewRecord) {
                    this.processNewRecord(partition, partId, key, value);
                }
            }
        } else {
            this.processNewRecord(partition, partId, key, value);
        }
    }

    @Override
    public void close() throws IOException {
        if (this.stateFactory != null) {
            this.stateFactory.close();
            this.stateFactory = null;
        }
        if (this.path != null) {
            FileIOUtils.deleteDirectoryQuietly(this.path);
        }
    }

    private void bulkLoadBootstrapRecords() {
        RowType rowType = this.table.rowType();
        ArrayList<DataType> fields = new ArrayList<DataType>(TypeUtils.project(rowType, this.table.primaryKeys()).getFieldTypes());
        fields.add(DataTypes.INT());
        RowType keyWithIdType = DataTypes.ROW(fields.toArray(new DataType[0]));
        fields.addAll(rowType.getFieldTypes());
        RowType keyWithRowType = DataTypes.ROW(fields.toArray(new DataType[0]));
        CoreOptions coreOptions = this.table.coreOptions();
        BinaryExternalSortBuffer keyIdBuffer = BinaryExternalSortBuffer.create(this.ioManager, keyWithIdType, keyWithRowType, coreOptions.writeBufferSize() / 2L, coreOptions.pageSize(), coreOptions.localSortMaxNumFileHandles());
        Function<ExistingProcessor.SortOrder, RowIterator> iteratorFunction = sortOrder -> {
            InternalRow row;
            int id = sortOrder == ExistingProcessor.SortOrder.ASCENDING ? 0 : Integer.MAX_VALUE;
            GenericRow idRow = new GenericRow(1);
            JoinedRow keyAndId = new JoinedRow();
            JoinedRow keyAndRow = new JoinedRow();
            try (Object iterator = this.bootstrapRecords.newIterator();){
                while (iterator.advanceNext()) {
                    row = iterator.getRow();
                    BinaryRow key = this.extractor.trimmedPrimaryKey(row);
                    idRow.setField(0, id);
                    keyAndId.replace(key, idRow);
                    keyAndRow.replace(keyAndId, row);
                    try {
                        keyIdBuffer.write(keyAndRow);
                    }
                    catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    if (sortOrder == ExistingProcessor.SortOrder.ASCENDING) {
                        ++id;
                        continue;
                    }
                    --id;
                }
            }
            this.bootstrapRecords.reset();
            this.bootstrapRecords = null;
            try {
                iterator = keyIdBuffer.sortedIterator();
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
            BinaryRow reuseBinaryRow = new BinaryRow(keyWithRowType.getFieldCount());
            row = new OffsetRow(rowType.getFieldCount(), keyWithIdType.getFieldCount());
            return new RowIterator((MutableObjectIterator)iterator, reuseBinaryRow, (OffsetRow)row){
                final /* synthetic */ MutableObjectIterator val$iterator;
                final /* synthetic */ BinaryRow val$reuseBinaryRow;
                final /* synthetic */ OffsetRow val$row;
                {
                    this.val$iterator = mutableObjectIterator;
                    this.val$reuseBinaryRow = binaryRow;
                    this.val$row = offsetRow;
                }

                @Override
                @Nullable
                public InternalRow next() {
                    BinaryRow keyWithRow;
                    try {
                        keyWithRow = this.val$iterator.next(this.val$reuseBinaryRow);
                    }
                    catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    if (keyWithRow == null) {
                        return null;
                    }
                    this.val$row.replace(keyWithRow);
                    return this.val$row;
                }
            };
        };
        this.existingProcessor.bulkLoadNewRecords(iteratorFunction, this.extractor::trimmedPrimaryKey, this.extractor::partition, this::assignBucket);
        keyIdBuffer.clear();
    }

    private void loopBootstrapRecords() throws Exception {
        try (RowBuffer.RowBufferIterator iterator = this.bootstrapRecords.newIterator();){
            while (iterator.advanceNext()) {
                this.processInput(iterator.getRow());
            }
        }
        this.bootstrapRecords.reset();
        this.bootstrapRecords = null;
    }

    private void processNewRecord(BinaryRow partition, int partId, BinaryRow key, InternalRow value) throws IOException {
        int bucket = this.assignBucket(partition);
        this.keyIndex.put(key, new PositiveIntInt(partId, bucket));
        this.collect(value, bucket);
    }

    private int assignBucket(BinaryRow partition) {
        return this.bucketAssigner.assignBucket(partition, this::isAssignBucket, this.targetBucketRowNumber);
    }

    private boolean isAssignBucket(int bucket) {
        return this.computeAssignId(bucket) == this.assignId;
    }

    private int computeAssignId(int hash) {
        return Math.abs(hash % this.numAssigners);
    }

    private void collect(InternalRow value, int bucket) {
        this.collector.accept(value, bucket);
    }
}

