/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.sink.shuffle;

import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortKey;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.RowDataWrapper;
import org.apache.iceberg.flink.sink.shuffle.DataStatistics;
import org.apache.iceberg.flink.sink.shuffle.DataStatisticsSerializer;
import org.apache.iceberg.flink.sink.shuffle.GlobalStatistics;
import org.apache.iceberg.flink.sink.shuffle.GlobalStatisticsSerializer;
import org.apache.iceberg.flink.sink.shuffle.RequestGlobalStatisticsEvent;
import org.apache.iceberg.flink.sink.shuffle.SketchUtil;
import org.apache.iceberg.flink.sink.shuffle.SortKeySerializer;
import org.apache.iceberg.flink.sink.shuffle.StatisticsEvent;
import org.apache.iceberg.flink.sink.shuffle.StatisticsOrRecord;
import org.apache.iceberg.flink.sink.shuffle.StatisticsType;
import org.apache.iceberg.flink.sink.shuffle.StatisticsUtil;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

@Internal
public class DataStatisticsOperator
extends AbstractStreamOperator<StatisticsOrRecord>
implements OneInputStreamOperator<RowData, StatisticsOrRecord>,
OperatorEventHandler {
    private static final long serialVersionUID = 1L;
    private final String operatorName;
    private final RowDataWrapper rowDataWrapper;
    private final SortKey sortKey;
    private final OperatorEventGateway operatorEventGateway;
    private final int downstreamParallelism;
    private final StatisticsType statisticsType;
    private final TypeSerializer<DataStatistics> taskStatisticsSerializer;
    private final TypeSerializer<GlobalStatistics> globalStatisticsSerializer;
    private transient int parallelism;
    private transient int subtaskIndex;
    private transient ListState<GlobalStatistics> globalStatisticsState;
    private volatile transient StatisticsType taskStatisticsType;
    private volatile transient DataStatistics localStatistics;
    private volatile transient GlobalStatistics globalStatistics;

    DataStatisticsOperator(String operatorName, Schema schema, SortOrder sortOrder, OperatorEventGateway operatorEventGateway, int downstreamParallelism, StatisticsType statisticsType) {
        this.operatorName = operatorName;
        this.rowDataWrapper = new RowDataWrapper(FlinkSchemaUtil.convert(schema), schema.asStruct());
        this.sortKey = new SortKey(schema, sortOrder);
        this.operatorEventGateway = operatorEventGateway;
        this.downstreamParallelism = downstreamParallelism;
        this.statisticsType = statisticsType;
        SortKeySerializer sortKeySerializer = new SortKeySerializer(schema, sortOrder);
        this.taskStatisticsSerializer = new DataStatisticsSerializer(sortKeySerializer);
        this.globalStatisticsSerializer = new GlobalStatisticsSerializer(sortKeySerializer);
    }

    public void initializeState(StateInitializationContext context) throws Exception {
        this.parallelism = this.getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
        this.subtaskIndex = this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
        this.globalStatisticsState = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor("globalStatisticsState", this.globalStatisticsSerializer));
        if (context.isRestored()) {
            if (this.globalStatisticsState.get() == null || !((Iterable)this.globalStatisticsState.get()).iterator().hasNext()) {
                LOG.info("Operator {} subtask {} doesn't have global statistics state to restore", (Object)this.operatorName, (Object)this.subtaskIndex);
            } else {
                GlobalStatistics restoredStatistics = (GlobalStatistics)((Iterable)this.globalStatisticsState.get()).iterator().next();
                LOG.info("Operator {} subtask {} restored global statistics state", (Object)this.operatorName, (Object)this.subtaskIndex);
                this.globalStatistics = restoredStatistics;
            }
            LOG.info("Operator {} subtask {} requests new global statistics from coordinator ", (Object)this.operatorName, (Object)this.subtaskIndex);
            RequestGlobalStatisticsEvent event = this.globalStatistics != null ? new RequestGlobalStatisticsEvent(this.globalStatistics.hashCode()) : new RequestGlobalStatisticsEvent();
            this.operatorEventGateway.sendEventToCoordinator((OperatorEvent)event);
        }
        this.taskStatisticsType = StatisticsUtil.collectType(this.statisticsType, this.globalStatistics);
        this.localStatistics = StatisticsUtil.createTaskStatistics(this.taskStatisticsType, this.parallelism, this.downstreamParallelism);
    }

    public void open() throws Exception {
        if (this.globalStatistics != null) {
            this.output.collect((Object)new StreamRecord((Object)StatisticsOrRecord.fromStatistics(this.globalStatistics)));
        }
    }

    public void handleOperatorEvent(OperatorEvent event) {
        Preconditions.checkArgument((boolean)(event instanceof StatisticsEvent), (Object)String.format("Operator %s subtask %s received unexpected operator event %s", this.operatorName, this.subtaskIndex, event.getClass()));
        StatisticsEvent statisticsEvent = (StatisticsEvent)event;
        LOG.info("Operator {} subtask {} received global data event from coordinator checkpoint {}", new Object[]{this.operatorName, this.subtaskIndex, statisticsEvent.checkpointId()});
        this.globalStatistics = StatisticsUtil.deserializeGlobalStatistics(statisticsEvent.statisticsBytes(), this.globalStatisticsSerializer);
        this.checkStatisticsTypeMigration();
        if (statisticsEvent.applyImmediately()) {
            this.output.collect((Object)new StreamRecord((Object)StatisticsOrRecord.fromStatistics(this.globalStatistics)));
        }
    }

    public void processElement(StreamRecord<RowData> streamRecord) {
        RowData record = (RowData)streamRecord.getValue();
        RowDataWrapper struct = this.rowDataWrapper.wrap(record);
        this.sortKey.wrap((StructLike)struct);
        this.localStatistics.add(this.sortKey);
        this.checkStatisticsTypeMigration();
        this.output.collect((Object)new StreamRecord((Object)StatisticsOrRecord.fromRecord(record)));
    }

    public void snapshotState(StateSnapshotContext context) throws Exception {
        long checkpointId = context.getCheckpointId();
        LOG.info("Operator {} subtask {} snapshotting data statistics for checkpoint {}", new Object[]{this.operatorName, this.subtaskIndex, checkpointId});
        if (this.globalStatistics != null) {
            this.output.collect((Object)new StreamRecord((Object)StatisticsOrRecord.fromStatistics(this.globalStatistics)));
        }
        if (this.globalStatistics != null && this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask() == 0) {
            this.globalStatisticsState.clear();
            LOG.info("Operator {} subtask {} saving global statistics to state", (Object)this.operatorName, (Object)this.subtaskIndex);
            this.globalStatisticsState.add((Object)this.globalStatistics);
            LOG.debug("Operator {} subtask {} saved global statistics to state: {}", new Object[]{this.operatorName, this.subtaskIndex, this.globalStatistics});
        }
        LOG.info("Operator {} Subtask {} sending local statistics to coordinator for checkpoint {}", new Object[]{this.operatorName, this.subtaskIndex, checkpointId});
        this.operatorEventGateway.sendEventToCoordinator((OperatorEvent)StatisticsEvent.createTaskStatisticsEvent(checkpointId, this.localStatistics, this.taskStatisticsSerializer));
        this.localStatistics = StatisticsUtil.createTaskStatistics(this.taskStatisticsType, this.parallelism, this.downstreamParallelism);
    }

    private void checkStatisticsTypeMigration() {
        Map mapStatistics;
        if (this.statisticsType == StatisticsType.Auto && this.localStatistics.type() == StatisticsType.Map && ((mapStatistics = (Map)this.localStatistics.result()).size() > 10000 || this.globalStatistics != null && this.globalStatistics.type() == StatisticsType.Sketch)) {
            LOG.info("Operator {} subtask {} switched local statistics from Map to Sketch.", (Object)this.operatorName, (Object)this.subtaskIndex);
            this.taskStatisticsType = StatisticsType.Sketch;
            this.localStatistics = StatisticsUtil.createTaskStatistics(this.taskStatisticsType, this.parallelism, this.downstreamParallelism);
            SketchUtil.convertMapToSketch(mapStatistics, this.localStatistics::add);
        }
    }

    @VisibleForTesting
    DataStatistics localStatistics() {
        return this.localStatistics;
    }

    @VisibleForTesting
    GlobalStatistics globalStatistics() {
        return this.globalStatistics;
    }
}

