/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.cdc;

import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.sink.MultiTableCommittable;
import org.apache.paimon.flink.sink.PrepareCommitOperator;
import org.apache.paimon.flink.sink.StateUtils;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
import org.apache.paimon.flink.sink.StoreSinkWriteState;
import org.apache.paimon.flink.sink.cdc.CdcMultiplexRecord;
import org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator;
import org.apache.paimon.flink.sink.cdc.CdcRecordUtils;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.ExecutorThreadFactory;

public class CdcRecordStoreMultiWriteOperator
extends PrepareCommitOperator<CdcMultiplexRecord, MultiTableCommittable> {
    private static final long serialVersionUID = 1L;
    private final StoreSinkWrite.WithWriteBufferProvider storeSinkWriteProvider;
    private final String initialCommitUser;
    private final Catalog.Loader catalogLoader;
    private MemoryPoolFactory memoryPoolFactory;
    private Catalog catalog;
    private Map<Identifier, FileStoreTable> tables;
    private StoreSinkWriteState state;
    private Map<Identifier, StoreSinkWrite> writes;
    private String commitUser;
    private ExecutorService compactExecutor;

    public CdcRecordStoreMultiWriteOperator(Catalog.Loader catalogLoader, StoreSinkWrite.WithWriteBufferProvider storeSinkWriteProvider, String initialCommitUser, Options options) {
        super(options);
        this.catalogLoader = catalogLoader;
        this.storeSinkWriteProvider = storeSinkWriteProvider;
        this.initialCommitUser = initialCommitUser;
    }

    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        this.catalog = this.catalogLoader.load();
        this.commitUser = StateUtils.getSingleValueFromState(context, "commit_user_state", String.class, this.initialCommitUser);
        this.state = new StoreSinkWriteState(context, (tableName, partition, bucket) -> true);
        this.tables = new HashMap<Identifier, FileStoreTable>();
        this.writes = new HashMap<Identifier, StoreSinkWrite>();
        this.compactExecutor = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory(Thread.currentThread().getName() + "-CdcMultiWrite-Compaction"));
    }

    public void processElement(StreamRecord<CdcMultiplexRecord> element) throws Exception {
        CdcMultiplexRecord record = (CdcMultiplexRecord)element.getValue();
        String databaseName = record.databaseName();
        String tableName = record.tableName();
        Identifier tableId = Identifier.create(databaseName, tableName);
        FileStoreTable table = this.getTable(tableId);
        if (this.memoryPoolFactory == null) {
            this.memoryPoolFactory = new MemoryPoolFactory(this.memoryPool != null ? this.memoryPool : new HeapMemorySegmentPool(table.coreOptions().writeBufferSize(), table.coreOptions().pageSize()));
        }
        StoreSinkWrite write = this.writes.computeIfAbsent(tableId, id -> this.storeSinkWriteProvider.provide(table, this.commitUser, this.state, this.getContainingTask().getEnvironment().getIOManager(), this.memoryPoolFactory, (MetricGroup)this.getMetricGroup()));
        ((StoreSinkWriteImpl)write).withCompactExecutor(this.compactExecutor);
        Optional<GenericRow> optionalConverted = CdcRecordUtils.toGenericRow(record.record(), table.schema().fields());
        if (!optionalConverted.isPresent()) {
            FileStoreTable latestTable = table;
            while (true) {
                latestTable = latestTable.copyWithLatestSchema();
                this.tables.put(tableId, latestTable);
                optionalConverted = CdcRecordUtils.toGenericRow(record.record(), latestTable.schema().fields());
                if (optionalConverted.isPresent()) break;
                Thread.sleep(latestTable.coreOptions().toConfiguration().get(CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME).toMillis());
            }
            write.replace(latestTable);
        }
        try {
            write.write(optionalConverted.get());
        }
        catch (Exception e) {
            throw new IOException(e);
        }
    }

    private FileStoreTable getTable(Identifier tableId) throws InterruptedException {
        FileStoreTable table = this.tables.get(tableId);
        if (table == null) {
            while (true) {
                try {
                    table = (FileStoreTable)this.catalog.getTable(tableId);
                    this.tables.put(tableId, table);
                }
                catch (Catalog.TableNotExistException tableNotExistException) {
                    Thread.sleep(CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME.defaultValue().toMillis());
                    continue;
                }
                break;
            }
        }
        if (table.bucketMode() != BucketMode.HASH_FIXED) {
            throw new UnsupportedOperationException(String.format("Combine mode Sink only supports FIXED bucket mode, but %s is %s", new Object[]{table.name(), table.bucketMode()}));
        }
        return table;
    }

    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        for (StoreSinkWrite write : this.writes.values()) {
            write.snapshotState();
        }
        this.state.snapshotState();
    }

    @Override
    public void close() throws Exception {
        super.close();
        for (StoreSinkWrite write : this.writes.values()) {
            write.close();
        }
        if (this.compactExecutor != null) {
            this.compactExecutor.shutdownNow();
        }
        if (this.catalog != null) {
            this.catalog.close();
            this.catalog = null;
        }
    }

    @Override
    protected List<MultiTableCommittable> prepareCommit(boolean waitCompaction, long checkpointId) throws IOException {
        LinkedList<MultiTableCommittable> committables = new LinkedList<MultiTableCommittable>();
        for (Map.Entry<Identifier, StoreSinkWrite> entry : this.writes.entrySet()) {
            Identifier key = entry.getKey();
            StoreSinkWrite write = entry.getValue();
            committables.addAll(write.prepareCommit(waitCompaction, checkpointId).stream().map(committable -> MultiTableCommittable.fromCommittable(key, committable)).collect(Collectors.toList()));
        }
        return committables;
    }

    @VisibleForTesting
    public Map<Identifier, FileStoreTable> tables() {
        return this.tables;
    }

    @VisibleForTesting
    public Map<Identifier, StoreSinkWrite> writes() {
        return this.writes;
    }

    @VisibleForTesting
    public String commitUser() {
        return this.commitUser;
    }
}

