/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.lookup;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValueFileStore;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.lookup.LookupCompactDiffRead;
import org.apache.paimon.flink.lookup.LookupDataTableScan;
import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.options.Options;
import org.apache.paimon.options.description.DescribedEnum;
import org.apache.paimon.options.description.InlineElement;
import org.apache.paimon.options.description.TextElement;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.DelegatedFileStoreTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.StreamDataTableScan;

public class LookupFileStoreTable
extends DelegatedFileStoreTable {
    private static final long serialVersionUID = 1L;
    private final LookupStreamScanMode lookupScanMode;

    public LookupFileStoreTable(FileStoreTable wrapped, List<String> joinKeys) {
        super(wrapped);
        this.lookupScanMode = this.lookupStreamScanMode(wrapped, joinKeys);
    }

    public LookupFileStoreTable(FileStoreTable wrapped, LookupStreamScanMode lookupScanMode) {
        super(wrapped);
        this.lookupScanMode = lookupScanMode;
    }

    @Override
    public InnerTableRead newRead() {
        switch (this.lookupScanMode) {
            case CHANGELOG: 
            case FILE_MONITOR: {
                return this.wrapped.newRead();
            }
            case COMPACT_DELTA_MONITOR: {
                return new LookupCompactDiffRead(((KeyValueFileStore)this.wrapped.store()).newRead(), this.wrapped.schema());
            }
        }
        throw new UnsupportedOperationException("Unknown lookup stream scan mode: " + this.lookupScanMode.name());
    }

    @Override
    public StreamDataTableScan newStreamScan() {
        return new LookupDataTableScan(this.wrapped.coreOptions(), this.wrapped.newSnapshotReader(), this.wrapped.snapshotManager(), this.wrapped.supportStreamingReadOverwrite(), DefaultValueAssigner.create(this.wrapped.schema()), this.lookupScanMode);
    }

    @Override
    public FileStoreTable copy(Map<String, String> dynamicOptions) {
        return new LookupFileStoreTable((FileStoreTable)this.wrapped.copy((Map)dynamicOptions), this.lookupScanMode);
    }

    @Override
    public FileStoreTable copy(TableSchema newTableSchema) {
        return new LookupFileStoreTable(this.wrapped.copy(newTableSchema), this.lookupScanMode);
    }

    @Override
    public FileStoreTable copyWithoutTimeTravel(Map<String, String> dynamicOptions) {
        return new LookupFileStoreTable((FileStoreTable)this.wrapped.copy((Map)dynamicOptions), this.lookupScanMode);
    }

    @Override
    public FileStoreTable copyWithLatestSchema() {
        return new LookupFileStoreTable(this.wrapped.copyWithLatestSchema(), this.lookupScanMode);
    }

    @Override
    public FileStoreTable switchToBranch(String branchName) {
        this.wrapped.switchToBranch(branchName);
        return this;
    }

    private LookupStreamScanMode lookupStreamScanMode(FileStoreTable table, List<String> joinKeys) {
        Options options = Options.fromMap(table.options());
        if (options.get(FlinkConnectorOptions.LOOKUP_CACHE_MODE) == FlinkConnectorOptions.LookupCacheMode.AUTO && new HashSet<String>(table.primaryKeys()).equals(new HashSet<String>(joinKeys))) {
            return LookupStreamScanMode.FILE_MONITOR;
        }
        if (table.primaryKeys().size() > 0 && options.get(CoreOptions.CHANGELOG_PRODUCER) == CoreOptions.ChangelogProducer.NONE && TableScanUtils.supportCompactDiffStreamingReading(table)) {
            return LookupStreamScanMode.COMPACT_DELTA_MONITOR;
        }
        return LookupStreamScanMode.CHANGELOG;
    }

    public static enum LookupStreamScanMode implements DescribedEnum
    {
        CHANGELOG("changelog", "Streaming reading based on changelog or delta data files."),
        FILE_MONITOR("file-monitor", "Monitor data file changes."),
        COMPACT_DELTA_MONITOR("compact-delta-monitor", "Streaming reading based on data changes before and after compaction.");

        private final String value;
        private final String description;

        private LookupStreamScanMode(String value, String description) {
            this.value = value;
            this.description = description;
        }

        public String toString() {
            return this.value;
        }

        @Override
        public InlineElement getDescription() {
            return TextElement.text(this.description);
        }
    }
}

