/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.sink.shuffle;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortKey;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.RowDataWrapper;
import org.apache.iceberg.flink.sink.shuffle.DataStatistics;
import org.apache.iceberg.flink.sink.shuffle.DataStatisticsEvent;
import org.apache.iceberg.flink.sink.shuffle.DataStatisticsOrRecord;
import org.apache.iceberg.flink.sink.shuffle.DataStatisticsUtil;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

@Internal
class DataStatisticsOperator<D extends DataStatistics<D, S>, S>
extends AbstractStreamOperator<DataStatisticsOrRecord<D, S>>
implements OneInputStreamOperator<RowData, DataStatisticsOrRecord<D, S>>,
OperatorEventHandler {
    private static final long serialVersionUID = 1L;
    private final String operatorName;
    private final RowDataWrapper rowDataWrapper;
    private final SortKey sortKey;
    private final OperatorEventGateway operatorEventGateway;
    private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;
    private volatile transient DataStatistics<D, S> localStatistics;
    private volatile transient DataStatistics<D, S> globalStatistics;
    private transient ListState<DataStatistics<D, S>> globalStatisticsState;

    DataStatisticsOperator(String operatorName, Schema schema, SortOrder sortOrder, OperatorEventGateway operatorEventGateway, TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {
        this.operatorName = operatorName;
        this.rowDataWrapper = new RowDataWrapper(FlinkSchemaUtil.convert(schema), schema.asStruct());
        this.sortKey = new SortKey(schema, sortOrder);
        this.operatorEventGateway = operatorEventGateway;
        this.statisticsSerializer = statisticsSerializer;
    }

    public void initializeState(StateInitializationContext context) throws Exception {
        this.localStatistics = (DataStatistics)this.statisticsSerializer.createInstance();
        this.globalStatisticsState = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor("globalStatisticsState", this.statisticsSerializer));
        if (context.isRestored()) {
            int subtaskIndex = this.getRuntimeContext().getIndexOfThisSubtask();
            if (this.globalStatisticsState.get() == null || !((Iterable)this.globalStatisticsState.get()).iterator().hasNext()) {
                LOG.warn("Operator {} subtask {} doesn't have global statistics state to restore", (Object)this.operatorName, (Object)subtaskIndex);
                this.globalStatistics = (DataStatistics)this.statisticsSerializer.createInstance();
            } else {
                LOG.info("Restoring operator {} global statistics state for subtask {}", (Object)this.operatorName, (Object)subtaskIndex);
                this.globalStatistics = (DataStatistics)((Iterable)this.globalStatisticsState.get()).iterator().next();
            }
        } else {
            this.globalStatistics = (DataStatistics)this.statisticsSerializer.createInstance();
        }
    }

    public void open() throws Exception {
        if (!this.globalStatistics.isEmpty()) {
            this.output.collect((Object)new StreamRecord(DataStatisticsOrRecord.fromDataStatistics(this.globalStatistics)));
        }
    }

    public void handleOperatorEvent(OperatorEvent event) {
        int subtaskIndex = this.getRuntimeContext().getIndexOfThisSubtask();
        Preconditions.checkArgument(event instanceof DataStatisticsEvent, String.format("Operator %s subtask %s received unexpected operator event %s", this.operatorName, subtaskIndex, event.getClass()));
        DataStatisticsEvent statisticsEvent = (DataStatisticsEvent)event;
        LOG.info("Operator {} received global data event from coordinator checkpoint {}", (Object)this.operatorName, (Object)statisticsEvent.checkpointId());
        this.globalStatistics = DataStatisticsUtil.deserializeDataStatistics(statisticsEvent.statisticsBytes(), this.statisticsSerializer);
        this.output.collect((Object)new StreamRecord(DataStatisticsOrRecord.fromDataStatistics(this.globalStatistics)));
    }

    public void processElement(StreamRecord<RowData> streamRecord) {
        RowData record = (RowData)streamRecord.getValue();
        RowDataWrapper struct = this.rowDataWrapper.wrap(record);
        this.sortKey.wrap(struct);
        this.localStatistics.add(this.sortKey);
        this.output.collect((Object)new StreamRecord(DataStatisticsOrRecord.fromRecord(record)));
    }

    public void snapshotState(StateSnapshotContext context) throws Exception {
        long checkpointId = context.getCheckpointId();
        int subTaskId = this.getRuntimeContext().getIndexOfThisSubtask();
        LOG.info("Snapshotting data statistics operator {} for checkpoint {} in subtask {}", new Object[]{this.operatorName, checkpointId, subTaskId});
        if (!this.globalStatistics.isEmpty()) {
            this.output.collect((Object)new StreamRecord(DataStatisticsOrRecord.fromDataStatistics(this.globalStatistics)));
        }
        if (!this.globalStatistics.isEmpty() && this.getRuntimeContext().getIndexOfThisSubtask() == 0) {
            this.globalStatisticsState.clear();
            LOG.info("Saving operator {} global statistics {} to state in subtask {}", new Object[]{this.operatorName, this.globalStatistics, subTaskId});
            this.globalStatisticsState.add(this.globalStatistics);
        }
        this.operatorEventGateway.sendEventToCoordinator(DataStatisticsEvent.create(checkpointId, this.localStatistics, this.statisticsSerializer));
        LOG.debug("Subtask {} of operator {} sent local statistics to coordinator at checkpoint{}: {}", new Object[]{subTaskId, this.operatorName, checkpointId, this.localStatistics});
        this.localStatistics = (DataStatistics)this.statisticsSerializer.createInstance();
    }

    @VisibleForTesting
    DataStatistics<D, S> localDataStatistics() {
        return this.localStatistics;
    }

    @VisibleForTesting
    DataStatistics<D, S> globalDataStatistics() {
        return this.globalStatistics;
    }
}

