/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.append;

import java.util.Collections;
import java.util.List;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.table.HoodieTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Write function that hands every incoming record to a {@link BulkInsertWriterHelper}
 * and reports the resulting {@link WriteStatus} list to the coordinator as a
 * {@link WriteMetadataEvent}.
 *
 * <p>The writer helper is created lazily on the first record after each flush and is
 * dropped again once its write statuses have been sent.
 *
 * @param <I> type of the input records; every record is cast to {@link RowData} before writing
 */
public class AppendWriteFunction<I>
extends AbstractStreamWriteFunction<I> {
    private static final Logger LOG = LoggerFactory.getLogger(AppendWriteFunction.class);
    private static final long serialVersionUID = 1L;

    /** Writer for the current batch; {@code null} until the first record arrives after a flush. */
    private transient BulkInsertWriterHelper writerHelper;

    /** Row type handed to every {@link BulkInsertWriterHelper} this function creates. */
    private final RowType rowType;

    /**
     * Creates the write function.
     *
     * @param config  configuration passed to the base class
     * @param rowType row type passed to the writer helper
     */
    public AppendWriteFunction(Configuration config, RowType rowType) {
        super(config);
        this.rowType = rowType;
    }

    /**
     * Flushes the current batch and reports it to the coordinator as a non end-of-input event.
     */
    @Override
    public void snapshotState() {
        this.flushData(false);
    }

    /**
     * Writes one record, creating the writer helper first when none is active.
     *
     * <p>The context parameter is declared with the parameterized enclosing type so that this
     * method is a proper override; with the raw {@code ProcessFunction.Context} the signature is
     * neither identical to the inherited one nor its erasure.
     * NOTE(review): assumes the base class derives from {@code ProcessFunction<I, Object>},
     * which matches the {@code Collector<Object>} parameter - confirm against the base class.
     *
     * @param value the record to write; must be a {@link RowData}
     * @param ctx   process function context, unused here
     * @param out   output collector, unused here
     * @throws Exception if initializing the writer or writing the record fails
     */
    @Override
    public void processElement(I value, ProcessFunction<I, Object>.Context ctx, Collector<Object> out) throws Exception {
        if (this.writerHelper == null) {
            this.initWriterHelper();
        }
        this.writerHelper.write((RowData)value);
    }

    /**
     * Runs the base class end-of-input handling, flushes the current batch flagged as end of
     * input, then clears the accumulated write statuses.
     */
    @Override
    public void endInput() {
        super.endInput();
        this.flushData(true);
        this.writeStatuses.clear();
    }

    /**
     * Sends an empty bootstrap event to the coordinator, unless this is a retried attempt
     * (attempt number greater than zero) that still holds an instant which is not among the
     * completed instants of the active timeline. In that case the instant is only logged and
     * reset to {@code null}, and no event is sent.
     */
    @Override
    protected void sendBootstrapEvent() {
        int attemptId = this.getRuntimeContext().getAttemptNumber();
        if (attemptId > 0 && this.currentInstant != null && !this.metaClient.getActiveTimeline().filterCompletedInstants().containsInstant(this.currentInstant)) {
            LOG.info("Recover task[{}] for instant [{}] with attemptId [{}]", this.taskID, this.currentInstant, attemptId);
            this.currentInstant = null;
            return;
        }
        this.eventGateway.sendEventToCoordinator((OperatorEvent)WriteMetadataEvent.emptyBootstrap(this.taskID));
        LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", this.taskID);
    }

    /**
     * Returns the active writer helper.
     *
     * @return the writer helper, or {@code null} when no record has been processed since the
     *         last flush
     */
    @VisibleForTesting
    public BulkInsertWriterHelper getWriterHelper() {
        return this.writerHelper;
    }

    /**
     * Creates the writer helper for the instant returned by {@code instantToWrite(true)}.
     *
     * @throws HoodieException if no instant is available
     */
    private void initWriterHelper() {
        // NOTE(review): the meaning of the boolean flag is defined by the base class - confirm.
        final String instant = this.instantToWrite(true);
        if (instant == null) {
            throw new HoodieException("No inflight instant when flushing data!");
        }
        this.writerHelper = new BulkInsertWriterHelper(this.config, (HoodieTable)this.writeClient.getHoodieTable(), this.writeClient.getConfig(), instant, this.taskID, this.getRuntimeContext().getNumberOfParallelSubtasks(), this.getRuntimeContext().getAttemptNumber(), this.rowType);
    }

    /**
     * Collects the write statuses of the current batch, sends them to the coordinator and resets
     * the writer helper.
     *
     * <p>When no record was written since the last flush, an event with an empty status list is
     * still sent, using the instant returned by {@code instantToWrite(false)}.
     *
     * @param endInput value of the end-input flag carried by the emitted event
     */
    private void flushData(boolean endInput) {
        final List<WriteStatus> writeStatus;
        if (this.writerHelper != null) {
            writeStatus = this.writerHelper.getWriteStatuses(this.taskID);
            this.currentInstant = this.writerHelper.getInstantTime();
        } else {
            writeStatus = Collections.emptyList();
            this.currentInstant = this.instantToWrite(false);
            LOG.info("No data to write in subtask [{}] for instant [{}]", this.taskID, this.currentInstant);
        }
        final WriteMetadataEvent event = WriteMetadataEvent.builder().taskID(this.taskID).instantTime(this.currentInstant).writeStatus(writeStatus).lastBatch(true).endInput(endInput).build();
        this.eventGateway.sendEventToCoordinator((OperatorEvent)event);
        // Drop the helper so that the next record creates a fresh one.
        this.writerHelper = null;
        this.writeStatuses.addAll(writeStatus);
        // NOTE(review): presumably marks this task as awaiting coordinator confirmation;
        // the flag is owned by the base class - confirm there.
        this.confirming = true;
    }
}

