/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.sink.shuffle;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.sink.shuffle.DataStatistics;
import org.apache.iceberg.flink.sink.shuffle.DataStatisticsEvent;
import org.apache.iceberg.flink.sink.shuffle.DataStatisticsOrRecord;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/**
 * Stream operator that forwards every incoming {@link RowData} downstream wrapped in a
 * {@link DataStatisticsOrRecord} while accumulating statistics over the record keys.
 *
 * <p>Two sets of statistics are tracked:
 *
 * <ul>
 *   <li><b>local</b> statistics: updated with the key of every processed record, sent to the
 *       operator coordinator through the {@link OperatorEventGateway} on every snapshot, and
 *       then reset to a fresh instance;
 *   <li><b>global</b> statistics: replaced whenever a {@link DataStatisticsEvent} arrives from
 *       the coordinator, emitted downstream, and persisted in union list state by subtask 0.
 * </ul>
 *
 * @param <D> concrete {@link DataStatistics} implementation
 * @param <S> second type parameter of {@link DataStatistics} (its meaning is defined there)
 */
class DataStatisticsOperator<D extends DataStatistics<D, S>, S>
        extends AbstractStreamOperator<DataStatisticsOrRecord<D, S>>
        implements OneInputStreamOperator<RowData, DataStatisticsOrRecord<D, S>>,
                OperatorEventHandler {
    private static final long serialVersionUID = 1L;

    /** Name under which the global statistics are registered in the operator state store. */
    private static final String GLOBAL_STATISTICS_STATE_NAME = "globalStatisticsState";

    private final KeySelector<RowData, RowData> keySelector;
    private final OperatorEventGateway operatorEventGateway;
    private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;

    // NOTE(review): both statistics fields are volatile, presumably because they are read
    // through the @VisibleForTesting accessors from another thread -- confirm.
    private transient volatile DataStatistics<D, S> localStatistics;
    private transient volatile DataStatistics<D, S> globalStatistics;
    private transient ListState<DataStatistics<D, S>> globalStatisticsState;

    /**
     * Creates the operator.
     *
     * @param keySelector extracts the key whose statistics are collected from each record
     * @param operatorEventGateway gateway used to send local statistics to the coordinator
     * @param statisticsSerializer serializer for the statistics; also used as the factory for
     *     fresh statistics instances via {@link TypeSerializer#createInstance()}
     */
    DataStatisticsOperator(
            KeySelector<RowData, RowData> keySelector,
            OperatorEventGateway operatorEventGateway,
            TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {
        this.keySelector = keySelector;
        this.operatorEventGateway = operatorEventGateway;
        this.statisticsSerializer = statisticsSerializer;
    }

    /**
     * Creates fresh local statistics, registers the union list state that holds the global
     * statistics and, when restoring, loads the first restored entry as the global statistics.
     * Without anything to restore the global statistics start out as a fresh instance.
     *
     * @param context state initialization context supplied by the runtime
     * @throws Exception if the state store cannot provide the list state
     */
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        localStatistics = statisticsSerializer.createInstance();
        globalStatisticsState =
                context.getOperatorStateStore()
                        .getUnionListState(
                                new ListStateDescriptor<>(
                                        GLOBAL_STATISTICS_STATE_NAME, statisticsSerializer));

        if (!context.isRestored()) {
            globalStatistics = statisticsSerializer.createInstance();
            return;
        }

        int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        // Read the state once instead of once per check.
        Iterable<DataStatistics<D, S>> restored = globalStatisticsState.get();
        if (restored == null || !restored.iterator().hasNext()) {
            LOG.warn("Subtask {} doesn't have global statistics state to restore", subtaskIndex);
            globalStatistics = statisticsSerializer.createInstance();
        } else {
            LOG.info("Restoring global statistics state for subtask {}", subtaskIndex);
            globalStatistics = restored.iterator().next();
        }
    }

    /**
     * Emits the global statistics downstream when they are not empty (e.g. after they were
     * restored from state).
     *
     * @throws Exception declared by the overridden method
     */
    @Override
    public void open() throws Exception {
        if (!globalStatistics.isEmpty()) {
            emitGlobalStatistics();
        }
    }

    /**
     * Replaces the global statistics with the ones carried by the event and emits them
     * downstream.
     *
     * @param event event received from the coordinator; must be a {@link DataStatisticsEvent}
     * @throws IllegalArgumentException if the event is {@code null} or of any other type
     */
    @Override
    @SuppressWarnings("unchecked")
    public void handleOperatorEvent(OperatorEvent event) {
        // Template form: the message is only built on failure, and a null event is reported as
        // an illegal argument instead of failing with a NullPointerException on getClass().
        Preconditions.checkArgument(
                event instanceof DataStatisticsEvent,
                "Received unexpected operator event %s",
                event == null ? null : event.getClass());
        // Unchecked: the type arguments of the event cannot be verified at runtime.
        DataStatisticsEvent<D, S> statisticsEvent = (DataStatisticsEvent<D, S>) event;
        globalStatistics = statisticsEvent.dataStatistics();
        emitGlobalStatistics();
    }

    /**
     * Adds the record's key to the local statistics and forwards the record downstream.
     *
     * @param streamRecord incoming record
     * @throws Exception if the key selector fails
     */
    @Override
    public void processElement(StreamRecord<RowData> streamRecord) throws Exception {
        RowData record = streamRecord.getValue();
        RowData key = keySelector.getKey(record);
        localStatistics.add(key);
        output.collect(new StreamRecord<>(DataStatisticsOrRecord.fromRecord(record)));
    }

    /**
     * Persists the global statistics (subtask 0 only), sends the local statistics collected
     * since the previous snapshot to the coordinator and starts a fresh local instance.
     *
     * @param context snapshot context providing the checkpoint id
     * @throws Exception if the list state cannot be updated
     */
    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        long checkpointId = context.getCheckpointId();
        int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        LOG.info(
                "Taking data statistics operator snapshot for checkpoint {} in subtask {}",
                checkpointId,
                subtaskIndex);

        // The state is a union list state: on restore every subtask is handed the complete
        // list (see the Flink operator-state documentation). Clearing it on every subtask, not
        // only on subtask 0, keeps the other subtasks from writing their restored (and by then
        // possibly stale) copies back at each checkpoint.
        globalStatisticsState.clear();

        // Only subtask 0 contributes an entry so that the union holds a single copy.
        if (subtaskIndex == 0 && !globalStatistics.isEmpty()) {
            LOG.info(
                    "Saving global statistics {} to state in subtask {}",
                    globalStatistics,
                    subtaskIndex);
            globalStatisticsState.add(globalStatistics);
        }

        operatorEventGateway.sendEventToCoordinator(
                new DataStatisticsEvent<D, S>(checkpointId, localStatistics));
        // Start collecting from scratch for the next checkpoint interval.
        localStatistics = statisticsSerializer.createInstance();
    }

    /** Returns the statistics accumulated locally since the last snapshot. */
    @VisibleForTesting
    DataStatistics<D, S> localDataStatistics() {
        return localStatistics;
    }

    /** Returns the most recently restored or received global statistics. */
    @VisibleForTesting
    DataStatistics<D, S> globalDataStatistics() {
        return globalStatistics;
    }

    /** Wraps the current global statistics in a stream record and sends it downstream. */
    private void emitGlobalStatistics() {
        output.collect(
                new StreamRecord<>(DataStatisticsOrRecord.fromDataStatistics(globalStatistics)));
    }
}

