/*
 * Decompiled with CFR 0.152.
 */
package alluxio.client.block.stream;

import alluxio.client.WriteType;
import alluxio.client.block.stream.BlockWorkerClient;
import alluxio.client.block.stream.DataWriter;
import alluxio.client.block.stream.GrpcBlockingStream;
import alluxio.client.block.stream.GrpcDataMessageBlockingStream;
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.options.OutStreamOptions;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.exception.status.UnavailableException;
import alluxio.grpc.Chunk;
import alluxio.grpc.DataMessage;
import alluxio.grpc.RequestType;
import alluxio.grpc.WriteRequest;
import alluxio.grpc.WriteRequestCommand;
import alluxio.grpc.WriteRequestMarshaller;
import alluxio.grpc.WriteResponse;
import alluxio.network.protocol.databuffer.NettyDataBuffer;
import alluxio.proto.dataserver.Protocol;
import alluxio.resource.CloseableResource;
import alluxio.shaded.client.com.google.common.base.MoreObjects;
import alluxio.shaded.client.com.google.protobuf.UnsafeByteOperations;
import alluxio.shaded.client.io.netty.buffer.ByteBuf;
import alluxio.shaded.client.javax.annotation.concurrent.NotThreadSafe;
import alluxio.util.proto.ProtoUtils;
import alluxio.wire.WorkerNetAddress;
import java.io.IOException;
import java.util.Optional;

