/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.operation;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.index.IndexMaintainer;
import org.apache.paimon.io.cache.CacheManager;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.memory.MemoryOwner;
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.AbstractFileStoreWrite;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.FileStoreWrite;
import org.apache.paimon.operation.metrics.WriterBufferMetric;
import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.utils.RecordWriter;
import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base {@link FileStoreWrite} whose record writers must be {@link MemoryOwner}s registered with a
 * single {@link MemoryPoolFactory}.
 *
 * <p>The pool is either supplied through {@link #withMemoryPoolFactory(MemoryPoolFactory)} or
 * created as a heap pool the first time a writer is registered without one.
 *
 * @param <T> record type handled by the writers
 */
public abstract class MemoryFileStoreWrite<T> extends AbstractFileStoreWrite<T> {
    private static final Logger LOG = LoggerFactory.getLogger(MemoryFileStoreWrite.class);

    protected final CoreOptions options;
    protected final CacheManager cacheManager;

    /** Write-buffer pool; stays {@code null} until supplied or lazily created in {@link #notifyNewWriter}. */
    private MemoryPoolFactory writeBufferPool;

    /** Write-buffer metric; stays {@code null} unless a non-null registry was supplied. */
    private WriterBufferMetric writerBufferMetric;

    /**
     * Forwards the common arguments to {@link AbstractFileStoreWrite}, passing
     * {@code options.writeMaxWritersToSpill()} as its last argument, and builds a
     * {@link CacheManager} from {@code options.lookupCacheMaxMemory()}.
     */
    public MemoryFileStoreWrite(String commitUser, SnapshotManager snapshotManager, FileStoreScan scan, CoreOptions options, @Nullable IndexMaintainer.Factory<T> indexFactory, @Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory, String tableName) {
        super(commitUser, snapshotManager, scan, indexFactory, dvMaintainerFactory, tableName, options.writeMaxWritersToSpill());
        this.options = options;
        this.cacheManager = new CacheManager(options.lookupCacheMaxMemory());
    }

    /**
     * Uses the given factory as the write-buffer pool, after attaching this write's memory owners
     * to it.
     *
     * @param memoryPoolFactory the pool factory to draw write buffers from
     * @return this instance
     */
    @Override
    public FileStoreWrite<T> withMemoryPoolFactory(MemoryPoolFactory memoryPoolFactory) {
        this.writeBufferPool = memoryPoolFactory.addOwners(this::memoryOwners);
        return this;
    }

    /**
     * Iterates over the writer of every container currently held in {@code writers}, each cast to
     * {@link MemoryOwner}. A {@code null} container is passed through as {@code null}.
     *
     * <p>The result is composed with Guava's {@code Iterators.concat}/{@code transform} instead of
     * being copied into a collection first.
     */
    private Iterator<MemoryOwner> memoryOwners() {
        // The container type is inferred from the declared type of `writers`, so the lambda
        // parameter is fully typed and `container.writer` resolves without raw types or casts
        // through Map.
        return Iterators.transform(
                MemoryFileStoreWrite.concatMapValues(this.writers.values().iterator()),
                container -> container == null ? null : (MemoryOwner) container.writer);
    }

    /**
     * Chains the {@code values()} iterators of each map produced by {@code maps} into one
     * iterator, pulling the next map only when the outer iterator is advanced.
     */
    private static <C> Iterator<C> concatMapValues(final Iterator<? extends Map<?, C>> maps) {
        return Iterators.concat(new Iterator<Iterator<C>>() {

            @Override
            public boolean hasNext() {
                return maps.hasNext();
            }

            @Override
            public Iterator<C> next() {
                return maps.next().values().iterator();
            }
        });
    }

    /**
     * Registers a newly created writer with the write-buffer pool, creating a default heap pool
     * from {@code options.writeBufferSize()} and {@code options.pageSize()} when none was
     * supplied, and bumps the writer count metric when metrics are enabled.
     *
     * @param writer the new writer; must implement {@link MemoryOwner}
     * @throws RuntimeException if {@code writer} is not a {@link MemoryOwner}
     */
    @Override
    protected void notifyNewWriter(RecordWriter<T> writer) {
        if (!(writer instanceof MemoryOwner)) {
            throw new RuntimeException("Should create a MemoryOwner for MemoryTableWrite, but this is: " + writer.getClass());
        }
        if (this.writeBufferPool == null) {
            LOG.debug("Use default heap memory segment pool for write buffer.");
            this.writeBufferPool = new MemoryPoolFactory(new HeapMemorySegmentPool(this.options.writeBufferSize(), this.options.pageSize())).addOwners(this::memoryOwners);
        }
        this.writeBufferPool.notifyNewOwner((MemoryOwner) writer);
        if (this.writerBufferMetric != null) {
            this.writerBufferMetric.increaseNumWriters();
        }
    }

    /**
     * Passes the registry to the superclass and, when it is non-null, creates the write-buffer
     * metric for this table.
     *
     * @return this instance
     */
    @Override
    public FileStoreWrite<T> withMetricRegistry(MetricRegistry metricRegistry) {
        super.withMetricRegistry(metricRegistry);
        this.registerWriterBufferMetric(metricRegistry);
        return this;
    }

    /** Creates the write-buffer metric; a {@code null} registry leaves metrics disabled. */
    private void registerWriterBufferMetric(MetricRegistry metricRegistry) {
        if (metricRegistry != null) {
            // The pool is handed over as a supplier because it may not exist yet at this point.
            this.writerBufferMetric = new WriterBufferMetric(() -> this.writeBufferPool, metricRegistry, this.tableName);
        }
    }

    /**
     * Delegates to the superclass and then resets the writer count metric to the total number of
     * containers remaining in {@code writers}.
     */
    @Override
    public List<CommitMessage> prepareCommit(boolean waitCompaction, long commitIdentifier) throws Exception {
        List<CommitMessage> result = super.prepareCommit(waitCompaction, commitIdentifier);
        if (this.writerBufferMetric != null) {
            this.writerBufferMetric.setNumWriters(this.writers.values().stream().mapToInt(Map::size).sum());
        }
        return result;
    }

    /**
     * Closes the superclass resources and then the write-buffer metric, if one was created.
     *
     * <p>The metric is closed in a {@code finally} block so that it is still released when
     * {@code super.close()} throws.
     */
    @Override
    public void close() throws Exception {
        try {
            super.close();
        } finally {
            if (this.writerBufferMetric != null) {
                this.writerBufferMetric.close();
            }
        }
    }
}

