/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.index;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.paimon.crosspartition.IndexBootstrap;
import org.apache.paimon.crosspartition.KeyPartOrRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.utils.SerializableFunction;

/**
 * One-input stream operator that wraps every record it emits in a {@link Tuple2} whose first
 * field is a {@link KeyPartOrRow} tag.
 *
 * <p>During {@link #initializeState(StateInitializationContext)} the operator runs the supplied
 * {@link IndexBootstrap}; every {@link InternalRow} handed back by the bootstrap is passed
 * through the converter and emitted with the {@link KeyPartOrRow#KEY_PART} tag. Elements arriving
 * on the input are forwarded unchanged with the {@link KeyPartOrRow#ROW} tag.
 *
 * <p>Instances are created through {@link Factory}.
 *
 * @param <T> type of the input elements and of the converted bootstrap rows
 */
public class IndexBootstrapOperator<T>
        extends AbstractStreamOperator<Tuple2<KeyPartOrRow, T>>
        implements OneInputStreamOperator<T, Tuple2<KeyPartOrRow, T>> {

    private static final long serialVersionUID = 1L;

    /** Source of the rows emitted while state is being initialized. */
    private final IndexBootstrap bootstrap;

    /** Turns a bootstrap {@link InternalRow} into the operator's element type. */
    private final SerializableFunction<InternalRow, T> converter;

    /**
     * Creates the operator and wires it to its containing task, stream config and output.
     *
     * @param parameters operator parameters supplied by the factory caller
     * @param bootstrap bootstrap invoked from {@link #initializeState(StateInitializationContext)}
     * @param converter conversion applied to every row produced by {@code bootstrap}
     */
    private IndexBootstrapOperator(
            StreamOperatorParameters<Tuple2<KeyPartOrRow, T>> parameters,
            IndexBootstrap bootstrap,
            SerializableFunction<InternalRow, T> converter) {
        this.bootstrap = bootstrap;
        this.converter = converter;
        setup(
                parameters.getContainingTask(),
                parameters.getStreamConfig(),
                parameters.getOutput());
    }

    /**
     * Runs the base-class state initialization and then invokes the bootstrap with this
     * operator's parallelism and subtask index, using {@link #collect(InternalRow)} as the
     * callback for each produced row.
     *
     * @param context state initialization context, forwarded to the superclass
     * @throws Exception if the superclass initialization or the bootstrap fails
     */
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        // Read the runtime context once; both values below come from the same context.
        RuntimeContext runtimeContext = getRuntimeContext();
        bootstrap.bootstrap(
                RuntimeContextUtils.getNumberOfParallelSubtasks(runtimeContext),
                RuntimeContextUtils.getIndexOfThisSubtask(runtimeContext),
                this::collect);
    }

    /**
     * Forwards the value of an input record, tagged as {@link KeyPartOrRow#ROW}.
     *
     * @param streamRecord the incoming record; only its value is forwarded
     * @throws Exception declared by the operator interface
     */
    public void processElement(StreamRecord<T> streamRecord) throws Exception {
        emit(KeyPartOrRow.ROW, streamRecord.getValue());
    }

    /**
     * Bootstrap callback: converts the row and emits it tagged as {@link KeyPartOrRow#KEY_PART}.
     *
     * @param row row produced by the bootstrap
     */
    private void collect(InternalRow row) {
        emit(KeyPartOrRow.KEY_PART, converter.apply(row));
    }

    /** Emits {@code value} paired with {@code kind} as a new record on the operator output. */
    private void emit(KeyPartOrRow kind, T value) {
        output.collect(new StreamRecord<>(new Tuple2<>(kind, value)));
    }

    /**
     * Factory that creates {@link IndexBootstrapOperator} instances.
     *
     * @param <T> element type of the created operator
     */
    public static class Factory<T>
            extends AbstractStreamOperatorFactory<Tuple2<KeyPartOrRow, T>>
            implements OneInputStreamOperatorFactory<T, Tuple2<KeyPartOrRow, T>> {

        private final IndexBootstrap bootstrap;
        private final SerializableFunction<InternalRow, T> converter;

        /**
         * Creates a factory with chaining strategy {@link ChainingStrategy#ALWAYS}.
         *
         * @param bootstrap bootstrap handed to every created operator
         * @param converter row converter handed to every created operator
         */
        public Factory(IndexBootstrap bootstrap, SerializableFunction<InternalRow, T> converter) {
            this.chainingStrategy = ChainingStrategy.ALWAYS;
            this.bootstrap = bootstrap;
            this.converter = converter;
        }

        /**
         * Creates a new {@link IndexBootstrapOperator} from the given parameters.
         *
         * @param parameters operator parameters passed on to the operator constructor
         * @param <OP> operator type expected by the caller
         * @return the newly created operator, cast to {@code OP}
         */
        // The caller chooses OP, so the cast cannot be checked at compile time; the created
        // operator does implement StreamOperator<Tuple2<KeyPartOrRow, T>>, which is OP's bound.
        @SuppressWarnings("unchecked")
        public <OP extends StreamOperator<Tuple2<KeyPartOrRow, T>>> OP createStreamOperator(
                StreamOperatorParameters<Tuple2<KeyPartOrRow, T>> parameters) {
            return (OP) new IndexBootstrapOperator<>(parameters, bootstrap, converter);
        }

        /**
         * Returns the class of the operators this factory creates.
         *
         * @param classLoader unused
         * @return {@code IndexBootstrapOperator.class}
         */
        // The raw StreamOperator type is part of the inherited method signature.
        @SuppressWarnings("rawtypes")
        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return IndexBootstrapOperator.class;
        }
    }
}

