/*
 * Decompiled with CFR 0.152.
 */
package alluxio.client.block.stream;

import alluxio.client.WriteType;
import alluxio.client.block.stream.BlockWorkerClient;
import alluxio.client.block.stream.DataWriter;
import alluxio.client.block.stream.GrpcBlockingStream;
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.options.OutStreamOptions;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.grpc.CreateLocalBlockRequest;
import alluxio.grpc.CreateLocalBlockResponse;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
import alluxio.resource.CloseableResource;
import alluxio.shaded.client.com.google.common.base.MoreObjects;
import alluxio.shaded.client.com.google.common.base.Preconditions;
import alluxio.shaded.client.com.google.common.io.Closer;
import alluxio.shaded.client.io.netty.buffer.ByteBuf;
import alluxio.shaded.client.javax.annotation.concurrent.NotThreadSafe;
import alluxio.util.CommonUtils;
import alluxio.wire.WorkerNetAddress;
import alluxio.worker.block.io.LocalFileBlockWriter;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link DataWriter} that appends block data to a local file through a
 * {@link LocalFileBlockWriter}.
 *
 * <p>The file path is obtained from a block worker over a {@code createLocalBlock} gRPC stream.
 * The same stream is kept open for the lifetime of the writer and is reused to request
 * additional space reservations as the write position advances. Closing the writer completes the
 * stream; cancelling the writer cancels it.
 */
@NotThreadSafe
public final class LocalFileDataWriter implements DataWriter {
    private static final Logger LOG = LoggerFactory.getLogger(LocalFileDataWriter.class);

    /** Size of the initial reservation and the minimum size of every later reservation. */
    private final long mFileBufferBytes;
    /** Timeout applied to every send/receive on the stream and to stream completion. */
    private final long mDataTimeoutMs;
    /** Writer for the local block file whose path was returned by the worker. */
    private final LocalFileBlockWriter mWriter;
    /** Chunk size reported through {@link #chunkSize()}. */
    private final long mChunkSize;
    /** The initial create request; used as the template for later reserve-only requests. */
    private final CreateLocalBlockRequest mCreateRequest;
    /** Owns the worker client and the file writer; closed by {@link #close()}/{@link #cancel()}. */
    private final Closer mCloser;
    /** The open {@code createLocalBlock} stream to the worker. */
    private final GrpcBlockingStream<CreateLocalBlockRequest, CreateLocalBlockResponse> mStream;

    /** Number of bytes accounted as written so far. */
    private long mPos;
    /** Position up to which space has been reserved with the worker. */
    private long mPosReserved;

    /**
     * Creates a {@link LocalFileDataWriter} for the given block.
     *
     * <p>Acquires a block worker client, opens a {@code createLocalBlock} stream, sends the create
     * request (which also reserves {@code USER_FILE_BUFFER_BYTES} of space) and opens a
     * {@link LocalFileBlockWriter} on the path returned by the worker.
     *
     * @param context the file system context used to read configuration and acquire the client
     * @param address the address of the block worker
     * @param blockId the id of the block to write
     * @param options the output stream options (write tier, medium type, write type)
     * @return the created writer
     * @throws IOException if any step fails; all resources acquired so far are released and the
     *         stream, if it was already opened, is cancelled
     */
    public static LocalFileDataWriter create(FileSystemContext context, WorkerNetAddress address, long blockId, OutStreamOptions options) throws IOException {
        AlluxioConfiguration conf = context.getClusterConf();
        long chunkSize = conf.getBytes(PropertyKey.USER_LOCAL_WRITER_CHUNK_SIZE_BYTES);
        Closer closer = Closer.create();
        // Declared outside the try block so the failure path can cancel it.
        GrpcBlockingStream<CreateLocalBlockRequest, CreateLocalBlockResponse> stream = null;
        try {
            CloseableResource<BlockWorkerClient> blockWorker = context.acquireBlockWorkerClient(address);
            closer.register(blockWorker);
            int writerBufferSizeMessages = conf.getInt(PropertyKey.USER_NETWORK_WRITER_BUFFER_SIZE_MESSAGES);
            long fileBufferBytes = conf.getBytes(PropertyKey.USER_FILE_BUFFER_BYTES);
            long dataTimeout = conf.getMs(PropertyKey.USER_NETWORK_DATA_TIMEOUT_MS);

            boolean asyncThrough = options.getWriteType() == WriteType.ASYNC_THROUGH;
            CreateLocalBlockRequest.Builder builder = CreateLocalBlockRequest.newBuilder()
                .setBlockId(blockId)
                .setTier(options.getWriteTier())
                .setSpaceToReserve(fileBufferBytes)
                .setMediumType(options.getMediumType())
                .setPinOnCreate(asyncThrough);
            if (asyncThrough && conf.getBoolean(PropertyKey.USER_FILE_UFS_TIER_ENABLED)) {
                builder.setCleanupOnFailure(false);
            }
            CreateLocalBlockRequest createRequest = builder.build();

            String description = MoreObjects.toStringHelper(LocalFileDataWriter.class)
                .add("request", createRequest)
                .add("address", address)
                .toString();
            stream = new GrpcBlockingStream<CreateLocalBlockRequest, CreateLocalBlockResponse>(blockWorker.get()::createLocalBlock, writerBufferSizeMessages, description);
            stream.send(createRequest, dataTimeout);
            CreateLocalBlockResponse response = stream.receive(dataTimeout);
            Preconditions.checkState(response != null && response.hasPath(), "No local block path received for create request %s", createRequest);
            LocalFileBlockWriter writer = closer.register(new LocalFileBlockWriter(response.getPath()));
            return new LocalFileDataWriter(chunkSize, writer, createRequest, stream, closer, fileBufferBytes, dataTimeout);
        }
        catch (Exception e) {
            if (stream != null) {
                // The stream is not owned by the closer on the success path, so without this it
                // would stay open after the worker client is released. Registered last, it is
                // closed first (Closer closes in reverse registration order), mirroring cancel().
                final GrpcBlockingStream<CreateLocalBlockRequest, CreateLocalBlockResponse> openedStream = stream;
                closer.register(() -> openedStream.cancel());
            }
            throw CommonUtils.closeAndRethrow(closer, e);
        }
    }

