/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.translation.spark.sink.writer;

import java.io.IOException;
import java.util.Collections;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.translation.serialization.RowConverter;
import org.apache.seatunnel.translation.spark.serialization.InternalRowConverter;
import org.apache.seatunnel.translation.spark.sink.writer.SparkWriterCommitMessage;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

public class SparkDataWriter<CommitInfoT, StateT>
implements DataWriter<InternalRow> {
    private final SinkWriter<SeaTunnelRow, CommitInfoT, StateT> sinkWriter;
    @Nullable
    private final SinkCommitter<CommitInfoT> sinkCommitter;
    private final RowConverter<InternalRow> rowConverter;
    private CommitInfoT latestCommitInfoT;
    private long epochId;

    SparkDataWriter(SinkWriter<SeaTunnelRow, CommitInfoT, StateT> sinkWriter, @Nullable SinkCommitter<CommitInfoT> sinkCommitter, SeaTunnelDataType<?> dataType, long epochId) {
        this.sinkWriter = sinkWriter;
        this.sinkCommitter = sinkCommitter;
        this.rowConverter = new InternalRowConverter(dataType);
        this.epochId = epochId == 0L ? 1L : epochId;
    }

    public void write(InternalRow record) throws IOException {
        this.sinkWriter.write((Object)this.rowConverter.reconvert((Object)record));
    }

    public WriterCommitMessage commit() throws IOException {
        Optional commitInfoTOptional = this.sinkWriter.prepareCommit();
        commitInfoTOptional.ifPresent(commitInfoT -> {
            this.latestCommitInfoT = commitInfoT;
        });
        this.sinkWriter.snapshotState(this.epochId++);
        if (this.sinkCommitter != null) {
            if (this.latestCommitInfoT == null) {
                this.sinkCommitter.commit(Collections.emptyList());
            } else {
                this.sinkCommitter.commit(Collections.singletonList(this.latestCommitInfoT));
            }
        }
        SparkWriterCommitMessage<CommitInfoT> sparkWriterCommitMessage = new SparkWriterCommitMessage<CommitInfoT>(this.latestCommitInfoT);
        this.cleanCommitInfo();
        this.sinkWriter.close();
        return sparkWriterCommitMessage;
    }

    public void abort() throws IOException {
        this.sinkWriter.abortPrepare();
        if (this.sinkCommitter != null) {
            if (this.latestCommitInfoT == null) {
                this.sinkCommitter.abort(Collections.emptyList());
            } else {
                this.sinkCommitter.abort(Collections.singletonList(this.latestCommitInfoT));
            }
        }
        this.cleanCommitInfo();
    }

    private void cleanCommitInfo() {
        this.latestCommitInfoT = null;
    }
}

