/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ratis.client.impl;

import java.io.IOException;
import java.io.OutputStream;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.impl.RaftClientImpl;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.ProtoUtils;
import org.apache.ratis.util.SizeInBytes;

public class RaftOutputStream
extends OutputStream {
    private final Supplier<RaftClient> client;
    private final AtomicBoolean closed = new AtomicBoolean();
    private final Queue<CompletableFuture<Long>> flushFutures = new LinkedList<CompletableFuture<Long>>();
    private final byte[] buffer;
    private int byteCount;
    private long byteFlushed;

    public RaftOutputStream(Supplier<RaftClient> clientSupplier, SizeInBytes bufferSize) {
        this.client = JavaUtils.memoize(clientSupplier);
        this.buffer = new byte[bufferSize.getSizeInt()];
    }

    private RaftClient getClient() {
        return this.client.get();
    }

    @Override
    public void write(int b) throws IOException {
        this.checkClosed();
        this.buffer[this.byteCount++] = (byte)b;
        this.flushIfNecessary();
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        int toWrite;
        this.checkClosed();
        if (off < 0 || len < 0 || off > b.length - len) {
            throw new ArrayIndexOutOfBoundsException();
        }
        for (int total = 0; total < len; total += toWrite) {
            toWrite = Math.min(len - total, this.buffer.length - this.byteCount);
            System.arraycopy(b, off + total, this.buffer, this.byteCount, toWrite);
            this.byteCount += toWrite;
            this.flushIfNecessary();
        }
    }

    private void flushIfNecessary() {
        if (this.byteCount == this.buffer.length) {
            this.flushAsync();
        }
    }

    private void flushAsync() {
        long pos = this.byteFlushed;
        if (this.byteCount == 0) {
            return;
        }
        CompletionStage f = ((CompletableFuture)this.getClient().sendAsync(Message.valueOf((ByteString)ProtoUtils.toByteString((byte[])this.buffer, (int)0, (int)this.byteCount))).thenApply(reply -> RaftClientImpl.handleRaftException(reply, CompletionException::new))).thenApply(reply -> reply != null && reply.isSuccess() ? Long.valueOf(pos) : null);
        this.flushFutures.offer((CompletableFuture<Long>)f);
        this.byteFlushed += (long)this.byteCount;
        this.byteCount = 0;
    }

    private void flushImpl() throws IOException {
        long pos = this.byteFlushed;
        this.flushAsync();
        while (!this.flushFutures.isEmpty()) {
            Long flushed = this.flushFutures.poll().join();
            if (flushed != null) continue;
            throw new IOException("Failed to flush at position " + pos);
        }
    }

    @Override
    public void flush() throws IOException {
        this.checkClosed();
        this.flushImpl();
    }

    private void checkClosed() throws IOException {
        if (this.closed.get()) {
            throw new IOException(this + " was closed.");
        }
    }

    @Override
    public void close() throws IOException {
        if (this.closed.compareAndSet(false, true)) {
            this.flushImpl();
            this.getClient().close();
        }
    }

    public String toString() {
        return this.getClass().getSimpleName() + "-" + this.getClient().getId() + ":byteFlushed=" + this.byteFlushed;
    }
}

