/*
 * Decompiled with CFR 0.152.
 */
package alluxio.client.block.stream;

import alluxio.annotation.SuppressFBWarnings;
import alluxio.client.block.stream.BufferCachingGrpcDataReader;
import alluxio.client.block.stream.DataReader;
import alluxio.client.file.FileSystemContext;
import alluxio.grpc.ReadRequest;
import alluxio.network.protocol.databuffer.DataBuffer;
import alluxio.network.protocol.databuffer.NioDataBuffer;
import alluxio.resource.LockResource;
import alluxio.shaded.client.com.google.common.annotations.VisibleForTesting;
import alluxio.shaded.client.com.google.common.hash.HashFunction;
import alluxio.shaded.client.com.google.common.hash.Hashing;
import alluxio.shaded.client.javax.annotation.Nullable;
import alluxio.shaded.client.javax.annotation.concurrent.NotThreadSafe;
import alluxio.wire.WorkerNetAddress;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

@NotThreadSafe
public class SharedGrpcDataReader
implements DataReader {
    private static final int BLOCK_LOCK_NUM = 32;
    private static final ReentrantReadWriteLock[] BLOCK_LOCKS = new ReentrantReadWriteLock[32];
    private static final ConcurrentHashMap<Long, BufferCachingGrpcDataReader> BLOCK_READERS = new ConcurrentHashMap();
    private static final HashFunction HASH_FUNC = Hashing.murmur3_32_fixed();
    private final long mBlockId;
    private final BufferCachingGrpcDataReader mCachedDataReader;
    private final long mChunkSize;
    private long mPosToRead;

    private static ReentrantReadWriteLock getLock(long blockId) {
        return BLOCK_LOCKS[Math.floorMod(HASH_FUNC.hashLong(blockId).asInt(), BLOCK_LOCKS.length)];
    }

    @VisibleForTesting
    protected SharedGrpcDataReader(ReadRequest readRequest, BufferCachingGrpcDataReader reader) {
        this.mChunkSize = readRequest.getChunkSize();
        this.mPosToRead = readRequest.getOffset();
        this.mBlockId = readRequest.getBlockId();
        this.mCachedDataReader = reader;
    }

    @Override
    public long pos() {
        return this.mPosToRead;
    }

    public void seek(long pos) {
        this.mPosToRead = pos;
    }

    @Override
    @Nullable
    public DataBuffer readChunk() throws IOException {
        int index = (int)(this.mPosToRead / this.mChunkSize);
        DataBuffer chunk = this.mCachedDataReader.readChunk(index);
        if (chunk == null) {
            return null;
        }
        ByteBuffer bb = chunk.getReadOnlyByteBuffer();
        bb.position((int)(this.mPosToRead % this.mChunkSize));
        this.mPosToRead += this.mChunkSize - this.mPosToRead % this.mChunkSize;
        return new NioDataBuffer(bb, bb.remaining());
    }

    @Override
    public void close() throws IOException {
        if (this.mCachedDataReader.deRef() > 0) {
            return;
        }
        try (LockResource ignored = new LockResource(SharedGrpcDataReader.getLock(this.mBlockId).writeLock());){
            if (this.mCachedDataReader.getRefCount() == 0) {
                BLOCK_READERS.remove(this.mBlockId);
                this.mCachedDataReader.close();
            }
        }
    }

    static {
        for (int i = 0; i < 32; ++i) {
            SharedGrpcDataReader.BLOCK_LOCKS[i] = new ReentrantReadWriteLock();
        }
    }

    public static class Factory
    implements DataReader.Factory {
        private final FileSystemContext mContext;
        private final WorkerNetAddress mAddress;
        private final ReadRequest.Builder mReadRequestBuilder;
        private final long mBlockSize;

        public Factory(FileSystemContext context, WorkerNetAddress address, ReadRequest.Builder readRequestBuilder, long blockSize) {
            this.mContext = context;
            this.mAddress = address;
            this.mReadRequestBuilder = readRequestBuilder;
            this.mBlockSize = blockSize;
        }

        @Override
        @SuppressFBWarnings(value={"AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION"}, justification="operation is still atomic guarded by block lock")
        public DataReader create(long offset, long len) throws IOException {
            BufferCachingGrpcDataReader reader;
            long blockId = this.mReadRequestBuilder.getBlockId();
            try (LockResource ignored = new LockResource(SharedGrpcDataReader.getLock(blockId).writeLock());){
                reader = (BufferCachingGrpcDataReader)BLOCK_READERS.get(blockId);
                if (reader == null) {
                    ReadRequest cacheRequest = this.mReadRequestBuilder.setOffset(0L).setLength(this.mBlockSize).build();
                    reader = BufferCachingGrpcDataReader.create(this.mContext, this.mAddress, cacheRequest);
                    BLOCK_READERS.put(blockId, reader);
                }
                reader.ref();
            }
            return new SharedGrpcDataReader(this.mReadRequestBuilder.setOffset(offset).setLength(len).build(), reader);
        }

        @Override
        public void close() throws IOException {
        }
    }
}