    /**
     * @return the number of bytes accounted as written so far
     */
    @Override
    public long pos() {
        return mPos;
    }

    /**
     * @return the configured chunk size, narrowed to {@code int}
     */
    @Override
    public int chunkSize() {
        return (int) mChunkSize;
    }

    /**
     * Appends all readable bytes of {@code buf} to the local block file, reserving more space
     * with the worker first if needed. The buffer is always released, whether or not the write
     * succeeds.
     *
     * @param buf the chunk to write; released by this method
     * @throws IOException if reserving space or appending to the file fails
     * @throws IllegalStateException if the stream is already closed or cancelled, or if fewer
     *         bytes than expected were appended
     */
    @Override
    public void writeChunk(ByteBuf buf) throws IOException {
        try {
            Preconditions.checkState(!mStream.isCanceled() && !mStream.isClosed(), "DataWriter is closed while writing chunks.");
            int sz = buf.readableBytes();
            ensureReserved(mPos + (long) sz);
            mPos += (long) sz;
            long appended = mWriter.append(buf);
            Preconditions.checkState(appended == (long) sz, "Appended %s bytes to the local block file but expected %s", appended, sz);
            MetricsSystem.counter(MetricKey.CLIENT_BYTES_WRITTEN_LOCAL.getName()).inc(sz);
            MetricsSystem.meter(MetricKey.CLIENT_BYTES_WRITTEN_LOCAL_THROUGHPUT.getName()).mark(sz);
        }
        finally {
            buf.release();
        }
    }

    /**
     * Cancels the stream and then closes every resource held by this writer.
     *
     * @throws IOException if closing any resource fails
     */
    @Override
    public void cancel() throws IOException {
        // Registered last so that it runs before the file writer and worker client are closed.
        mCloser.register(() -> mStream.cancel());
        mCloser.close();
    }

    /**
     * No-op: this writer keeps no data of its own to flush.
     */
    @Override
    public void flush() {
    }

    /**
     * Completes the stream, waits up to the data timeout for it to finish, and then closes every
     * resource held by this writer.
     *
     * @throws IOException if completing the stream or closing any resource fails
     */
    @Override
    public void close() throws IOException {
        // Registered last so that it runs before the file writer and worker client are closed.
        mCloser.register(() -> {
            mStream.close();
            mStream.waitForComplete(mDataTimeoutMs);
        });
        mCloser.close();
    }

    /**
     * Creates the writer around resources that {@link #create} has already opened.
     *
     * @param packetSize the chunk size reported by {@link #chunkSize()}
     * @param writer the writer for the local block file
     * @param createRequest the create request already sent on {@code stream}
     * @param stream the open stream to the worker
     * @param closer the closer owning the worker client and {@code writer}
     * @param fileBufferBytes the space already reserved by {@code createRequest}; also the
     *        minimum size of later reservations
     * @param dataTimeoutMs the timeout for stream operations, in milliseconds
     */
    private LocalFileDataWriter(long packetSize, LocalFileBlockWriter writer, CreateLocalBlockRequest createRequest, GrpcBlockingStream<CreateLocalBlockRequest, CreateLocalBlockResponse> stream, Closer closer, long fileBufferBytes, long dataTimeoutMs) {
        mFileBufferBytes = fileBufferBytes;
        mDataTimeoutMs = dataTimeoutMs;
        mCloser = closer;
        mWriter = writer;
        mCreateRequest = createRequest;
        mStream = stream;
        // The create request already asked the worker to reserve this much space.
        mPosReserved = fileBufferBytes;
        mChunkSize = packetSize;
    }

    /**
     * Makes sure space is reserved up to {@code pos}, sending a reserve-only request on the
     * stream when the current reservation is too small.
     *
     * @param pos the position that must be covered by the reservation
     * @throws IOException if sending the request or receiving the response fails
     * @throws IllegalStateException if no response arrives or the response carries a path
     */
    private void ensureReserved(long pos) throws IOException {
        if (pos <= mPosReserved) {
            return;
        }
        // Reserve at least one file buffer at a time to limit the number of round trips.
        long toReserve = Math.max(pos - mPosReserved, mFileBufferBytes);
        CreateLocalBlockRequest request = mCreateRequest.toBuilder()
            .setSpaceToReserve(toReserve)
            .setOnlyReserveSpace(true)
            .build();
        mStream.send(request, mDataTimeoutMs);
        CreateLocalBlockResponse response = mStream.receive(mDataTimeoutMs);
        // Template overloads format the message only when the check fails.
        Preconditions.checkState(response != null, "Stream closed while waiting for reserve request %s", request);
        Preconditions.checkState(!response.hasPath(), "Invalid response for reserve request %s", request);
        mPosReserved += toReserve;
    }
}

