/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.consumer;

import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Optional;
import java.util.OptionalLong;
import org.apache.paimon.consumer.Consumer;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;

public class ConsumerManager
implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final String CONSUMER_PREFIX = "consumer-";
    private final FileIO fileIO;
    private final Path tablePath;

    public ConsumerManager(FileIO fileIO, Path tablePath) {
        this.fileIO = fileIO;
        this.tablePath = tablePath;
    }

    public Optional<Consumer> consumer(String consumerId) {
        return Consumer.fromPath(this.fileIO, this.consumerPath(consumerId));
    }

    public void recordConsumer(String consumerId, Consumer consumer) {
        try (PositionOutputStream out = this.fileIO.newOutputStream(this.consumerPath(consumerId), true);){
            OutputStreamWriter writer = new OutputStreamWriter((OutputStream)out, StandardCharsets.UTF_8);
            writer.write(consumer.toJson());
            writer.flush();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public OptionalLong minNextSnapshot() {
        try {
            Path directory = this.consumerDirectory();
            if (!this.fileIO.exists(directory)) {
                return OptionalLong.empty();
            }
            FileStatus[] statuses = this.fileIO.listStatus(directory);
            if (statuses == null) {
                throw new RuntimeException(String.format("The return value is null of the listStatus for the '%s' directory.", directory));
            }
            return Arrays.stream(statuses).map(FileStatus::getPath).filter(path -> path.getName().startsWith(CONSUMER_PREFIX)).map(path -> Consumer.fromPath(this.fileIO, path)).filter(Optional::isPresent).map(Optional::get).mapToLong(Consumer::nextSnapshot).reduce(Math::min);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private Path consumerDirectory() {
        return new Path(this.tablePath + "/consumer");
    }

    private Path consumerPath(String consumerId) {
        return new Path(this.tablePath + "/consumer/" + CONSUMER_PREFIX + consumerId);
    }
}

