/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.cdc;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.sink.cdc.CdcMultiplexRecord;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.CdcRecordChannelComputer;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.ChannelComputer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CdcMultiplexRecordChannelComputer
implements ChannelComputer<CdcMultiplexRecord> {
    private static final Logger LOG = LoggerFactory.getLogger(CdcMultiplexRecordChannelComputer.class);
    private static final long serialVersionUID = 1L;
    private final Catalog.Loader catalogLoader;
    private transient int numChannels;
    private Map<Identifier, CdcRecordChannelComputer> channelComputers;
    private Catalog catalog;

    public CdcMultiplexRecordChannelComputer(Catalog.Loader catalogLoader) {
        this.catalogLoader = catalogLoader;
    }

    @Override
    public void setup(int numChannels) {
        this.numChannels = numChannels;
        this.catalog = this.catalogLoader.load();
        this.channelComputers = new HashMap<Identifier, CdcRecordChannelComputer>();
    }

    @Override
    public int channel(CdcMultiplexRecord multiplexRecord) {
        ChannelComputer<CdcRecord> channelComputer = this.computeChannelComputer(multiplexRecord);
        int recordChannel = channelComputer != null ? channelComputer.channel(multiplexRecord.record()) : 0;
        return Math.floorMod(Objects.hash(multiplexRecord.databaseName(), multiplexRecord.tableName()) + recordChannel, this.numChannels);
    }

    private ChannelComputer<CdcRecord> computeChannelComputer(CdcMultiplexRecord record) {
        return this.channelComputers.computeIfAbsent(Identifier.create(record.databaseName(), record.tableName()), id -> {
            FileStoreTable table;
            try {
                table = (FileStoreTable)this.catalog.getTable((Identifier)id);
            }
            catch (Catalog.TableNotExistException e) {
                LOG.error("Failed to get table " + id.getFullName());
                return null;
            }
            if (table.bucketMode() != BucketMode.FIXED) {
                throw new UnsupportedOperationException(String.format("Combine mode Sink only supports FIXED bucket mode, but %s is %s", new Object[]{table.name(), table.bucketMode()}));
            }
            CdcRecordChannelComputer channelComputer = new CdcRecordChannelComputer(table.schema());
            channelComputer.setup(this.numChannels);
            return channelComputer;
        });
    }

    public String toString() {
        return "shuffle by bucket";
    }
}

