/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.index;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.sink.RowDataPartitionKeyExtractor;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.CdcRecordPartitionKeyExtractor;
import org.apache.paimon.flink.sink.cdc.CdcRecordUtils;
import org.apache.paimon.flink.sink.index.GlobalIndexAssigner;
import org.apache.paimon.flink.sink.index.KeyPartOrRow;
import org.apache.paimon.flink.sink.index.KeyPartPartitionKeyExtractor;
import org.apache.paimon.flink.utils.ProjectToRowDataFunction;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;

public class GlobalIndexAssignerOperator<T>
extends AbstractStreamOperator<Tuple2<T, Integer>>
implements OneInputStreamOperator<Tuple2<KeyPartOrRow, T>, Tuple2<T, Integer>> {
    private static final long serialVersionUID = 1L;
    private final GlobalIndexAssigner<T> assigner;

    public GlobalIndexAssignerOperator(GlobalIndexAssigner<T> assigner) {
        this.assigner = assigner;
    }

    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        File[] tmpDirs = this.getContainingTask().getEnvironment().getIOManager().getSpillingDirectories();
        File tmpDir = tmpDirs[ThreadLocalRandom.current().nextInt(tmpDirs.length)];
        this.assigner.open(tmpDir, this.getRuntimeContext().getNumberOfParallelSubtasks(), this.getRuntimeContext().getIndexOfThisSubtask(), this::collect);
    }

    public void processElement(StreamRecord<Tuple2<KeyPartOrRow, T>> streamRecord) throws Exception {
        Tuple2 tuple2 = (Tuple2)streamRecord.getValue();
        Object value = tuple2.f1;
        switch ((KeyPartOrRow)((Object)tuple2.f0)) {
            case KEY_PART: {
                this.assigner.bootstrap(value);
                break;
            }
            case ROW: {
                this.assigner.process(value);
            }
        }
    }

    private void collect(T value, int bucket) {
        this.output.collect((Object)new StreamRecord((Object)new Tuple2(value, (Object)bucket)));
    }

    public void close() throws IOException {
        this.assigner.close();
    }

    public static GlobalIndexAssignerOperator<RowData> forRowData(Table table) {
        return new GlobalIndexAssignerOperator<RowData>(GlobalIndexAssignerOperator.createRowDataAssigner(table));
    }

    public static GlobalIndexAssigner<RowData> createRowDataAssigner(Table t) {
        return new GlobalIndexAssigner<RowData>(t, RowDataPartitionKeyExtractor::new, KeyPartPartitionKeyExtractor::new, new ProjectToRowDataFunction(t.rowType(), t.partitionKeys()), (rowData, rowKind) -> {
            rowData.setRowKind(FlinkRowData.toFlinkRowKind(rowKind));
            return rowData;
        });
    }

    public static GlobalIndexAssignerOperator<CdcRecord> forCdcRecord(Table table) {
        RowType partitionType = ((FileStoreTable)table).schema().logicalPartitionType();
        List<String> partitionNames = partitionType.getFieldNames();
        RowDataToObjectArrayConverter converter = new RowDataToObjectArrayConverter(partitionType);
        GlobalIndexAssigner assigner = new GlobalIndexAssigner(table, CdcRecordPartitionKeyExtractor::new, CdcRecordPartitionKeyExtractor::new, (record, part) -> {
            CdcRecord partCdc = CdcRecordUtils.fromGenericRow(GenericRow.of(converter.convert((InternalRow)part)), partitionNames);
            HashMap<String, String> fields = new HashMap<String, String>(record.fields());
            fields.putAll(partCdc.fields());
            return new CdcRecord(record.kind(), fields);
        }, CdcRecord::setRowKind);
        return new GlobalIndexAssignerOperator<CdcRecord>(assigner);
    }
}

