/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.bulk;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.FutureUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle;
import org.apache.hudi.metrics.FlinkStreamWriteMetrics;
import org.apache.hudi.sink.bulk.RowDataKeyGen;
import org.apache.hudi.sink.bulk.RowDataKeyGens;
import org.apache.hudi.table.HoodieTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Helper that writes {@link RowData} records into per-partition
 * {@link HoodieRowDataCreateHandle}s and collects the {@link WriteStatus} of every
 * handle it closes.
 *
 * <p>One open handle is tracked per partition path. A handle is rolled over (closed and
 * replaced) when it reports that it can no longer accept writes. When the input is declared
 * sorted, all open handles are closed whenever a not-yet-seen partition path shows up.
 *
 * <p>NOTE(review): the class keeps unsynchronized mutable state (current handle, handle map,
 * status list); it looks like it is meant to be driven by a single thread - confirm with callers.
 */
public class BulkInsertWriterHelper {
    private static final Logger LOG = LoggerFactory.getLogger(BulkInsertWriterHelper.class);

    /** Upper bound on the number of threads used by {@link #close()} to close handles in parallel. */
    private static final int MAX_CLOSE_PARALLELISM = 10;
    /** How long {@link #close()} waits for the closing thread pool to terminate, in minutes. */
    private static final long CLOSE_TIMEOUT_MINUTES = 10L;

    protected final String instantTime;
    protected final int taskPartitionId;
    protected final long totalSubtaskNum;
    protected final long taskEpochId;
    protected final HoodieTable hoodieTable;
    protected final HoodieWriteConfig writeConfig;
    /** Row type handed to the create handles; may be the input row type with metadata fields prepended. */
    protected final RowType rowType;
    protected final boolean preserveHoodieMetadata;
    protected final boolean isAppendMode;
    protected final boolean populateMetaFields;
    /** True when the operation is a bulk insert and the "sort input" option is enabled. */
    protected final Boolean isInputSorted;
    /** Statuses of all handles closed so far (rolled-over handles and handles closed by {@link #close()}). */
    private final List<WriteStatus> writeStatusList = new ArrayList<>();
    /** Handle used for the most recent write; {@code null} before the first write and after {@link #close()}. */
    protected HoodieRowDataCreateHandle handle;
    private String lastKnownPartitionPath = null;
    private final String fileIdPrefix;
    private int numFilesWritten = 0;
    /** Open handles keyed by partition path. */
    protected final Map<String, HoodieRowDataCreateHandle> handles = new HashMap<>();
    /** Key generator; {@code null} when the record key / partition path are read from the record's meta fields. */
    @Nullable
    protected final RowDataKeyGen keyGen;
    protected final Option<FlinkStreamWriteMetrics> writeMetrics;

    /**
     * Creates a helper that does not preserve existing Hoodie metadata and reports no metrics.
     */
    public BulkInsertWriterHelper(Configuration conf, HoodieTable hoodieTable, HoodieWriteConfig writeConfig, String instantTime, int taskPartitionId, long totalSubtaskNum, long taskEpochId, RowType rowType) {
        this(conf, hoodieTable, writeConfig, instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType, false, Option.empty());
    }

    /**
     * Creates a helper that reports no metrics.
     *
     * <p>{@code taskId} is forwarded to the same position as {@code totalSubtaskNum} in the
     * full constructor.
     */
    public BulkInsertWriterHelper(Configuration conf, HoodieTable hoodieTable, HoodieWriteConfig writeConfig, String instantTime, int taskPartitionId, long taskId, long taskEpochId, RowType rowType, boolean preserveHoodieMetadata) {
        this(conf, hoodieTable, writeConfig, instantTime, taskPartitionId, taskId, taskEpochId, rowType, preserveHoodieMetadata, Option.empty());
    }

