/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.file.sink;

import java.io.IOException;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractWriteStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;

public class BaseFileSinkWriter
implements SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> {
    protected final WriteStrategy writeStrategy;
    private final FileSystemUtils fileSystemUtils;

    public BaseFileSinkWriter(WriteStrategy writeStrategy, HadoopConf hadoopConf, SinkWriter.Context context, String jobId, List<FileSinkState> fileSinkStates) {
        this.writeStrategy = writeStrategy;
        this.fileSystemUtils = writeStrategy.getFileSystemUtils();
        int subTaskIndex = context.getIndexOfSubtask();
        String uuidPrefix = !fileSinkStates.isEmpty() ? fileSinkStates.get(0).getUuidPrefix() : UUID.randomUUID().toString().replaceAll("-", "").substring(0, 10);
        writeStrategy.init(hadoopConf, jobId, uuidPrefix, subTaskIndex);
        if (!fileSinkStates.isEmpty()) {
            try {
                List<String> transactions = this.findTransactionList(jobId, uuidPrefix);
                FileSinkAggregatedCommitter fileSinkAggregatedCommitter = new FileSinkAggregatedCommitter(this.fileSystemUtils);
                LinkedHashMap fileStatesMap = new LinkedHashMap();
                fileSinkStates.forEach(fileSinkState -> fileStatesMap.put(fileSinkState.getTransactionId(), fileSinkState));
                for (String transaction : transactions) {
                    if (fileStatesMap.containsKey(transaction)) {
                        FileSinkState fileSinkState2 = (FileSinkState)fileStatesMap.get(transaction);
                        Object fileCommitInfo = fileSinkAggregatedCommitter.combine((List)Collections.singletonList(new FileCommitInfo(fileSinkState2.getNeedMoveFiles(), fileSinkState2.getPartitionDirAndValuesMap(), fileSinkState2.getTransactionDir())));
                        fileSinkAggregatedCommitter.commit(Collections.singletonList(fileCommitInfo));
                        continue;
                    }
                    writeStrategy.abortPrepare(transaction);
                }
            }
            catch (IOException e) {
                String errorMsg = String.format("Try to process these fileStates %s failed", fileSinkStates);
                throw new FileConnectorException((SeaTunnelErrorCode)CommonErrorCode.FILE_OPERATION_FAILED, errorMsg, e);
            }
            writeStrategy.beginTransaction(fileSinkStates.get(0).getCheckpointId() + 1L);
        } else {
            writeStrategy.beginTransaction(1L);
        }
    }

    private List<String> findTransactionList(String jobId, String uuidPrefix) throws IOException {
        return this.fileSystemUtils.dirList(AbstractWriteStrategy.getTransactionDirPrefix(this.writeStrategy.getFileSinkConfig().getTmpPath(), jobId, uuidPrefix)).stream().map(Path::getName).collect(Collectors.toList());
    }

    public BaseFileSinkWriter(WriteStrategy writeStrategy, HadoopConf hadoopConf, SinkWriter.Context context, String jobId) {
        this(writeStrategy, hadoopConf, context, jobId, Collections.emptyList());
        writeStrategy.beginTransaction(1L);
    }

    public void write(SeaTunnelRow element) throws IOException {
        try {
            this.writeStrategy.write(element);
        }
        catch (FileConnectorException e) {
            String errorMsg = String.format("Write this data [%s] to file failed", element);
            throw new FileConnectorException((SeaTunnelErrorCode)CommonErrorCode.FILE_OPERATION_FAILED, errorMsg, (Throwable)((Object)e));
        }
    }

    public Optional<FileCommitInfo> prepareCommit() throws IOException {
        return this.writeStrategy.prepareCommit();
    }

    public void abortPrepare() {
        this.writeStrategy.abortPrepare();
    }

    public List<FileSinkState> snapshotState(long checkpointId) throws IOException {
        return this.writeStrategy.snapshotState(checkpointId);
    }

    public void close() throws IOException {
    }
}

