/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.bucket;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.StreamWriteFunction;
import org.apache.hudi.sink.clustering.update.strategy.FlinkConsistentBucketUpdateStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link StreamWriteFunction} that passes every bucket of records through a
 * {@link FlinkConsistentBucketUpdateStrategy} before handing the records to the
 * inherited write function.
 *
 * <p>NOTE(review): the strategy presumably re-routes updates that target file groups
 * affected by clustering of a consistent-hashing bucket index; confirm against the
 * documentation of {@link FlinkConsistentBucketUpdateStrategy}.
 *
 * @param <I> the input element type, forwarded unchanged to {@link StreamWriteFunction}
 */
public class ConsistentBucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
    private static final Logger LOG = LoggerFactory.getLogger(ConsistentBucketStreamWriteFunction.class);

    /** Created in {@link #open(Configuration)}; transient, so it is not serialized with the function. */
    private transient FlinkConsistentBucketUpdateStrategy updateStrategy;

    /**
     * Creates the write function.
     *
     * @param config the configuration, passed straight to the parent constructor
     */
    public ConsistentBucketStreamWriteFunction(Configuration config) {
        super(config);
    }

    /**
     * Opens the parent function and then builds the update strategy from the write client
     * and the index key fields configured under {@link FlinkOptions#INDEX_KEY_FIELD}.
     *
     * @param parameters the parameters forwarded to {@code super.open}
     * @throws IOException if initialization fails
     */
    @Override
    public void open(Configuration parameters) throws IOException {
        super.open(parameters);
        // The option value is a comma-separated list. Entries are used verbatim (no trimming).
        // NOTE(review): whitespace around the commas would end up inside the field names;
        // confirm that this matches how the other index code parses the same option.
        List<String> indexKeyFields = Arrays.asList(this.config.getString(FlinkOptions.INDEX_KEY_FIELD).split(","));
        this.updateStrategy = new FlinkConsistentBucketUpdateStrategy(this.writeClient, indexKeyFields);
    }

    /**
     * Runs the parent's snapshot logic and, only after it has returned, resets the update strategy.
     */
    @Override
    public void snapshotState() {
        super.snapshotState();
        this.updateStrategy.reset();
    }

    /**
     * Writes one bucket of records.
     *
     * <p>The update strategy is (re-)initialized with the write client on every call, the bucket's
     * {@code preWrite} hook is invoked on the records, and the records are then handed to the
     * update strategy as a single {@code (records, instant)} pair. Every pair returned by the
     * strategy is written through the inherited {@code writeFunction}; the resulting statuses are
     * concatenated in the order the pairs were returned.
     *
     * <p>The set of file group ids that the strategy returns alongside the pairs is not used here.
     *
     * @param instant the instant passed to the update strategy together with {@code records}
     * @param bucket  the data bucket whose {@code preWrite} hook is applied to {@code records}
     * @param records the records to write
     * @return the write statuses of all writes triggered for this bucket
     */
    @Override
    protected List<WriteStatus> writeBucket(String instant, StreamWriteFunction.DataBucket bucket, List<HoodieRecord> records) {
        this.updateStrategy.initialize(this.writeClient);
        bucket.preWrite(records);
        Pair<List<Pair<List<HoodieRecord>, String>>, Set<HoodieFileGroupId>> recordListFgPair =
            this.updateStrategy.handleUpdate(Collections.singletonList(Pair.of(records, instant)));
        return recordListFgPair.getKey().stream()
            .flatMap(recordsInstantPair -> {
                // Parameterized cast instead of a raw List, so the stream keeps its WriteStatus
                // element type. The suppression is confined to this single local variable.
                @SuppressWarnings("unchecked")
                List<WriteStatus> statuses =
                    (List<WriteStatus>) this.writeFunction.apply(recordsInstantPair.getLeft(), recordsInstantPair.getRight());
                return statuses.stream();
            })
            .collect(Collectors.toList());
    }
}

