/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.lookup;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.lookup.LookupFileStoreTable;
import org.apache.paimon.flink.lookup.LookupStreamingReader;
import org.apache.paimon.flink.lookup.LookupTable;
import org.apache.paimon.flink.lookup.NoPrimaryKeyLookupTable;
import org.apache.paimon.flink.lookup.PrimaryKeyLookupTable;
import org.apache.paimon.flink.lookup.SecondaryIndexLookupTable;
import org.apache.paimon.lookup.BulkLoader;
import org.apache.paimon.lookup.RocksDBState;
import org.apache.paimon.lookup.RocksDBStateFactory;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.sort.BinaryExternalSortBuffer;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ExecutorThreadFactory;
import org.apache.paimon.utils.ExecutorUtils;
import org.apache.paimon.utils.FieldsComparator;
import org.apache.paimon.utils.FileIOUtils;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.MutableObjectIterator;
import org.apache.paimon.utils.PartialRow;
import org.apache.paimon.utils.TypeUtils;
import org.apache.paimon.utils.UserDefinedSeqComparator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class FullCacheLookupTable
implements LookupTable {
    private static final Logger LOG = LoggerFactory.getLogger(FullCacheLookupTable.class);
    protected final Object lock = new Object();
    protected final Context context;
    protected final RowType projectedType;
    protected final boolean refreshAsync;
    @Nullable
    protected final FieldsComparator userDefinedSeqComparator;
    protected final int appendUdsFieldNumber;
    protected RocksDBStateFactory stateFactory;
    @Nullable
    private ExecutorService refreshExecutor;
    private final AtomicReference<Exception> cachedException;
    private final int maxPendingSnapshotCount;
    private final FileStoreTable table;
    private Future<?> refreshFuture;
    private LookupStreamingReader reader;
    private Predicate specificPartition;
    @Nullable
    private Filter<InternalRow> cacheRowFilter;

    public FullCacheLookupTable(Context context) {
        this.table = context.table;
        List<Object> sequenceFields = new ArrayList();
        CoreOptions coreOptions = new CoreOptions(this.table.options());
        if (this.table.primaryKeys().size() > 0) {
            sequenceFields = coreOptions.sequenceField();
        }
        RowType projectedType = TypeUtils.project(this.table.rowType(), context.projection);
        if (sequenceFields.size() > 0) {
            RowType.Builder builder = RowType.builder();
            projectedType.getFields().forEach(f -> builder.field(f.name(), f.type()));
            RowType rowType = this.table.rowType();
            AtomicInteger appendUdsFieldNumber = new AtomicInteger(0);
            sequenceFields.stream().filter(projectedType::notContainsField).map(rowType::getField).forEach(f -> {
                appendUdsFieldNumber.incrementAndGet();
                builder.field(f.name(), f.type());
            });
            projectedType = builder.build();
            context = context.copy(this.table.rowType().getFieldIndices(projectedType.getFieldNames()));
            this.userDefinedSeqComparator = UserDefinedSeqComparator.create(projectedType, sequenceFields, coreOptions.sequenceFieldSortOrderIsAscending());
            this.appendUdsFieldNumber = appendUdsFieldNumber.get();
        } else {
            this.userDefinedSeqComparator = null;
            this.appendUdsFieldNumber = 0;
        }
        this.context = context;
        Options options = Options.fromMap(context.table.options());
        this.projectedType = projectedType;
        this.refreshAsync = options.get(FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC);
        this.cachedException = new AtomicReference();
        this.maxPendingSnapshotCount = options.get(FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC_PENDING_SNAPSHOT_COUNT);
    }

    @Override
    public void specificPartitionFilter(Predicate filter) {
        this.specificPartition = filter;
    }

    @Override
    public void specifyCacheRowFilter(Filter<InternalRow> filter) {
        this.cacheRowFilter = filter;
    }

    protected void init() throws Exception {
        this.stateFactory = new RocksDBStateFactory(this.context.tempPath.toString(), this.context.table.coreOptions().toConfiguration(), null);
        this.refreshExecutor = this.refreshAsync ? Executors.newSingleThreadExecutor(new ExecutorThreadFactory(String.format("%s-lookup-refresh", Thread.currentThread().getName()))) : null;
    }

    protected void bootstrap() throws Exception {
        Predicate scanPredicate = PredicateBuilder.andNullable(this.context.tablePredicate, this.specificPartition);
        this.reader = new LookupStreamingReader(this.context.table, this.context.projection, scanPredicate, this.context.requiredCachedBucketIds, this.cacheRowFilter);
        BinaryExternalSortBuffer bulkLoadSorter = RocksDBState.createBulkLoadSorter(IOManager.create(this.context.tempPath.toString()), this.context.table.coreOptions());
        Predicate predicate = this.projectedPredicate();
        try (RecordReaderIterator<InternalRow> batch = new RecordReaderIterator<InternalRow>(this.reader.nextBatch(true));){
            while (batch.hasNext()) {
                InternalRow row = batch.next();
                if (predicate != null && !predicate.test(row)) continue;
                bulkLoadSorter.write(GenericRow.of(this.toKeyBytes(row), this.toValueBytes(row)));
            }
        }
        MutableObjectIterator<BinaryRow> keyIterator = bulkLoadSorter.sortedIterator();
        BinaryRow row = new BinaryRow(2);
        TableBulkLoader bulkLoader = this.createBulkLoader();
        try {
            while ((row = keyIterator.next(row)) != null) {
                bulkLoader.write(row.getBinary(0), row.getBinary(1));
            }
        }
        catch (BulkLoader.WriteException e) {
            throw new RuntimeException("Exception in bulkLoad, the most suspicious reason is that your data contains duplicates, please check your lookup table. ", e.getCause());
        }
        bulkLoader.finish();
        bulkLoadSorter.clear();
    }

    @Override
    public void refresh() throws Exception {
        if (this.refreshExecutor == null) {
            this.doRefresh();
            return;
        }
        Long latestSnapshotId = this.table.snapshotManager().latestSnapshotId();
        Long nextSnapshotId = this.reader.nextSnapshotId();
        if (latestSnapshotId != null && nextSnapshotId != null && latestSnapshotId - nextSnapshotId > (long)this.maxPendingSnapshotCount) {
            LOG.warn("The latest snapshot id {} is much greater than the next snapshot id {} for {}}, you may need to increase the parallelism of lookup operator.", new Object[]{latestSnapshotId, nextSnapshotId, this.maxPendingSnapshotCount});
            if (this.refreshFuture != null) {
                this.refreshFuture.get();
            }
            this.doRefresh();
        } else {
            Future<?> currentFuture = null;
            try {
                currentFuture = this.refreshExecutor.submit(() -> {
                    try {
                        this.doRefresh();
                    }
                    catch (Exception e) {
                        LOG.error("Refresh lookup table {} failed", (Object)this.context.table.name(), (Object)e);
                        this.cachedException.set(e);
                    }
                });
            }
            catch (RejectedExecutionException e) {
                LOG.warn("Add refresh task for lookup table {} failed", (Object)this.context.table.name(), (Object)e);
            }
            if (currentFuture != null) {
                this.refreshFuture = currentFuture;
            }
        }
    }

    private void doRefresh() throws Exception {
        while (true) {
            RecordReaderIterator<InternalRow> batch = new RecordReaderIterator<InternalRow>(this.reader.nextBatch(false));
            Throwable throwable = null;
            try {
                if (!batch.hasNext()) {
                    return;
                }
                this.refresh(batch);
                continue;
            }
            catch (Throwable throwable2) {
                throwable = throwable2;
                throw throwable2;
            }
            finally {
                if (batch == null) continue;
                if (throwable != null) {
                    try {
                        batch.close();
                    }
                    catch (Throwable throwable3) {
                        throwable.addSuppressed(throwable3);
                    }
                    continue;
                }
                batch.close();
                continue;
            }
            break;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public final List<InternalRow> get(InternalRow key) throws IOException {
        List<InternalRow> values2;
        if (this.refreshAsync) {
            Object object = this.lock;
            synchronized (object) {
                values2 = this.innerGet(key);
            }
        } else {
            values2 = this.innerGet(key);
        }
        if (this.appendUdsFieldNumber == 0) {
            return values2;
        }
        ArrayList<InternalRow> dropSequence = new ArrayList<InternalRow>(values2.size());
        for (InternalRow matchedRow : values2) {
            dropSequence.add(new PartialRow(matchedRow.getFieldCount() - this.appendUdsFieldNumber, matchedRow));
        }
        return dropSequence;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void refresh(Iterator<InternalRow> input) throws IOException {
        Predicate predicate = this.projectedPredicate();
        while (input.hasNext()) {
            InternalRow row = input.next();
            if (this.refreshAsync) {
                Object object = this.lock;
                synchronized (object) {
                    this.refreshRow(row, predicate);
                    continue;
                }
            }
            this.refreshRow(row, predicate);
        }
    }

    public abstract List<InternalRow> innerGet(InternalRow var1) throws IOException;

    protected abstract void refreshRow(InternalRow var1, Predicate var2) throws IOException;

    @Nullable
    public Predicate projectedPredicate() {
        return this.context.projectedPredicate;
    }

    public abstract byte[] toKeyBytes(InternalRow var1) throws IOException;

    public abstract byte[] toValueBytes(InternalRow var1) throws IOException;

    public abstract TableBulkLoader createBulkLoader();

    @Override
    public void close() throws IOException {
        try {
            if (this.refreshExecutor != null) {
                ExecutorUtils.gracefulShutdown(1L, TimeUnit.MINUTES, this.refreshExecutor);
            }
        }
        finally {
            this.stateFactory.close();
            FileIOUtils.deleteDirectory(this.context.tempPath);
        }
    }

    @VisibleForTesting
    public Future<?> getRefreshFuture() {
        return this.refreshFuture;
    }

    static FullCacheLookupTable create(Context context, long lruCacheSize) {
        List<String> primaryKeys = context.table.primaryKeys();
        if (primaryKeys.isEmpty()) {
            return new NoPrimaryKeyLookupTable(context, lruCacheSize);
        }
        if (new HashSet<String>(primaryKeys).equals(new HashSet<String>(context.joinKey))) {
            return new PrimaryKeyLookupTable(context, lruCacheSize, context.joinKey);
        }
        return new SecondaryIndexLookupTable(context, lruCacheSize);
    }

    public static class Context {
        public final LookupFileStoreTable table;
        public final int[] projection;
        @Nullable
        public final Predicate tablePredicate;
        @Nullable
        public final Predicate projectedPredicate;
        public final File tempPath;
        public final List<String> joinKey;
        public final Set<Integer> requiredCachedBucketIds;

        public Context(FileStoreTable table, int[] projection, @Nullable Predicate tablePredicate, @Nullable Predicate projectedPredicate, File tempPath, List<String> joinKey, @Nullable Set<Integer> requiredCachedBucketIds) {
            this.table = new LookupFileStoreTable(table, joinKey);
            this.projection = projection;
            this.tablePredicate = tablePredicate;
            this.projectedPredicate = projectedPredicate;
            this.tempPath = tempPath;
            this.joinKey = joinKey;
            this.requiredCachedBucketIds = requiredCachedBucketIds;
        }

        public Context copy(int[] newProjection) {
            return new Context(this.table.wrapped(), newProjection, this.tablePredicate, this.projectedPredicate, this.tempPath, this.joinKey, this.requiredCachedBucketIds);
        }
    }

    public static interface TableBulkLoader {
        public void write(byte[] var1, byte[] var2) throws BulkLoader.WriteException, IOException;

        public void finish() throws IOException;
    }
}

