/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.ozone.client.io;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.ozone.client.io.BlockOutputStreamEntry;
import org.apache.hadoop.ozone.client.io.ECBlockOutputStreamEntry;
import org.apache.hadoop.ozone.client.io.ECBlockOutputStreamEntryPool;
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
import org.apache.ozone.erasurecode.rawcoder.RawErasureEncoder;
import org.apache.ozone.erasurecode.rawcoder.util.CodecUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class ECKeyOutputStream
extends KeyOutputStream {
    private OzoneClientConfig config;
    private ECChunkBuffers ecChunkBufferCache;
    private int ecChunkSize;
    private final int numDataBlks;
    private final int numParityBlks;
    private final ByteBufferPool bufferPool;
    private final RawErasureEncoder encoder;
    public static final Logger LOG = LoggerFactory.getLogger(KeyOutputStream.class);
    private boolean closed;
    private long offset;
    private long writeOffset;
    private final ECBlockOutputStreamEntryPool blockOutputStreamEntryPool;

    @Override
    @VisibleForTesting
    public List<BlockOutputStreamEntry> getStreamEntries() {
        return this.blockOutputStreamEntryPool.getStreamEntries();
    }

    @Override
    @VisibleForTesting
    public XceiverClientFactory getXceiverClientFactory() {
        return this.blockOutputStreamEntryPool.getXceiverClientFactory();
    }

    @Override
    @VisibleForTesting
    public List<OmKeyLocationInfo> getLocationInfoList() {
        return this.blockOutputStreamEntryPool.getLocationInfoList();
    }

    private ECKeyOutputStream(Builder builder) {
        super(builder.getClientMetrics());
        this.config = builder.getClientConfig();
        this.bufferPool = builder.getByteBufferPool();
        this.ecChunkSize = builder.getReplicationConfig().getEcChunkSize();
        this.config.setStreamBufferMaxSize((long)this.ecChunkSize);
        this.config.setStreamBufferFlushSize((long)this.ecChunkSize);
        this.config.setStreamBufferSize(this.ecChunkSize);
        this.numDataBlks = builder.getReplicationConfig().getData();
        this.numParityBlks = builder.getReplicationConfig().getParity();
        this.ecChunkBufferCache = new ECChunkBuffers(this.ecChunkSize, this.numDataBlks, this.numParityBlks, this.bufferPool);
        OmKeyInfo info = builder.getOpenHandler().getKeyInfo();
        this.blockOutputStreamEntryPool = new ECBlockOutputStreamEntryPool(this.config, builder.getOmClient(), builder.getRequestID(), (ReplicationConfig)builder.getReplicationConfig(), builder.getMultipartUploadID(), builder.getMultipartNumber(), builder.isMultipartKey(), info, builder.isUnsafeByteBufferConversionEnabled(), builder.getXceiverManager(), builder.getOpenHandler().getId(), builder.getClientMetrics());
        this.writeOffset = 0L;
        this.encoder = CodecUtil.createRawEncoderWithFallback((ECReplicationConfig)builder.getReplicationConfig());
    }

    @Override
    public void addPreallocateBlocks(OmKeyLocationInfoGroup version, long openVersion) throws IOException {
        this.blockOutputStreamEntryPool.addPreallocateBlocks(version, openVersion);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        this.checkNotClosed();
        if (b == null) {
            throw new NullPointerException();
        }
        if (off < 0 || off > b.length || len < 0 || off + len > b.length || off + len < 0) {
            throw new IndexOutOfBoundsException();
        }
        try {
            for (int writtenLen = 0; writtenLen < len; writtenLen += this.handleWrite(b, off + writtenLen, len - writtenLen)) {
            }
        }
        catch (Exception e) {
            this.markStreamClosed();
            throw new IOException(e.getMessage());
        }
        this.writeOffset += (long)len;
    }

    private StripeWriteStatus rewriteStripeToNewBlockGroup() throws IOException {
        ByteBuffer[] dataBuffers = this.ecChunkBufferCache.getDataBuffers();
        this.offset -= (long)Arrays.stream(dataBuffers).mapToInt(Buffer::limit).sum();
        ECBlockOutputStreamEntry failedStreamEntry = this.blockOutputStreamEntryPool.getCurrentStreamEntry();
        failedStreamEntry.resetToFirstEntry();
        failedStreamEntry.resetToAckedPosition();
        this.blockOutputStreamEntryPool.discardPreallocatedBlocks(-1L, failedStreamEntry.getPipeline().getId());
        failedStreamEntry.close();
        this.blockOutputStreamEntryPool.allocateBlockIfNeeded();
        ECBlockOutputStreamEntry currentStreamEntry = this.blockOutputStreamEntryPool.getCurrentStreamEntry();
        for (int i = 0; i < this.numDataBlks; ++i) {
            if (dataBuffers[i].limit() > 0) {
                this.handleOutputStreamWrite(i, dataBuffers[i].limit(), false);
            }
            currentStreamEntry.useNextBlockStream();
        }
        return this.handleParityWrites();
    }

    private void encodeAndWriteParityCells() throws IOException {
        this.generateParityCells();
        if (this.handleParityWrites() == StripeWriteStatus.FAILED) {
            this.retryStripeWrite(this.config.getMaxECStripeWriteRetries());
        }
    }

    private void logStreamError(List<ECBlockOutputStream> failedStreams, String operation) {
        Set failedStreamIndexSet = failedStreams.stream().map(BlockOutputStream::getReplicationIndex).collect(Collectors.toSet());
        String failedStreamsString = IntStream.range(1, this.numDataBlks + this.numParityBlks + 1).mapToObj(index -> failedStreamIndexSet.contains(index) ? "F" : "S").collect(Collectors.joining(" "));
        LOG.warn("{} failed: {}", (Object)operation, (Object)failedStreamsString);
        for (ECBlockOutputStream stream : failedStreams) {
            LOG.warn("Failure for replica index: {}, DatanodeDetails: {}", new Object[]{stream.getReplicationIndex(), stream.getDatanodeDetails(), stream.getIoException()});
        }
    }

    private StripeWriteStatus handleParityWrites() throws IOException {
        this.writeParityCells();
        ECBlockOutputStreamEntry streamEntry = this.blockOutputStreamEntryPool.getCurrentStreamEntry();
        List<ECBlockOutputStream> failedStreams = streamEntry.streamsWithWriteFailure();
        if (!failedStreams.isEmpty()) {
            if (LOG.isDebugEnabled()) {
                this.logStreamError(failedStreams, "EC stripe write");
            }
            this.excludePipelineAndFailedDN(streamEntry.getPipeline(), failedStreams);
            return StripeWriteStatus.FAILED;
        }
        boolean isLastStripe = streamEntry.getRemaining() <= 0L || this.ecChunkBufferCache.getLastDataCell().limit() < this.ecChunkSize;
        streamEntry.executePutBlock(isLastStripe, streamEntry.getCurrentPosition());
        failedStreams = streamEntry.streamsWithPutBlockFailure();
        if (!failedStreams.isEmpty()) {
            if (LOG.isDebugEnabled()) {
                this.logStreamError(failedStreams, "Put block");
            }
            this.excludePipelineAndFailedDN(streamEntry.getPipeline(), failedStreams);
            return StripeWriteStatus.FAILED;
        }
        streamEntry.updateBlockGroupToAckedPosition(streamEntry.getCurrentPosition());
        this.ecChunkBufferCache.clear();
        if (streamEntry.getRemaining() <= 0L) {
            streamEntry.close();
        } else {
            streamEntry.resetToFirstEntry();
        }
        return StripeWriteStatus.SUCCESS;
    }

    private void excludePipelineAndFailedDN(Pipeline pipeline, List<ECBlockOutputStream> failedStreams) {
        this.blockOutputStreamEntryPool.getExcludeList().addPipeline(pipeline.getId());
        failedStreams.stream().filter(s -> !this.checkIfContainerToExclude(HddsClientUtils.checkForException((Exception)s.getIoException()))).forEach(s -> this.blockOutputStreamEntryPool.getExcludeList().addDatanode(s.getDatanodeDetails()));
    }

    @Override
    protected boolean checkIfContainerToExclude(Throwable t) {
        return super.checkIfContainerToExclude(t) && t instanceof ContainerNotOpenException;
    }

    private void generateParityCells() throws IOException {
        int i;
        ByteBuffer[] dataBuffers = this.ecChunkBufferCache.getDataBuffers();
        ByteBuffer[] parityBuffers = this.ecChunkBufferCache.getParityBuffers();
        int parityCellSize = dataBuffers[0].position();
        int firstNonFullIndex = dataBuffers.length;
        int firstNonFullLength = 0;
        for (i = 0; i < dataBuffers.length; ++i) {
            if (dataBuffers[i].position() == this.ecChunkSize) continue;
            firstNonFullIndex = i;
            firstNonFullLength = dataBuffers[i].position();
            break;
        }
        for (i = firstNonFullIndex + 1; i < dataBuffers.length; ++i) {
            Preconditions.checkState((dataBuffers[i].position() == 0 ? 1 : 0) != 0, (String)"Illegal stripe state: cell {} is not full while cell {} has data", (int)firstNonFullIndex, (int)i);
        }
        for (i = firstNonFullIndex; i < dataBuffers.length; ++i) {
            ECKeyOutputStream.padBufferToLimit(dataBuffers[i], parityCellSize);
        }
        for (ByteBuffer b : parityBuffers) {
            b.limit(parityCellSize);
        }
        for (ByteBuffer b : dataBuffers) {
            b.flip();
        }
        this.encoder.encode(dataBuffers, parityBuffers);
        if (firstNonFullIndex < dataBuffers.length) {
            dataBuffers[firstNonFullIndex].limit(firstNonFullLength);
        }
        for (int i2 = firstNonFullIndex + 1; i2 < dataBuffers.length; ++i2) {
            dataBuffers[i2].limit(0);
        }
    }

    private void writeParityCells() {
        this.blockOutputStreamEntryPool.getCurrentStreamEntry().forceToFirstParityBlock();
        ByteBuffer[] parityCells = this.ecChunkBufferCache.getParityBuffers();
        for (int i = 0; i < this.numParityBlks; ++i) {
            this.handleOutputStreamWrite(this.numDataBlks + i, parityCells[i].limit(), true);
            this.blockOutputStreamEntryPool.getCurrentStreamEntry().useNextBlockStream();
        }
    }

    private int handleWrite(byte[] b, int off, int len) throws IOException {
        this.blockOutputStreamEntryPool.allocateBlockIfNeeded();
        int currIdx = this.blockOutputStreamEntryPool.getCurrentStreamEntry().getCurrentStreamIdx();
        int bufferRem = this.ecChunkBufferCache.dataBuffers[currIdx].remaining();
        int writeLen = Math.min(len, Math.min(bufferRem, this.ecChunkSize));
        int pos = this.ecChunkBufferCache.addToDataBuffer(currIdx, b, off, writeLen);
        if (pos == this.ecChunkSize) {
            this.handleOutputStreamWrite(currIdx, pos, false);
            this.blockOutputStreamEntryPool.getCurrentStreamEntry().useNextBlockStream();
            if (currIdx == this.numDataBlks - 1) {
                this.encodeAndWriteParityCells();
            }
        }
        return writeLen;
    }

    private void handleOutputStreamWrite(int currIdx, int len, boolean isParity) {
        ByteBuffer bytesToWrite = isParity ? this.ecChunkBufferCache.getParityBuffers()[currIdx - this.numDataBlks] : this.ecChunkBufferCache.getDataBuffers()[currIdx];
        try {
            assert (len <= this.ecChunkSize) : " The len: " + len + ". EC chunk size: " + this.ecChunkSize;
            assert (len <= bytesToWrite.limit()) : " The len: " + len + ". Chunk buffer limit: " + bytesToWrite.limit();
            this.writeToOutputStream(this.blockOutputStreamEntryPool.getCurrentStreamEntry(), bytesToWrite.array(), len, 0, isParity);
        }
        catch (Exception e) {
            this.markStreamAsFailed(e);
        }
    }

    private long writeToOutputStream(ECBlockOutputStreamEntry current, byte[] b, int writeLen, int off, boolean isParity) throws IOException {
        try {
            if (!isParity) {
                this.offset += (long)writeLen;
            }
            current.write(b, off, writeLen);
        }
        catch (IOException ioe) {
            LOG.debug("Exception while writing the cell buffers. The writeLen: " + writeLen + ". The block internal index is: " + current.getCurrentStreamIdx(), (Throwable)ioe);
            this.handleException(current, ioe);
        }
        return writeLen;
    }

    private void handleException(BlockOutputStreamEntry streamEntry, IOException exception) throws IOException {
        Throwable t = HddsClientUtils.checkForException((Exception)exception);
        Preconditions.checkNotNull((Object)t);
        boolean containerExclusionException = this.checkIfContainerToExclude(t);
        if (containerExclusionException) {
            this.blockOutputStreamEntryPool.getExcludeList().addPipeline(streamEntry.getPipeline().getId());
        }
        this.markStreamAsFailed(exception);
    }

    private void markStreamClosed() {
        this.blockOutputStreamEntryPool.cleanup();
        this.closed = true;
    }

    private void markStreamAsFailed(Exception e) {
        this.blockOutputStreamEntryPool.getCurrentStreamEntry().markFailed(e);
    }

    @Override
    public void flush() {
        LOG.debug("ECKeyOutputStream does not support flush.");
    }

    private void closeCurrentStreamEntry() throws IOException {
        if (!this.blockOutputStreamEntryPool.isEmpty()) {
            try {
                ECBlockOutputStreamEntry entry;
                while ((entry = this.blockOutputStreamEntryPool.getCurrentStreamEntry()) != null) {
                    try {
                        ((BlockOutputStreamEntry)entry).close();
                        break;
                    }
                    catch (IOException ioe) {
                        this.handleException(entry, ioe);
                    }
                }
                return;
            }
            catch (Exception e) {
                this.markStreamClosed();
                throw e;
            }
        }
    }

    @Override
    public void close() throws IOException {
        if (this.closed) {
            return;
        }
        this.closed = true;
        try {
            if (this.ecChunkBufferCache.getFirstDataCell().position() > 0) {
                int index = this.blockOutputStreamEntryPool.getCurrentStreamEntry().getCurrentStreamIdx();
                ByteBuffer lastCell = this.ecChunkBufferCache.getDataBuffers()[index];
                if (lastCell.position() % this.ecChunkSize != 0) {
                    this.handleOutputStreamWrite(index, lastCell.position(), false);
                }
                this.encodeAndWriteParityCells();
            }
            this.closeCurrentStreamEntry();
            Preconditions.checkArgument((this.writeOffset == this.offset ? 1 : 0) != 0, (Object)("Expected writeOffset= " + this.writeOffset + " Expected offset=" + this.offset));
            this.blockOutputStreamEntryPool.commitKey(this.offset);
        }
        finally {
            this.blockOutputStreamEntryPool.cleanup();
        }
        this.ecChunkBufferCache.release();
    }

    private void retryStripeWrite(int times) throws IOException {
        for (int i = 0; i < times; ++i) {
            if (this.rewriteStripeToNewBlockGroup() != StripeWriteStatus.SUCCESS) continue;
            return;
        }
        throw new IOException("Completed max allowed retries " + times + " on stripe failures.");
    }

    public static void padBufferToLimit(ByteBuffer buf, int limit) {
        int pos = buf.position();
        if (pos >= limit) {
            return;
        }
        Arrays.fill(buf.array(), pos, limit, (byte)0);
        buf.position(limit);
    }

    @Override
    public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
        return this.blockOutputStreamEntryPool.getCommitUploadPartInfo();
    }

    @Override
    @VisibleForTesting
    public ExcludeList getExcludeList() {
        return this.blockOutputStreamEntryPool.getExcludeList();
    }

    private void checkNotClosed() throws IOException {
        if (this.closed) {
            throw new IOException(": Stream is closed! Key: " + this.blockOutputStreamEntryPool.getKeyName());
        }
    }

    private static class ECChunkBuffers {
        private final ByteBuffer[] dataBuffers;
        private final ByteBuffer[] parityBuffers;
        private int cellSize;
        private ByteBufferPool byteBufferPool;

        ECChunkBuffers(int cellSize, int numData, int numParity, ByteBufferPool byteBufferPool) {
            this.cellSize = cellSize;
            this.dataBuffers = new ByteBuffer[numData];
            this.parityBuffers = new ByteBuffer[numParity];
            this.byteBufferPool = byteBufferPool;
            this.allocateBuffers(this.dataBuffers, this.cellSize);
            this.allocateBuffers(this.parityBuffers, this.cellSize);
        }

        private ByteBuffer[] getDataBuffers() {
            return this.dataBuffers;
        }

        private ByteBuffer[] getParityBuffers() {
            return this.parityBuffers;
        }

        private ByteBuffer getFirstDataCell() {
            return this.dataBuffers[0];
        }

        private ByteBuffer getLastDataCell() {
            return this.dataBuffers[this.dataBuffers.length - 1];
        }

        private int addToDataBuffer(int i, byte[] b, int off, int len) {
            ByteBuffer buf = this.dataBuffers[i];
            int pos = buf.position() + len;
            Preconditions.checkState((pos <= this.cellSize ? 1 : 0) != 0, (Object)("Position(" + pos + ") is greater than the cellSize(" + this.cellSize + ")."));
            buf.put(b, off, len);
            return pos;
        }

        private void clear() {
            this.clearBuffers(this.dataBuffers);
            this.clearBuffers(this.parityBuffers);
        }

        private void release() {
            this.releaseBuffers(this.dataBuffers);
            this.releaseBuffers(this.parityBuffers);
        }

        private void allocateBuffers(ByteBuffer[] buffers, int bufferSize) {
            for (int i = 0; i < buffers.length; ++i) {
                buffers[i] = this.byteBufferPool.getBuffer(false, this.cellSize);
                buffers[i].limit(bufferSize);
            }
        }

        private void clearBuffers(ByteBuffer[] buffers) {
            for (int i = 0; i < buffers.length; ++i) {
                buffers[i].clear();
                buffers[i].limit(this.cellSize);
            }
        }

        private void releaseBuffers(ByteBuffer[] buffers) {
            for (int i = 0; i < buffers.length; ++i) {
                if (buffers[i] == null) continue;
                this.byteBufferPool.putBuffer(buffers[i]);
                buffers[i] = null;
            }
        }
    }

    public static class Builder
    extends KeyOutputStream.Builder {
        private ECReplicationConfig replicationConfig;
        private ByteBufferPool byteBufferPool;

        public ECReplicationConfig getReplicationConfig() {
            return this.replicationConfig;
        }

        public Builder setReplicationConfig(ECReplicationConfig replConfig) {
            this.replicationConfig = replConfig;
            return this;
        }

        public ByteBufferPool getByteBufferPool() {
            return this.byteBufferPool;
        }

        public Builder setByteBufferPool(ByteBufferPool bufferPool) {
            this.byteBufferPool = bufferPool;
            return this;
        }

        @Override
        public ECKeyOutputStream build() {
            return new ECKeyOutputStream(this);
        }
    }

    private static enum StripeWriteStatus {
        SUCCESS,
        FAILED;

    }
}

