/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.coordinator;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.coordinator.PagedCoordinationRequest;
import org.apache.paimon.flink.sink.coordinator.PagedCoordinationResponse;
import org.apache.paimon.flink.sink.coordinator.ScanCoordinationRequest;
import org.apache.paimon.flink.sink.coordinator.ScanCoordinationResponse;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.WriteRestore;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.InstantiationUtil;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SerializationUtils;

/**
 * Answers restore requests of table writers against a single, shared view of the table.
 *
 * <p>The coordinator keeps one {@link FileStoreScan} and one {@link IndexFileHandler} pinned to a
 * snapshot, and serves per partition/bucket file listings from them. Serialized responses that are
 * larger than the configured page size are cached and handed out page by page.
 *
 * <p>Thread-safety: every method touching the shared scan or the current snapshot is
 * {@code synchronized} on this instance. {@link #checkpoint()} itself is not synchronized; it calls
 * the synchronized {@code refresh()} and then clears a {@link ConcurrentHashMap}.
 */
public class TableWriteCoordinator {
    /** Index type handed to the index file handler when deletion vector indexes are requested. */
    private static final String DELETION_VECTORS_INDEX = "DELETION_VECTORS";

    private final FileStoreTable table;
    /** Per-user cache of the latest committed identifier; cleared on every checkpoint. */
    private final Map<String, Long> latestCommittedIdentifiers;
    private final FileStoreScan scan;
    private final IndexFileHandler indexFileHandler;
    /** Maximum number of bytes returned in one page; always positive. */
    private final int pageSize;
    /** Full serialized responses waiting for their remaining pages to be fetched. */
    private final Cache<CoordinationKey, byte[]> pagedCoordination;
    /** Snapshot the shared scan is currently pinned to; {@code null} while the table has none. */
    private volatile Snapshot snapshot;

    /**
     * Creates a coordinator for the given table and pins it to the table's latest snapshot.
     *
     * @param table the table to coordinate writers for; its manifest cache must not be null
     * @throws IllegalArgumentException if the configured coordinator page size is not positive
     */
    public TableWriteCoordinator(FileStoreTable table) {
        this.table = table;
        Preconditions.checkNotNull(table.getManifestCache());
        this.latestCommittedIdentifiers = new ConcurrentHashMap<>();
        this.scan = table.store().newScan();
        if (table.coreOptions().manifestDeleteFileDropStats()) {
            this.scan.dropStats();
        }
        this.indexFileHandler = table.store().newIndexFileHandler();
        this.pageSize = toPageSize(
                table.coreOptions()
                        .toConfiguration()
                        .get(FlinkConnectorOptions.SINK_WRITER_COORDINATOR_PAGE_SIZE)
                        .getBytes());
        // Runnable::run as executor makes the cache run its tasks on the calling thread.
        this.pagedCoordination = Caffeine.newBuilder()
                .executor(Runnable::run)
                .expireAfterAccess(Duration.ofMinutes(30L))
                .build();
        refresh();
    }

    /**
     * Converts the configured page size to an {@code int} without silent truncation.
     *
     * <p>A plain {@code (int)} cast would turn sizes of 2 GiB and above into negative or zero
     * values, which breaks paging (failing array copies, or a page token that never advances).
     * Oversized values are clamped instead, since a byte array cannot exceed
     * {@link Integer#MAX_VALUE} elements anyway.
     */
    private static int toPageSize(long bytes) {
        if (bytes <= 0) {
            throw new IllegalArgumentException(
                    "Write coordinator page size must be positive, but is " + bytes + " bytes.");
        }
        return (int) Math.min(bytes, (long) Integer.MAX_VALUE);
    }

    /** Builds the response used while the table has no snapshot: every field is null. */
    private static ScanCoordinationResponse emptyResponse() {
        return new ScanCoordinationResponse(null, null, null, null, null);
    }

    /** Re-pins the shared scan to the table's latest snapshot; keeps the current one if none exists. */
    private synchronized void refresh() {
        Optional<Snapshot> latestSnapshot = table.latestSnapshot();
        if (!latestSnapshot.isPresent()) {
            return;
        }
        snapshot = latestSnapshot.get();
        scan.withSnapshot(snapshot);
    }

    /**
     * Serves one page of a serialized {@link ScanCoordinationResponse}.
     *
     * <p>A request without page token triggers the actual scan. If the serialized result fits into
     * one page it is returned directly; otherwise it is cached under the request's content and id,
     * and the first page is returned together with the token of the next page. A request carrying
     * a page token is answered from that cache, and the cache entry is dropped once the last page
     * has been handed out.
     *
     * @param request the paged request; its content is a serialized {@link ScanCoordinationRequest}
     * @return the page content plus the next page token, or a null token on the last page
     * @throws IOException if serializing or deserializing the payload fails
     * @throws IllegalArgumentException if the page token lies outside the cached payload
     * @throws RuntimeException if a page is requested for content that is not cached, or the
     *     request class cannot be loaded
     */
    public synchronized PagedCoordinationResponse scan(PagedCoordinationRequest request) throws IOException {
        if (snapshot == null) {
            return new PagedCoordinationResponse(InstantiationUtil.serializeObject(emptyResponse()), null);
        }

        Integer pageToken = request.pageToken();
        CoordinationKey requestKey = new CoordinationKey(request.content(), request.requestId());
        if (pageToken != null) {
            return nextPage(requestKey, pageToken);
        }

        ScanCoordinationRequest coordination;
        try {
            coordination = (ScanCoordinationRequest) InstantiationUtil.deserializeObject(
                    request.content(), getClass().getClassLoader());
        } catch (ClassNotFoundException e) {
            throw new RuntimeException(e);
        }

        byte[] full = InstantiationUtil.serializeObject(scan(coordination));
        if (full.length <= pageSize) {
            return new PagedCoordinationResponse(full, null);
        }

        // Too large for one page: remember the whole payload so follow-up requests can slice it.
        pagedCoordination.put(requestKey, full);
        return new PagedCoordinationResponse(Arrays.copyOfRange(full, 0, pageSize), pageSize);
    }

