/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.commit;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.BaseCommitActionExecutor;
import org.apache.hudi.table.action.commit.BaseWriteHelper;

public class FlinkWriteHelper<T extends HoodieRecordPayload, R>
extends BaseWriteHelper<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>, R> {
    private FlinkWriteHelper() {
    }

    public static FlinkWriteHelper newInstance() {
        return WriteHelperHolder.FLINK_WRITE_HELPER;
    }

    public HoodieWriteMetadata<List<WriteStatus>> write(String instantTime, List<HoodieRecord<T>> inputRecords, HoodieEngineContext context, HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, boolean shouldCombine, int shuffleParallelism, BaseCommitActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>, R> executor, WriteOperationType operationType) {
        try {
            Instant lookupBegin = Instant.now();
            Duration indexLookupDuration = Duration.between(lookupBegin, Instant.now());
            HoodieWriteMetadata result = executor.execute(inputRecords);
            result.setIndexLookupDuration(indexLookupDuration);
            return result;
        }
        catch (Throwable e) {
            if (e instanceof HoodieUpsertException) {
                throw (HoodieUpsertException)e;
            }
            throw new HoodieUpsertException("Failed to upsert for commit time " + instantTime, e);
        }
    }

    protected List<HoodieRecord<T>> tag(List<HoodieRecord<T>> dedupedRecords, HoodieEngineContext context, HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table) {
        return table.getIndex().tagLocation((HoodieData)HoodieListData.eager(dedupedRecords), context, table).collectAsList();
    }

    public List<HoodieRecord<T>> deduplicateRecords(List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) {
        Map<Object, List<HoodieRecord>> keyedRecords = records.stream().collect(Collectors.groupingBy(record -> record.getKey().getRecordKey()));
        return keyedRecords.values().stream().map(x -> x.stream().reduce((rec1, rec2) -> {
            HoodieRecordPayload data2;
            HoodieRecordPayload reducedData;
            HoodieRecordPayload data1 = (HoodieRecordPayload)rec1.getData();
            boolean choosePrev = data1 == (reducedData = (data2 = (HoodieRecordPayload)rec2.getData()).preCombine(data1, CollectionUtils.emptyProps()));
            HoodieKey reducedKey = choosePrev ? rec1.getKey() : rec2.getKey();
            HoodieOperation operation = choosePrev ? rec1.getOperation() : rec2.getOperation();
            HoodieAvroRecord hoodieRecord = new HoodieAvroRecord(reducedKey, reducedData, operation);
            hoodieRecord.setCurrentLocation(rec1.getCurrentLocation());
            return hoodieRecord;
        }).orElse(null)).filter(Objects::nonNull).collect(Collectors.toList());
    }

    private static class WriteHelperHolder {
        private static final FlinkWriteHelper FLINK_WRITE_HELPER = new FlinkWriteHelper();

        private WriteHelperHolder() {
        }
    }
}