    /**
     * Creates a helper.
     *
     * @param conf                   Flink configuration; consulted for append mode, the bulk-insert
     *                               operation / sort-input option, and key generator creation
     * @param hoodieTable            table passed through to every create handle
     * @param writeConfig            write config passed through to every create handle
     * @param instantTime            instant time passed through to every create handle
     * @param taskPartitionId        task partition id passed through to handles and the key generator
     * @param totalSubtaskNum        passed through to every create handle
     * @param taskEpochId            passed through to every create handle
     * @param rowType                row type of the incoming records
     * @param preserveHoodieMetadata when true, the record key and partition path are read from the
     *                               record's meta fields and no key generator is created
     * @param writeMetrics           optional metrics sink
     */
    public BulkInsertWriterHelper(Configuration conf, HoodieTable hoodieTable, HoodieWriteConfig writeConfig, String instantTime, int taskPartitionId, long totalSubtaskNum, long taskEpochId, RowType rowType, boolean preserveHoodieMetadata, Option<FlinkStreamWriteMetrics> writeMetrics) {
        this.hoodieTable = hoodieTable;
        this.writeConfig = writeConfig;
        this.instantTime = instantTime;
        this.taskPartitionId = taskPartitionId;
        this.totalSubtaskNum = totalSubtaskNum;
        this.taskEpochId = taskEpochId;
        this.isAppendMode = OptionsResolver.isAppendMode(conf);
        this.populateMetaFields = writeConfig.populateMetaFields();
        // The incoming row type is used as-is when metadata is preserved, or when running in
        // append mode without meta fields; otherwise the metadata columns are prepended.
        boolean keepRowTypeAsIs = preserveHoodieMetadata || (this.isAppendMode && !this.populateMetaFields);
        this.rowType = keepRowTypeAsIs
            ? rowType
            : BulkInsertWriterHelper.addMetadataFields(rowType, writeConfig.allowOperationMetadataField());
        this.preserveHoodieMetadata = preserveHoodieMetadata;
        this.isInputSorted = OptionsResolver.isBulkInsertOperation(conf) && conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT);
        this.fileIdPrefix = UUID.randomUUID().toString();
        // The key generator works on the ORIGINAL row type (without the prepended metadata fields).
        this.keyGen = preserveHoodieMetadata ? null : RowDataKeyGens.instance(conf, rowType, taskPartitionId, instantTime);
        this.writeMetrics = writeMetrics;
    }

    /** Returns the instant time this helper writes for. */
    public String getInstantTime() {
        return this.instantTime;
    }

    /**
     * Writes one record into the handle of its partition, switching / creating / rolling the
     * handle when needed.
     *
     * @param record the record to write
     * @throws IOException wrapping any failure (including runtime errors) raised while writing
     */
    public void write(RowData record) throws IOException {
        try {
            String recordKey = this.preserveHoodieMetadata
                ? record.getString(HoodieRecord.RECORD_KEY_META_FIELD_ORD).toString()
                : this.keyGen.getRecordKey(record);
            String partitionPath = this.preserveHoodieMetadata
                ? record.getString(HoodieRecord.PARTITION_PATH_META_FIELD_ORD).toString()
                : this.keyGen.getPartitionPath(record);
            // `handle == null` covers a write issued after close(): close() drops the current
            // handle but leaves lastKnownPartitionPath untouched, which previously led to an NPE
            // on handle.canWrite() for a record of the same partition.
            boolean needsHandleSwitch = this.handle == null
                || this.lastKnownPartitionPath == null
                || !this.lastKnownPartitionPath.equals(partitionPath)
                || !this.handle.canWrite();
            if (needsHandleSwitch) {
                this.handle = this.getRowCreateHandle(partitionPath);
                this.lastKnownPartitionPath = partitionPath;
                this.writeMetrics.ifPresent(FlinkStreamWriteMetrics::markHandleSwitch);
            }
            this.handle.write(recordKey, partitionPath, record);
            this.writeMetrics.ifPresent(FlinkStreamWriteMetrics::markRecordIn);
        }
        catch (Throwable t) {
            // Wrap exactly once; the original cause stays reachable via getCause().
            IOException ioException = new IOException("Exception happened when bulk insert.", t);
            LOG.error("Global error thrown while trying to write records in HoodieRowCreateHandle ", ioException);
            throw ioException;
        }
    }

    /**
     * Returns the handle to write into for the given partition path, creating a new one when no
     * handle exists yet or rolling over to a new one when the existing handle cannot be written to.
     */
    private HoodieRowDataCreateHandle getRowCreateHandle(String partitionPath) throws IOException {
        if (!this.handles.containsKey(partitionPath)) {
            // With sorted input all currently open handles are closed before opening one for a
            // new partition (presumably because earlier partitions will not be seen again).
            if (this.isInputSorted) {
                this.close();
            }
            LOG.info("Creating new file for partition path {}", partitionPath);
            // Go through createWriteHandle so the handle-creation metric is both started and ended.
            this.handles.put(partitionPath, this.createWriteHandle(partitionPath));
            this.writeMetrics.ifPresent(FlinkStreamWriteMetrics::increaseNumOfOpenHandle);
        } else if (!this.handles.get(partitionPath).canWrite()) {
            // The existing handle refuses further writes: close it, keep its status, and replace it.
            LOG.info("Rolling max-size file for partition path {}", partitionPath);
            this.writeStatusList.add(this.closeWriteHandle(this.handles.remove(partitionPath)));
            this.handles.put(partitionPath, this.createWriteHandle(partitionPath));
            this.writeMetrics.ifPresent(FlinkStreamWriteMetrics::increaseNumOfFilesWritten);
        }
        return this.handles.get(partitionPath);
    }

    /**
     * Closes all open handles in parallel on a temporary thread pool and records their write
     * statuses. Does nothing when no handle is open.
     *
     * <p>If closing any handle fails, the failure propagates as the unchecked exception thrown by
     * {@link CompletableFuture#join()}, no status is recorded, and the handle map is left
     * untouched. The thread pool is shut down in every case.
     *
     * @throws IOException declared for callers; failures of individual handles surface as
     *                     unchecked exceptions (see above)
     */
    public void close() throws IOException {
        if (this.handles.isEmpty()) {
            return;
        }
        int poolSize = Math.min(this.handles.size(), MAX_CLOSE_PARALLELISM);
        ExecutorService executorService = Executors.newFixedThreadPool(poolSize);
        List<WriteStatus> closedStatuses;
        try {
            List<CompletableFuture<WriteStatus>> closeFutures = this.handles.values().stream()
                .map(rowCreateHandle -> CompletableFuture.supplyAsync(() -> closeHandleUnchecked(rowCreateHandle), executorService))
                .collect(Collectors.toList());
            closedStatuses = FutureUtils.allOf(closeFutures).join();
        } finally {
            // Always release the pool threads, even when a close failed and join() threw.
            executorService.shutdown();
        }
        this.writeStatusList.addAll(closedStatuses);
        try {
            executorService.awaitTermination(CLOSE_TIMEOUT_MINUTES, TimeUnit.MINUTES);
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        this.handles.clear();
        this.handle = null;
    }

    /** Closes one handle, converting the checked {@link IOException} into a {@link HoodieIOException}. */
    private static WriteStatus closeHandleUnchecked(HoodieRowDataCreateHandle rowCreateHandle) {
        try {
            LOG.info("Closing bulk insert file {}", rowCreateHandle.getFileName());
            return rowCreateHandle.close();
        }
        catch (IOException e) {
            throw new HoodieIOException("IOE during rowCreateHandle.close()", e);
        }
    }

    /** Returns the next file id: the per-helper random prefix followed by a running counter. */
    private String getNextFileId() {
        return String.format("%s-%d", this.fileIdPrefix, this.numFilesWritten++);
    }

    /**
     * Builds a row type consisting of the Hoodie metadata columns followed by the fields of the
     * given row type.
     *
     * @param rowType            the original row type
     * @param withOperationField whether to also include the operation metadata column
     * @return a new non-nullable row type with the metadata string columns in front
     */
    public static RowType addMetadataFields(RowType rowType, boolean withOperationField) {
        List<RowType.RowField> mergedFields = new ArrayList<>();
        LogicalType metadataFieldType = DataTypes.STRING().getLogicalType();
        mergedFields.add(new RowType.RowField(HoodieRecord.COMMIT_TIME_METADATA_FIELD, metadataFieldType, "commit time"));
        mergedFields.add(new RowType.RowField(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, metadataFieldType, "commit seqno"));
        mergedFields.add(new RowType.RowField(HoodieRecord.RECORD_KEY_METADATA_FIELD, metadataFieldType, "record key"));
        mergedFields.add(new RowType.RowField(HoodieRecord.PARTITION_PATH_METADATA_FIELD, metadataFieldType, "partition path"));
        // NOTE(review): the description "field name" looks like a typo for "file name"; it is kept
        // unchanged because the description is part of the field definition other code may compare against.
        mergedFields.add(new RowType.RowField(HoodieRecord.FILENAME_METADATA_FIELD, metadataFieldType, "field name"));
        if (withOperationField) {
            mergedFields.add(new RowType.RowField(HoodieRecord.OPERATION_METADATA_FIELD, metadataFieldType, "operation"));
        }
        mergedFields.addAll(rowType.getFields());
        return new RowType(false, mergedFields);
    }

    /**
     * Closes all open handles and returns the statuses collected so far.
     *
     * <p>The returned list is the helper's internal list, not a copy.
     *
     * @param taskID task id, used only in the error message
     * @return write statuses of all handles closed by this helper
     * @throws HoodieException if closing the handles raises an {@link IOException}
     */
    public List<WriteStatus> getWriteStatuses(int taskID) {
        try {
            this.close();
            return this.writeStatusList;
        }
        catch (IOException e) {
            throw new HoodieException("Error collect the write status for task [" + taskID + "]", e);
        }
    }

    /** Creates a new handle for the partition path, bracketing the creation with the handle-creation metric. */
    private HoodieRowDataCreateHandle createWriteHandle(String partitionPath) {
        this.writeMetrics.ifPresent(FlinkStreamWriteMetrics::startHandleCreation);
        HoodieRowDataCreateHandle rowCreateHandle = new HoodieRowDataCreateHandle(this.hoodieTable, this.writeConfig, partitionPath, this.getNextFileId(), this.instantTime, this.taskPartitionId, this.totalSubtaskNum, this.taskEpochId, this.rowType, this.preserveHoodieMetadata, this.isAppendMode && !this.populateMetaFields);
        this.writeMetrics.ifPresent(FlinkStreamWriteMetrics::endHandleCreation);
        return rowCreateHandle;
    }

    /** Closes a single handle on the calling thread, bracketing the close with the file-flush metric. */
    private WriteStatus closeWriteHandle(HoodieRowDataCreateHandle rowCreateHandle) throws IOException {
        this.writeMetrics.ifPresent(FlinkStreamWriteMetrics::startFileFlush);
        WriteStatus status = rowCreateHandle.close();
        this.writeMetrics.ifPresent(FlinkStreamWriteMetrics::endFileFlush);
        return status;
    }
}