@NotThreadSafe
public final class GrpcDataWriter
implements DataWriter {
    private final long mDataTimeoutMs;
    private final long mWriterCloseTimeoutMs;
    private final long mWriterFlushTimeoutMs;
    private final CloseableResource<BlockWorkerClient> mClient;
    private final WorkerNetAddress mAddress;
    private final WriteRequestCommand mPartialRequest;
    private final long mChunkSize;
    private final GrpcBlockingStream<WriteRequest, WriteResponse> mStream;
    private String mContentHash = null;
    private long mPosToQueue;

    public static GrpcDataWriter create(FileSystemContext context, WorkerNetAddress address, long id, long length, RequestType type, OutStreamOptions options) throws IOException {
        long chunkSize = context.getClusterConf().getBytes(PropertyKey.USER_STREAMING_WRITER_CHUNK_SIZE_BYTES);
        CloseableResource<BlockWorkerClient> grpcClient = context.acquireBlockWorkerClient(address);
        try {
            return new GrpcDataWriter(context, address, id, length, chunkSize, type, options, grpcClient);
        }
        catch (Exception e) {
            grpcClient.close();
            throw e;
        }
    }

    private GrpcDataWriter(FileSystemContext context, WorkerNetAddress address, long id, long length, long chunkSize, RequestType type, OutStreamOptions options, CloseableResource<BlockWorkerClient> client) throws IOException {
        this.mAddress = address;
        AlluxioConfiguration conf = context.getClusterConf();
        this.mDataTimeoutMs = conf.getMs(PropertyKey.USER_STREAMING_DATA_WRITE_TIMEOUT);
        this.mWriterCloseTimeoutMs = conf.getMs(PropertyKey.USER_STREAMING_WRITER_CLOSE_TIMEOUT);
        this.mWriterFlushTimeoutMs = conf.getMs(PropertyKey.USER_STREAMING_WRITER_FLUSH_TIMEOUT);
        long reservedBytes = Math.min(length, conf.getBytes(PropertyKey.USER_FILE_RESERVED_BYTES));
        WriteRequestCommand.Builder builder = WriteRequestCommand.newBuilder().setId(id).setTier(options.getWriteTier()).setType(type).setMediumType(options.getMediumType());
        if (type == RequestType.UFS_FILE) {
            Protocol.CreateUfsFileOptions ufsFileOptions = Protocol.CreateUfsFileOptions.newBuilder().setUfsPath(options.getUfsPath()).setOwner(options.getOwner()).setGroup(options.getGroup()).setMode(options.getMode().toShort()).setMountId(options.getMountId()).setAcl(ProtoUtils.toProto(options.getAcl())).build();
            builder.setCreateUfsFileOptions(ufsFileOptions);
        }
        builder.setPinOnCreate(options.getWriteType() == WriteType.ASYNC_THROUGH);
        builder.setSpaceToReserve(reservedBytes);
        this.mPartialRequest = builder.buildPartial();
        this.mChunkSize = chunkSize;
        this.mClient = client;
        int writerBufferSizeMessages = conf.getInt(PropertyKey.USER_STREAMING_WRITER_BUFFER_SIZE_MESSAGES);
        this.mStream = conf.getBoolean(PropertyKey.USER_STREAMING_ZEROCOPY_ENABLED) ? new GrpcDataMessageBlockingStream<WriteRequest, WriteResponse>(this.mClient.get()::writeBlock, writerBufferSizeMessages, MoreObjects.toStringHelper(this).add("request", this.mPartialRequest).add("address", address).toString(), new WriteRequestMarshaller(), null) : new GrpcBlockingStream(this.mClient.get()::writeBlock, writerBufferSizeMessages, MoreObjects.toStringHelper(this).add("request", this.mPartialRequest).add("address", address).toString());
        this.mStream.send(WriteRequest.newBuilder().setCommand(this.mPartialRequest.toBuilder()).build(), this.mDataTimeoutMs);
    }

    @Override
    public long pos() {
        return this.mPosToQueue;
    }

    @Override
    public Optional<String> getUfsContentHash() {
        return Optional.ofNullable(this.mContentHash);
    }

    @Override
    public void writeChunk(ByteBuf buf) throws IOException {
        this.mPosToQueue += (long)buf.readableBytes();
        try {
            WriteRequest request = WriteRequest.newBuilder().setCommand(this.mPartialRequest).setChunk(Chunk.newBuilder().setData(UnsafeByteOperations.unsafeWrap(buf.nioBuffer())).build()).build();
            if (this.mStream instanceof GrpcDataMessageBlockingStream) {
                ((GrpcDataMessageBlockingStream)this.mStream).sendDataMessage(new DataMessage<WriteRequest, NettyDataBuffer>(request, new NettyDataBuffer(buf)), this.mDataTimeoutMs);
            } else {
                this.mStream.send(request, this.mDataTimeoutMs);
            }
        }
        finally {
            buf.release();
        }
    }

    @Override
    public void cancel() {
        if (this.mClient.get().isShutdown()) {
            return;
        }
        this.mStream.cancel();
    }

    @Override
    public void flush() throws IOException {
        long posWritten;
        if (this.mStream.isClosed() || this.mStream.isCanceled() || this.mPosToQueue == 0L) {
            return;
        }
        WriteRequest writeRequest = WriteRequest.newBuilder().setCommand(this.mPartialRequest.toBuilder().setOffset(this.mPosToQueue).setFlush(true)).build();
        this.mStream.send(writeRequest, this.mDataTimeoutMs);
        do {
            WriteResponse response;
            if ((response = this.mStream.receive(this.mWriterFlushTimeoutMs)) == null) {
                throw new UnavailableException(String.format("Flush request %s to worker %s is not acknowledged before complete.", writeRequest, this.mAddress));
            }
            posWritten = response.getOffset();
            if (!response.hasContentHash()) continue;
            this.mContentHash = response.getContentHash();
        } while (this.mPosToQueue != posWritten);
    }

    @Override
    public void close() throws IOException {
        try {
            if (this.mClient.get().isShutdown()) {
                return;
            }
            this.mStream.close();
            this.mStream.waitForComplete(this.mWriterCloseTimeoutMs).ifPresent(writeResponse -> {
                this.mContentHash = writeResponse.hasContentHash() ? writeResponse.getContentHash() : null;
            });
        }
        finally {
            this.mClient.close();
        }
    }

    @Override
    public int chunkSize() {
        return (int)this.mChunkSize;
    }
}