    /** Returns the slice of the cached payload starting at {@code offset}, at most one page long. */
    private PagedCoordinationResponse nextPage(CoordinationKey requestKey, int offset) {
        byte[] full = pagedCoordination.getIfPresent(requestKey);
        if (full == null) {
            throw new RuntimeException("This is a bug for write coordinator, request non existence content.");
        }
        // Reject tokens outside the payload up front instead of failing inside the array copy.
        if (offset < 0 || offset > full.length) {
            throw new IllegalArgumentException(
                    "Invalid page token " + offset + " for a paged response of " + full.length + " bytes.");
        }

        int end = offset + Math.min(full.length - offset, pageSize);
        byte[] content = Arrays.copyOfRange(full, offset, end);
        if (end >= full.length) {
            // Last page delivered, the cached payload is no longer needed.
            pagedCoordination.invalidate(requestKey);
            return new PagedCoordinationResponse(content, null);
        }
        return new PagedCoordinationResponse(content, end);
    }

    /**
     * Scans the files of one partition and bucket at the coordinator's current snapshot.
     *
     * @param request the partition (as serialized binary row), the bucket, and flags telling
     *     whether the dynamic bucket index and the deletion vector indexes should be scanned too
     * @return the snapshot, the total bucket count extracted from the entries, the data files to
     *     restore and the requested indexes (null when not requested); all fields are null while
     *     the table has no snapshot
     * @throws IOException declared for callers; propagated from the partition deserialization
     */
    public synchronized ScanCoordinationResponse scan(ScanCoordinationRequest request) throws IOException {
        if (snapshot == null) {
            return emptyResponse();
        }

        BinaryRow partition = SerializationUtils.deserializeBinaryRow(request.partition());
        int bucket = request.bucket();

        List<DataFileMeta> restoreFiles = new ArrayList<>();
        List<ManifestEntry> entries = scan.withPartitionBucket(partition, bucket).plan().files();
        // Fills restoreFiles as a side effect and returns the bucket count.
        Integer totalBuckets = WriteRestore.extractDataFiles(entries, restoreFiles);

        IndexFileMeta dynamicBucketIndex = null;
        if (request.scanDynamicBucketIndex()) {
            dynamicBucketIndex = indexFileHandler.scanHashIndex(snapshot, partition, bucket).orElse(null);
        }

        List<IndexFileMeta> deleteVectorsIndex = null;
        if (request.scanDeleteVectorsIndex()) {
            deleteVectorsIndex = indexFileHandler.scan(snapshot, DELETION_VECTORS_INDEX, partition, bucket);
        }

        return new ScanCoordinationResponse(snapshot, totalBuckets, restoreFiles, dynamicBucketIndex, deleteVectorsIndex);
    }

    /**
     * Returns the commit identifier of the latest snapshot committed by the given user.
     *
     * <p>The value is computed on first request and cached until the next {@link #checkpoint()}.
     *
     * @param user the commit user
     * @return the user's latest commit identifier, or {@link Long#MIN_VALUE} if the user has no
     *     snapshot
     */
    public synchronized long latestCommittedIdentifier(String user) {
        return latestCommittedIdentifiers.computeIfAbsent(user, this::computeLatestIdentifier);
    }

    /**
     * Looks up the latest snapshot of the user. If that snapshot is newer than the one the
     * coordinator is pinned to (or there is none yet), the coordinator moves forward to it.
     */
    private synchronized long computeLatestIdentifier(String user) {
        Optional<Snapshot> snapshotOptional = table.snapshotManager().latestSnapshotOfUser(user);
        if (!snapshotOptional.isPresent()) {
            return Long.MIN_VALUE;
        }

        Snapshot latestSnapshotOfUser = snapshotOptional.get();
        if (snapshot == null || latestSnapshotOfUser.id() > snapshot.id()) {
            snapshot = latestSnapshotOfUser;
            scan.withSnapshot(snapshot);
        }
        return latestSnapshotOfUser.commitIdentifier();
    }

    /**
     * Moves the coordinator to the table's latest snapshot and drops all cached commit
     * identifiers, so that they are looked up again on the next request.
     */
    public void checkpoint() {
        refresh();
        latestCommittedIdentifiers.clear();
    }

    /** Cache key of a paged response: the request payload together with the request id. */
    private static class CoordinationKey {
        private final byte[] content;
        private final String uuid;

        private CoordinationKey(byte[] content, String uuid) {
            this.content = content;
            this.uuid = uuid;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            CoordinationKey that = (CoordinationKey) o;
            // Compare the payload by content, not by array identity.
            return Arrays.equals(content, that.content) && Objects.equals(uuid, that.uuid);
        }

        @Override
        public int hashCode() {
            return Objects.hash(Arrays.hashCode(content), uuid);
        }
    }
}

