/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.fs.cosn;

import com.qcloud.cos.model.PartETag;
import java.io.IOException;
import java.io.OutputStream;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.cosn.BufferPool;
import org.apache.hadoop.fs.cosn.ByteBufferInputStream;
import org.apache.hadoop.fs.cosn.ByteBufferOutputStream;
import org.apache.hadoop.fs.cosn.ByteBufferWrapper;
import org.apache.hadoop.fs.cosn.NativeFileSystemStore;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link OutputStream} that collects written bytes into fixed-size blocks and hands them to a
 * {@link NativeFileSystemStore}.
 *
 * <p>Every time the current block fills up it is submitted to the executor as one part of a
 * multipart upload and a fresh buffer is taken from the {@link BufferPool}. On {@link #close()}:
 * <ul>
 *   <li>if no part was ever submitted, the single buffered block is stored with
 *       {@code storeFile} (together with its MD5 digest when MD5 is available);</li>
 *   <li>otherwise the trailing block (if it holds any bytes) is uploaded synchronously, all
 *       pending part uploads are awaited and the multipart upload is completed.</li>
 * </ul>
 *
 * <p>Only {@link #close()} is synchronized; the write methods are not, so concurrent writers
 * must be coordinated by the caller.
 */
public class CosNOutputStream
extends OutputStream {
    private static final Logger LOG = LoggerFactory.getLogger(CosNOutputStream.class);

    /** Smallest accepted block size (1 MiB); smaller requests are raised to this value. */
    private static final long MIN_BLOCK_SIZE = 0x100000L;

    /**
     * Largest accepted block size. The block size is narrowed to {@code int} when a buffer is
     * requested from the pool, so it must not exceed {@link Integer#MAX_VALUE}; the former
     * limit of 2^31 turned into a negative size after the cast.
     */
    private static final long MAX_BLOCK_SIZE = Integer.MAX_VALUE;

    private final Configuration conf;
    private final NativeFileSystemStore store;
    /** MD5 digest of the current block, or {@code null} when MD5 is not available. */
    private MessageDigest digest;
    private long blockSize;
    private String key;
    /** Zero-based index of the block currently being filled; part numbers are this value + 1. */
    private int currentBlockId = 0;
    private ByteBufferWrapper currentBlockBuffer;
    private OutputStream currentBlockOutputStream;
    /** Multipart upload id; stays {@code null} until the first block is submitted as a part. */
    private String uploadId = null;
    private ListeningExecutorService executorService;
    /** Futures of the parts submitted so far, in submission order. */
    private List<ListenableFuture<PartETag>> etagList = new LinkedList<ListenableFuture<PartETag>>();
    /** Number of bytes written into the current block. */
    private int blockWritten = 0;
    private boolean closed = false;

    /**
     * Creates the stream and acquires the first block buffer from the buffer pool.
     *
     * @param conf            configuration (only stored)
     * @param store           store used for the single-file and multipart uploads
     * @param key             object key to write
     * @param blockSize       requested block size in bytes; clamped to
     *                        [{@code 1 MiB}, {@code Integer.MAX_VALUE}]
     * @param executorService executor that runs the asynchronous part uploads
     * @throws IOException if no buffer can be obtained from the buffer pool
     */
    public CosNOutputStream(Configuration conf, NativeFileSystemStore store, String key, long blockSize, ExecutorService executorService) throws IOException {
        this.conf = conf;
        this.store = store;
        this.key = key;
        this.blockSize = blockSize;
        if (this.blockSize < MIN_BLOCK_SIZE) {
            LOG.warn(String.format("The minimum size of a single block is limited to %d.", MIN_BLOCK_SIZE));
            this.blockSize = MIN_BLOCK_SIZE;
        }
        if (this.blockSize > MAX_BLOCK_SIZE) {
            LOG.warn(String.format("The maximum size of a single block is limited to %d.", MAX_BLOCK_SIZE));
            this.blockSize = MAX_BLOCK_SIZE;
        }
        this.executorService = MoreExecutors.listeningDecorator(executorService);
        try {
            this.currentBlockBuffer = BufferPool.getInstance().getBuffer((int) this.blockSize);
        } catch (IOException e) {
            throw new IOException("Getting a buffer size: " + String.valueOf(this.blockSize) + " from buffer pool occurs an exception: ", e);
        }
        try {
            this.digest = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            // Without MD5 the stream still works; uploads are simply sent without a digest.
            this.digest = null;
        }
        this.currentBlockOutputStream = this.openBlockStream();
    }

    /** Flushes the stream that writes into the current block buffer. */
    @Override
    public void flush() throws IOException {
        this.currentBlockOutputStream.flush();
    }

    /**
     * Uploads whatever is still buffered and finishes the object.
     *
     * <p>The stream is marked closed and the current buffer is handed back to the pool even when
     * the upload fails, so a failed close neither leaks the buffer nor leaves the stream
     * writable. Calling {@code close()} again is a no-op.
     *
     * @throws IOException if the upload fails or waiting for pending parts is interrupted
     */
    @Override
    public synchronized void close() throws IOException {
        if (this.closed) {
            return;
        }
        try {
            this.currentBlockOutputStream.flush();
            this.currentBlockOutputStream.close();
            LOG.info("The output stream has been close, and begin to upload the last block: [{}].", this.currentBlockId);
            // "No part submitted yet" is the condition for a plain single-object upload. Asking
            // the future list directly avoids depending on buffer-wrapper identity/equality.
            if (this.etagList.isEmpty()) {
                byte[] md5Hash = this.digest == null ? null : this.digest.digest();
                this.store.storeFile(this.key, new ByteBufferInputStream(this.currentBlockBuffer.getByteBuffer()), md5Hash, this.currentBlockBuffer.getByteBuffer().remaining());
            } else {
                PartETag lastPartETag = null;
                if (this.blockWritten > 0) {
                    LOG.info("Upload the last part..., blockId: [{}], written bytes: [{}]", this.currentBlockId, this.blockWritten);
                    lastPartETag = this.store.uploadPart(new ByteBufferInputStream(this.currentBlockBuffer.getByteBuffer()), this.key, this.uploadId, this.currentBlockId + 1, this.currentBlockBuffer.getByteBuffer().remaining());
                }
                List<PartETag> finishedPartETags = this.waitForFinishPartUploads();
                if (null == finishedPartETags) {
                    throw new IOException("Failed to multipart upload to cos, abort it.");
                }
                List<PartETag> allPartETags = new LinkedList<PartETag>(finishedPartETags);
                if (null != lastPartETag) {
                    allPartETags.add(lastPartETag);
                }
                this.store.completeMultipartUpload(this.key, this.uploadId, allPartETags);
            }
            LOG.info("The outputStream for key: [{}] has been uploaded.", this.key);
        } finally {
            // Mark closed first: once the buffer is back in the pool it must never be reused here.
            this.blockWritten = 0;
            this.closed = true;
            this.releaseCurrentBuffer();
        }
    }

    /**
     * Hands the current block buffer back to the pool, logging (not propagating) an interruption.
     *
     * <p>NOTE(review): {@code returnBuffer} declares {@code InterruptedException}; it is assumed
     * that a pending interrupt could make it fail immediately, so the flag is cleared for the
     * duration of the call and restored afterwards. Confirm against {@code BufferPool}.
     */
    private void releaseCurrentBuffer() throws IOException {
        boolean interrupted = Thread.interrupted();
        try {
            BufferPool.getInstance().returnBuffer(this.currentBlockBuffer);
        } catch (InterruptedException e) {
            interrupted = true;
            LOG.error("An exception occurred while returning the buffer to the buffer pool.", e);
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Blocks until every submitted part upload has finished.
     *
     * @return the part ETags in submission order, or {@code null} if the wait was interrupted
     *         (the thread's interrupt status is restored in that case)
     * @throws IOException if any part upload failed; the remaining futures are cancelled and the
     *                     multipart upload is aborted before the exception is thrown
     */
    private List<PartETag> waitForFinishPartUploads() throws IOException {
        try {
            LOG.info("Wait for all parts to finish their uploading.");
            return Futures.allAsList(this.etagList).get();
        } catch (InterruptedException e) {
            LOG.error("Interrupt the part upload.", e);
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            LOG.error("Cancelling futures.");
            for (ListenableFuture<PartETag> future : this.etagList) {
                future.cancel(true);
            }
            this.store.abortMultipartUpload(this.key, this.uploadId);
            LOG.error("Multipart upload with id: [{}] to COS key: [{}]", this.uploadId, this.key, e);
            throw new IOException("Multipart upload with id: " + this.uploadId + " to " + this.key, e);
        }
    }

    /**
     * Submits the (full) current block as an asynchronous part upload and switches to a fresh
     * buffer for the next block.
     *
     * @throws IOException if the upload id or a new buffer cannot be obtained
     */
    private void uploadPart() throws IOException {
        this.currentBlockOutputStream.flush();
        this.currentBlockOutputStream.close();
        if (this.currentBlockId == 0) {
            // The multipart upload is only started once the first block overflows.
            this.uploadId = this.store.getUploadId(this.key);
        }
        // Snapshot the state now: the fields are reassigned below, before the task may run.
        final ByteBufferWrapper partBuffer = this.currentBlockBuffer;
        final String partKey = this.key;
        final String partUploadId = this.uploadId;
        final int partNumber = this.currentBlockId + 1;
        ListenableFuture<PartETag> pendingPart = this.executorService.submit(new Callable<PartETag>() {
            @Override
            public PartETag call() throws Exception {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} is uploading a part.", Thread.currentThread().getName());
                }
                try {
                    return CosNOutputStream.this.store.uploadPart(new ByteBufferInputStream(partBuffer.getByteBuffer()), partKey, partUploadId, partNumber, partBuffer.getByteBuffer().remaining());
                } finally {
                    // Give the buffer back whether or not the upload succeeded; otherwise a
                    // failed part would permanently remove a buffer from the pool.
                    BufferPool.getInstance().returnBuffer(partBuffer);
                }
            }
        });
        this.etagList.add(pendingPart);
        try {
            this.currentBlockBuffer = BufferPool.getInstance().getBuffer((int) this.blockSize);
        } catch (IOException e) {
            String errMsg = String.format("Getting a buffer [size:%d] from the buffer pool failed.", this.blockSize);
            throw new IOException(errMsg, e);
        }
        ++this.currentBlockId;
        if (null != this.digest) {
            this.digest.reset();
        }
        this.currentBlockOutputStream = this.openBlockStream();
    }

    /**
     * Opens a stream over the current block buffer, wrapped in a digest stream when MD5 is
     * available.
     */
    private OutputStream openBlockStream() {
        OutputStream blockStream = new ByteBufferOutputStream(this.currentBlockBuffer.getByteBuffer());
        if (null == this.digest) {
            return blockStream;
        }
        return new DigestOutputStream(blockStream, this.digest);
    }

    /**
     * Writes {@code len} bytes from {@code b} starting at {@code off}, splitting the data across
     * block boundaries and submitting every block that becomes full.
     *
     * @throws IOException if the stream is closed or a block cannot be written or submitted
     */
    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        if (this.closed) {
            throw new IOException("block stream has been closed.");
        }
        while (len > 0) {
            // Computed in long arithmetic: blockWritten + len may exceed the int range.
            long roomInBlock = this.blockSize - (long) this.blockWritten;
            int chunk = (int) Math.min((long) len, roomInBlock);
            this.currentBlockOutputStream.write(b, off, chunk);
            this.blockWritten += chunk;
            if ((long) this.blockWritten >= this.blockSize) {
                this.uploadPart();
                this.blockWritten = 0;
            }
            len -= chunk;
            off += chunk;
        }
    }

    /** Writes the whole array; equivalent to {@code write(b, 0, b.length)}. */
    @Override
    public void write(byte[] b) throws IOException {
        this.write(b, 0, b.length);
    }

    /**
     * Writes the low-order byte of {@code b}.
     *
     * @throws IOException if the stream is closed or the block cannot be written or submitted
     */
    @Override
    public void write(int b) throws IOException {
        // Same closed check, accounting and block roll-over as the array variant.
        this.write(new byte[]{(byte) b}, 0, 1);
    }
}

