/*
 * Decompiled with CFR 0.152.
 */
package com.rabbitmq.qpid.protonj2.client.impl;

import com.rabbitmq.qpid.protonj2.buffer.ProtonBuffer;
import com.rabbitmq.qpid.protonj2.buffer.ProtonBufferAllocator;
import com.rabbitmq.qpid.protonj2.buffer.ProtonCompositeBuffer;
import com.rabbitmq.qpid.protonj2.client.Message;
import com.rabbitmq.qpid.protonj2.client.OutputStreamOptions;
import com.rabbitmq.qpid.protonj2.client.StreamSenderMessage;
import com.rabbitmq.qpid.protonj2.client.StreamTracker;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientException;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientIllegalStateException;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientUnsupportedOperationException;
import com.rabbitmq.qpid.protonj2.client.impl.ClientMessageSupport;
import com.rabbitmq.qpid.protonj2.client.impl.ClientStreamSender;
import com.rabbitmq.qpid.protonj2.client.impl.ClientStreamTracker;
import com.rabbitmq.qpid.protonj2.engine.OutgoingDelivery;
import com.rabbitmq.qpid.protonj2.types.Binary;
import com.rabbitmq.qpid.protonj2.types.Symbol;
import com.rabbitmq.qpid.protonj2.types.messaging.ApplicationProperties;
import com.rabbitmq.qpid.protonj2.types.messaging.Data;
import com.rabbitmq.qpid.protonj2.types.messaging.DeliveryAnnotations;
import com.rabbitmq.qpid.protonj2.types.messaging.Footer;
import com.rabbitmq.qpid.protonj2.types.messaging.Header;
import com.rabbitmq.qpid.protonj2.types.messaging.MessageAnnotations;
import com.rabbitmq.qpid.protonj2.types.messaging.Properties;
import com.rabbitmq.qpid.protonj2.types.messaging.Section;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/**
 * {@link StreamSenderMessage} implementation that encodes message sections incrementally
 * into a pending buffer and hands that buffer to the owning {@link ClientStreamSender}
 * whenever it reaches the configured write buffer size, is explicitly flushed, or the
 * message is completed.
 *
 * <p>The message moves through the states of {@link StreamState}: header, annotations and
 * properties can only be changed while in {@code PREAMBLE}; the footer can be changed until
 * the message is completed or aborted.
 */
final class ClientStreamSenderMessage implements StreamSenderMessage {

    /** Lower bound applied to the write buffer size regardless of its source. */
    private static final int MIN_WRITE_BUFFER_SIZE = 256;

    /** Size of a Data section preamble: the four bytes of {@link #DATA_SECTION_PREAMBLE} plus a four byte length. */
    private static final int DATA_SECTION_HEADER_ENCODING_SIZE = 8;

    /**
     * Bytes written ahead of the length of every Data section: described-type constructor,
     * smallulong marker, the Data descriptor code and the vbin32 type code (AMQP 1.0 encoding).
     */
    private static final byte[] DATA_SECTION_PREAMBLE =
        new byte[] {0x00, 0x53, Data.DESCRIPTOR_CODE.byteValue(), (byte) 0xB0};

    private final ClientStreamSender sender;
    private final DeliveryAnnotations deliveryAnnotations;
    private final int writeBufferSize;
    private final ClientStreamTracker tracker;

    private Header header;
    private MessageAnnotations annotations;
    private Properties properties;
    private ApplicationProperties applicationProperties;
    private Footer footer;

    /** Encoded bytes waiting to be handed to the sender; {@code null} when nothing is pending. */
    private ProtonBuffer buffer;
    private volatile int messageFormat;
    private StreamState currentState = StreamState.PREAMBLE;

    /**
     * Creates a new streaming message bound to the given sender and tracker.
     *
     * <p>The write buffer size is taken from the sender options when that value is positive,
     * otherwise from the connection's max frame size; in both cases it is at least
     * {@value #MIN_WRITE_BUFFER_SIZE} bytes.
     *
     * @param sender the sender that transmits the encoded bytes
     * @param tracker the tracker associated with the outgoing delivery
     * @param deliveryAnnotations annotations encoded with the preamble, may be {@code null}
     */
    ClientStreamSenderMessage(ClientStreamSender sender, ClientStreamTracker tracker, DeliveryAnnotations deliveryAnnotations) {
        this.sender = sender;
        this.deliveryAnnotations = deliveryAnnotations;
        this.tracker = tracker;

        final int configuredSize = sender.options().writeBufferSize();
        final int requestedSize = configuredSize > 0
            ? configuredSize
            : (int) sender.protonLink().getConnection().getMaxFrameSize();
        this.writeBufferSize = Math.max(MIN_WRITE_BUFFER_SIZE, requestedSize);
    }

    /** Returns the delivery held by this message's tracker. */
    OutgoingDelivery getProtonDelivery() {
        return tracker.delivery();
    }

    @Override
    public ClientStreamSender sender() {
        return sender;
    }

    /** Returns the tracker once the message has been completed, otherwise {@code null}. */
    @Override
    public StreamTracker tracker() {
        return completed() ? tracker : null;
    }

    @Override
    public int messageFormat() throws ClientException {
        return messageFormat;
    }

    /**
     * Sets the message format used when the buffered bytes are sent.
     *
     * @throws ClientIllegalStateException if the message has left the preamble state
     */
    @Override
    public ClientStreamSenderMessage messageFormat(int messageFormat) throws ClientException {
        checkStreamState(StreamState.PREAMBLE, "Cannot set message format after body writes have started.");
        this.messageFormat = messageFormat;
        return this;
    }

    /**
     * Hands any readable pending bytes to the sender and drops the pending buffer,
     * even when the send fails.
     */
    private void doFlush() throws ClientException {
        if (buffer == null || !buffer.isReadable()) {
            return;
        }

        try {
            sender.sendMessage(this, (ProtonBuffer) buffer.convertToReadOnly().transfer(), messageFormat);
        } finally {
            buffer.close();
            buffer = null;
        }
    }

    /** Closes and forgets the pending buffer if there is one. */
    private void releaseBuffer() {
        if (buffer != null) {
            buffer.close();
        }
        buffer = null;
    }

    /**
     * Aborts the message; calling it again on an already aborted message does nothing.
     * The pending buffer is always discarded.
     *
     * @throws ClientIllegalStateException if the message was already completed
     */
    @Override
    public ClientStreamSenderMessage abort() throws ClientException {
        if (completed()) {
            throw new ClientIllegalStateException("Cannot abort an already completed send context");
        }

        if (!aborted()) {
            try {
                // State is switched first so the sender observes an aborted message.
                currentState = StreamState.ABORTED;
                sender.abort(getProtonDelivery(), tracker);
            } finally {
                releaseBuffer();
            }
        }
        return this;
    }

    @Override
    public boolean aborted() {
        return currentState == StreamState.ABORTED;
    }

    /**
     * Completes the message: encodes the footer when one is set, then either flushes the
     * pending bytes or, when nothing is pending, asks the sender to complete the delivery.
     * Calling it again on a completed message does nothing.
     *
     * <p>NOTE(review): when invoked while still in the preamble state the header, annotation
     * and properties sections are not encoded by this method - confirm that is intended.
     *
     * @throws ClientIllegalStateException if the message was already aborted
     */
    @Override
    public ClientStreamSenderMessage complete() throws ClientException {
        if (aborted()) {
            throw new ClientIllegalStateException("Cannot complete an already aborted send context");
        }

        try {
            if (!completed()) {
                // Writing the footer can itself trigger a send when the buffer limit is crossed.
                if (footer != null) {
                    write(footer);
                }

                currentState = StreamState.COMPLETE;

                if (buffer != null && buffer.isReadable()) {
                    doFlush();
                } else {
                    sender.complete(getProtonDelivery(), tracker);
                }
            }
        } finally {
            releaseBuffer();
        }
        return this;
    }

    @Override
    public boolean completed() {
        return currentState == StreamState.COMPLETE;
    }

    //----- Body access

    /** Always fails: the body of a streamed message is written through {@link #body()}. */
    @Override
    public Message<OutputStream> body(OutputStream value) throws ClientUnsupportedOperationException {
        throw new ClientUnsupportedOperationException("Cannot set an OutputStream body on a StreamSenderMessage");
    }

    /**
     * Encodes the given section and appends it to the pending buffer, first encoding the
     * preamble sections if that has not happened yet.
     *
     * @throws ClientIllegalStateException if the message is completed, aborted, or an
     *         output stream is currently active
     */
    public StreamSenderMessage addBodySection(Section<?> bodySection) throws ClientException {
        if (completed()) {
            throw new ClientIllegalStateException("Cannot add more body sections to a completed message");
        }
        if (aborted()) {
            throw new ClientIllegalStateException("Cannot add more body sections to an aborted message");
        }
        if (currentState == StreamState.BODY_WRITING) {
            throw new ClientIllegalStateException("Cannot add more body sections while an OutputStream is active");
        }

        transitionToWritableState();
        appendDataToBuffer(ClientMessageSupport.encodeSection(bodySection, ProtonBufferAllocator.defaultAllocator().allocate()));
        return this;
    }

    /**
     * Adds every section of the collection via {@link #addBodySection(Section)}.
     *
     * @throws NullPointerException if {@code sections} is {@code null}
     */
    public StreamSenderMessage bodySections(Collection<Section<?>> sections) throws ClientException {
        Objects.requireNonNull(sections, "Cannot set body sections with a null Collection");
        for (Section<?> section : sections) {
            addBodySection(section);
        }
        return this;
    }

    /** Always empty: added sections are encoded immediately and not retained. */
    @Override
    public Collection<Section<?>> bodySections() throws ClientException {
        return Collections.emptyList();
    }

    /** No-op: there are no retained body sections to visit. */
    public StreamSenderMessage forEachBodySection(Consumer<Section<?>> consumer) throws ClientException {
        return this;
    }

    /** No-op: there are no retained body sections to clear. */
    public StreamSenderMessage clearBodySections() throws ClientException {
        return this;
    }

    /** Opens a body stream using default {@link OutputStreamOptions}. */
    @Override
    public OutputStream body() throws ClientException {
        return body(new OutputStreamOptions());
    }

    /**
     * Opens an output stream whose bytes are encoded as Data sections: a single section of
     * the configured length when {@code options.bodyLength()} is positive, otherwise one
     * section per flush.
     *
     * @throws NullPointerException if {@code options} is {@code null}
     * @throws ClientIllegalStateException if the message is completed, aborted, or another
     *         output stream is currently active
     */
    @Override
    public OutputStream body(OutputStreamOptions options) throws ClientException {
        // Fail before any state change or allocation so a bad argument has no side effects.
        Objects.requireNonNull(options, "Cannot create an OutputStream with null options");
        checkCanCreateOutputStream();
        transitionToWritableState();

        final ProtonBuffer streamBuffer = allocateStreamBuffer();
        if (options.bodyLength() > 0) {
            return new SingularDataSectionOutputStream(options, streamBuffer);
        }
        return new MultipleDataSectionsOutputStream(options, streamBuffer);
    }

    /**
     * Opens an output stream whose bytes are appended to the message without any Data
     * section framing.
     *
     * @throws ClientIllegalStateException if the message is completed, aborted, or another
     *         output stream is currently active
     */
    @Override
    public OutputStream rawOutputStream() throws ClientException {
        checkCanCreateOutputStream();
        transitionToWritableState();
        return new SendContextRawBytesOutputStream(allocateStreamBuffer());
    }

    /** Shared state guard for the methods that hand out an output stream. */
    private void checkCanCreateOutputStream() throws ClientIllegalStateException {
        if (completed()) {
            throw new ClientIllegalStateException("Cannot create an OutputStream from a completed send context");
        }
        if (aborted()) {
            throw new ClientIllegalStateException("Cannot create an OutputStream from a aborted send context");
        }
        if (currentState == StreamState.BODY_WRITING) {
            throw new ClientIllegalStateException("Cannot add more body sections while an OutputStream is active");
        }
    }

    /** Allocates a stream buffer whose capacity and implicit growth limit equal the write buffer size. */
    private ProtonBuffer allocateStreamBuffer() {
        return ProtonBufferAllocator.defaultAllocator().allocate(writeBufferSize).implicitGrowthLimit(writeBufferSize);
    }

    /** Builds the eight byte Data section preamble announcing a body of the given length. */
    private static ProtonBuffer newDataSectionPreamble(int bodyLength) {
        final ProtonBuffer preamble = ProtonBufferAllocator.defaultAllocator()
            .allocate(DATA_SECTION_HEADER_ENCODING_SIZE)
            .implicitGrowthLimit(DATA_SECTION_HEADER_ENCODING_SIZE);

        preamble.writeBytes(DATA_SECTION_PREAMBLE);
        preamble.writeInt(bodyLength);
        return preamble;
    }

    //----- Header values: getters fall back to a default when no Header exists; setters
    //----- create the Header on demand and fail once body writing has started.

    @Override
    public boolean durable() {
        return header != null && header.isDurable();
    }

    public StreamSenderMessage durable(boolean durable) throws ClientIllegalStateException {
        lazyCreateHeader().setDurable(durable);
        return this;
    }

    /** Returns the header priority, or 4 when no header has been created. */
    @Override
    public byte priority() {
        return header == null ? (byte) 4 : header.getPriority();
    }

    public StreamSenderMessage priority(byte priority) throws ClientIllegalStateException {
        lazyCreateHeader().setPriority(priority);
        return this;
    }

    @Override
    public long timeToLive() {
        return header == null ? Header.DEFAULT_TIME_TO_LIVE : header.getTimeToLive();
    }

    public StreamSenderMessage timeToLive(long timeToLive) throws ClientIllegalStateException {
        lazyCreateHeader().setTimeToLive(timeToLive);
        return this;
    }

    @Override
    public boolean firstAcquirer() {
        return header != null && header.isFirstAcquirer();
    }

    public StreamSenderMessage firstAcquirer(boolean firstAcquirer) throws ClientIllegalStateException {
        lazyCreateHeader().setFirstAcquirer(firstAcquirer);
        return this;
    }

    @Override
    public long deliveryCount() {
        return header == null ? 0L : header.getDeliveryCount();
    }

    public StreamSenderMessage deliveryCount(long deliveryCount) throws ClientIllegalStateException {
        lazyCreateHeader().setDeliveryCount(deliveryCount);
        return this;
    }

    //----- Properties values: getters return null (or 0 for numeric values) when no
    //----- Properties section exists; setters create it on demand and fail once body
    //----- writing has started.

    @Override
    public Object messageId() {
        return properties != null ? properties.getMessageId() : null;
    }

    public StreamSenderMessage messageId(Object messageId) throws ClientIllegalStateException {
        lazyCreateProperties().setMessageId(messageId);
        return this;
    }

    @Override
    public byte[] userId() {
        if (properties != null && properties.getUserId() != null) {
            return properties.getUserId().asByteArray();
        }
        return null;
    }

    /**
     * Stores a defensive copy of the given user id.
     *
     * <p>A {@code null} array is forwarded as a {@code null} user id rather than failing
     * here, mirroring the getter which reports an unset user id as {@code null}.
     * NOTE(review): confirm {@code Properties.setUserId} accepts {@code null}.
     */
    public StreamSenderMessage userId(byte[] userId) throws ClientIllegalStateException {
        final Binary encodedUserId = userId == null ? null : new Binary(Arrays.copyOf(userId, userId.length));
        lazyCreateProperties().setUserId(encodedUserId);
        return this;
    }

    @Override
    public String to() {
        return properties != null ? properties.getTo() : null;
    }

    public StreamSenderMessage to(String to) throws ClientIllegalStateException {
        lazyCreateProperties().setTo(to);
        return this;
    }

    @Override
    public String subject() {
        return properties != null ? properties.getSubject() : null;
    }

    public StreamSenderMessage subject(String subject) throws ClientIllegalStateException {
        lazyCreateProperties().setSubject(subject);
        return this;
    }

    @Override
    public String replyTo() {
        return properties != null ? properties.getReplyTo() : null;
    }

    public StreamSenderMessage replyTo(String replyTo) throws ClientIllegalStateException {
        lazyCreateProperties().setReplyTo(replyTo);
        return this;
    }

    @Override
    public Object correlationId() {
        return properties != null ? properties.getCorrelationId() : null;
    }

    public StreamSenderMessage correlationId(Object correlationId) throws ClientIllegalStateException {
        lazyCreateProperties().setCorrelationId(correlationId);
        return this;
    }

    @Override
    public String contentType() {
        return properties != null ? properties.getContentType() : null;
    }

    public StreamSenderMessage contentType(String contentType) throws ClientIllegalStateException {
        lazyCreateProperties().setContentType(contentType);
        return this;
    }

    @Override
    public String contentEncoding() {
        return properties != null ? properties.getContentEncoding() : null;
    }

    public StreamSenderMessage contentEncoding(String contentEncoding) throws ClientIllegalStateException {
        lazyCreateProperties().setContentEncoding(contentEncoding);
        return this;
    }

    @Override
    public long absoluteExpiryTime() {
        return properties != null ? properties.getAbsoluteExpiryTime() : 0L;
    }

    public StreamSenderMessage absoluteExpiryTime(long expiryTime) throws ClientIllegalStateException {
        lazyCreateProperties().setAbsoluteExpiryTime(expiryTime);
        return this;
    }

    @Override
    public long creationTime() {
        return properties != null ? properties.getCreationTime() : 0L;
    }

    public StreamSenderMessage creationTime(long createTime) throws ClientIllegalStateException {
        lazyCreateProperties().setCreationTime(createTime);
        return this;
    }

    @Override
    public String groupId() {
        return properties != null ? properties.getGroupId() : null;
    }

    public StreamSenderMessage groupId(String groupId) throws ClientIllegalStateException {
        lazyCreateProperties().setGroupId(groupId);
        return this;
    }

    @Override
    public int groupSequence() {
        return properties != null ? (int) properties.getGroupSequence() : 0;
    }

    public StreamSenderMessage groupSequence(int groupSequence) throws ClientIllegalStateException {
        lazyCreateProperties().setGroupSequence(groupSequence);
        return this;
    }

    @Override
    public String replyToGroupId() {
        return properties != null ? properties.getReplyToGroupId() : null;
    }

    public StreamSenderMessage replyToGroupId(String replyToGroupId) throws ClientIllegalStateException {
        lazyCreateProperties().setReplyToGroupId(replyToGroupId);
        return this;
    }

    //----- Message annotations, keyed by Symbol internally and by String in this API

    /** Returns the annotation stored under the key, or {@code null} when absent. */
    @Override
    public Object annotation(String key) {
        // Guarded like hasAnnotation(..) so a section carrying a null map cannot fail here.
        if (hasAnnotations()) {
            return annotations.getValue().get(Symbol.valueOf(key));
        }
        return null;
    }

    @Override
    public boolean hasAnnotation(String key) {
        if (annotations != null && annotations.getValue() != null) {
            return annotations.getValue().containsKey(Symbol.valueOf(key));
        }
        return false;
    }

    @Override
    public boolean hasAnnotations() {
        return annotations != null && annotations.getValue() != null && annotations.getValue().size() > 0;
    }

    @Override
    public Object removeAnnotation(String key) {
        if (hasAnnotations()) {
            return annotations.getValue().remove(Symbol.valueOf(key));
        }
        return null;
    }

    public StreamSenderMessage forEachAnnotation(BiConsumer<String, Object> action) {
        if (hasAnnotations()) {
            annotations.getValue().forEach((key, value) -> action.accept(key.toString(), value));
        }
        return this;
    }

    public ClientStreamSenderMessage annotation(String key, Object value) throws ClientIllegalStateException {
        lazyCreateMessageAnnotations().getValue().put(Symbol.valueOf(key), value);
        return this;
    }

    //----- Application properties

    @Override
    public Object property(String key) {
        if (hasProperties()) {
            return applicationProperties.getValue().get(key);
        }
        return null;
    }

    @Override
    public boolean hasProperty(String key) {
        if (hasProperties()) {
            return applicationProperties.getValue().containsKey(key);
        }
        return false;
    }

    @Override
    public boolean hasProperties() {
        return applicationProperties != null
            && applicationProperties.getValue() != null
            && applicationProperties.getValue().size() > 0;
    }

    @Override
    public Object removeProperty(String key) {
        if (hasProperties()) {
            return applicationProperties.getValue().remove(key);
        }
        return null;
    }

    public StreamSenderMessage forEachProperty(BiConsumer<String, Object> action) {
        if (hasProperties()) {
            applicationProperties.getValue().forEach(action);
        }
        return this;
    }

    public ClientStreamSenderMessage property(String key, Object value) throws ClientIllegalStateException {
        lazyCreateApplicationProperties().getValue().put(key, value);
        return this;
    }

    //----- Footer values; unlike the other sections these stay writable until the
    //----- message is completed or aborted.

    @Override
    public Object footer(String key) {
        if (hasFooters()) {
            return footer.getValue().get(Symbol.valueOf(key));
        }
        return null;
    }

    @Override
    public boolean hasFooter(String key) {
        if (hasFooters()) {
            return footer.getValue().containsKey(Symbol.valueOf(key));
        }
        return false;
    }

    @Override
    public boolean hasFooters() {
        return footer != null && footer.getValue() != null && footer.getValue().size() > 0;
    }

    @Override
    public Object removeFooter(String key) {
        if (hasFooters()) {
            return footer.getValue().remove(Symbol.valueOf(key));
        }
        return null;
    }

    public StreamSenderMessage forEachFooter(BiConsumer<String, Object> action) {
        if (hasFooters()) {
            footer.getValue().forEach((key, value) -> action.accept(key.toString(), value));
        }
        return this;
    }

    public ClientStreamSenderMessage footer(String key, Object value) throws ClientIllegalStateException {
        lazyCreateFooter().getValue().put(Symbol.valueOf(key), value);
        return this;
    }

    //----- Whole-section access; the section setters replace the current section and,
    //----- except for the footer, fail once body writing has started.

    @Override
    public Header header() throws ClientException {
        return header;
    }

    public StreamSenderMessage header(Header header) throws ClientException {
        checkStreamState(StreamState.PREAMBLE, "Cannot write to Message Header after body writing has started.");
        this.header = header;
        return this;
    }

    @Override
    public MessageAnnotations annotations() throws ClientException {
        return annotations;
    }

    public StreamSenderMessage annotations(MessageAnnotations annotations) throws ClientException {
        checkStreamState(StreamState.PREAMBLE, "Cannot write to Message Annotations after body writing has started.");
        this.annotations = annotations;
        return this;
    }

    @Override
    public Properties properties() throws ClientException {
        return properties;
    }

    public StreamSenderMessage properties(Properties properties) throws ClientException {
        checkStreamState(StreamState.PREAMBLE, "Cannot write to Message Properties after body writing has started.");
        this.properties = properties;
        return this;
    }

    @Override
    public ApplicationProperties applicationProperties() throws ClientException {
        return applicationProperties;
    }

    public StreamSenderMessage applicationProperties(ApplicationProperties applicationProperties) throws ClientException {
        checkStreamState(StreamState.PREAMBLE, "Cannot write to Message Application Properties after body writing has started.");
        this.applicationProperties = applicationProperties;
        return this;
    }

    @Override
    public Footer footer() throws ClientException {
        return footer;
    }

    public StreamSenderMessage footer(Footer footer) throws ClientException {
        checkFooterWritable();
        this.footer = footer;
        return this;
    }

    /** Always fails: a streamed message is encoded piecewise, never as a whole. */
    @Override
    public ProtonBuffer encode(Map<String, Object> deliveryAnnotations, ProtonBufferAllocator allocator) throws ClientException {
        throw new ClientUnsupportedOperationException("StreamSenderMessage cannot be directly encoded");
    }

    //----- Internal helpers

    /**
     * Adds a read-only view of the given bytes to the pending buffer and sends the pending
     * buffer as soon as it holds at least {@code writeBufferSize} readable bytes.
     *
     * @throws NullPointerException if {@code incoming} is {@code null}
     */
    private void appendDataToBuffer(ProtonBuffer incoming) throws ClientException {
        final ProtonBuffer readOnly =
            Objects.requireNonNull(incoming, "Appended buffer cannot be null").convertToReadOnly();

        if (buffer == null) {
            buffer = (ProtonBuffer) readOnly.transfer();
        } else if (buffer instanceof ProtonCompositeBuffer) {
            ((ProtonCompositeBuffer) buffer).append(readOnly);
        } else {
            buffer = ProtonBufferAllocator.defaultAllocator().composite(new ProtonBuffer[] {buffer, readOnly});
        }

        if (buffer.getReadableBytes() >= writeBufferSize) {
            doFlush();
        }
    }

    /**
     * On the first body related operation, encodes the header, delivery annotations,
     * message annotations, properties and application properties that are set (in that
     * order) and moves the message to {@code BODY_WRITABLE}. Does nothing afterwards.
     */
    private void transitionToWritableState() throws ClientException {
        if (currentState != StreamState.PREAMBLE) {
            return;
        }

        if (header != null) {
            appendDataToBuffer(ClientMessageSupport.encodeSection(header, ProtonBufferAllocator.defaultAllocator().allocate()));
        }
        if (deliveryAnnotations != null) {
            appendDataToBuffer(ClientMessageSupport.encodeSection(deliveryAnnotations, ProtonBufferAllocator.defaultAllocator().allocate()));
        }
        if (annotations != null) {
            appendDataToBuffer(ClientMessageSupport.encodeSection(annotations, ProtonBufferAllocator.defaultAllocator().allocate()));
        }
        if (properties != null) {
            appendDataToBuffer(ClientMessageSupport.encodeSection(properties, ProtonBufferAllocator.defaultAllocator().allocate()));
        }
        if (applicationProperties != null) {
            appendDataToBuffer(ClientMessageSupport.encodeSection(applicationProperties, ProtonBufferAllocator.defaultAllocator().allocate()));
        }

        currentState = StreamState.BODY_WRITABLE;
    }

    /** Encodes a section into the pending buffer unless the message is aborted or completed. */
    private ClientStreamSenderMessage write(Section<?> section) throws ClientException {
        if (aborted()) {
            throw new ClientIllegalStateException("Cannot write a Section to an already aborted send context");
        }
        if (completed()) {
            throw new ClientIllegalStateException("Cannot write a Section to an already completed send context");
        }

        appendDataToBuffer(ClientMessageSupport.encodeSection(section, ProtonBufferAllocator.defaultAllocator().allocate()));
        return this;
    }

    /** Throws with the given message unless the message is currently in the given state. */
    private void checkStreamState(StreamState state, String errorMessage) throws ClientIllegalStateException {
        if (currentState != state) {
            throw new ClientIllegalStateException(errorMessage);
        }
    }

    /** Throws when the message has reached a terminal state and the footer can no longer change. */
    private void checkFooterWritable() throws ClientIllegalStateException {
        if (completed() || aborted()) {
            throw new ClientIllegalStateException("Cannot write to Message Footer after message has been marked completed or aborted.");
        }
    }

    private Header lazyCreateHeader() throws ClientIllegalStateException {
        checkStreamState(StreamState.PREAMBLE, "Cannot write to Message Header after body writing has started.");
        if (header == null) {
            header = new Header();
        }
        return header;
    }

    private Properties lazyCreateProperties() throws ClientIllegalStateException {
        checkStreamState(StreamState.PREAMBLE, "Cannot write to Message Properties after body writing has started.");
        if (properties == null) {
            properties = new Properties();
        }
        return properties;
    }

    private ApplicationProperties lazyCreateApplicationProperties() throws ClientIllegalStateException {
        checkStreamState(StreamState.PREAMBLE, "Cannot write to Message Application Properties after body writing has started.");
        if (applicationProperties == null) {
            applicationProperties = new ApplicationProperties(new LinkedHashMap<>());
        }
        return applicationProperties;
    }

    private MessageAnnotations lazyCreateMessageAnnotations() throws ClientIllegalStateException {
        checkStreamState(StreamState.PREAMBLE, "Cannot write to Message Annotations after body writing has started.");
        if (annotations == null) {
            annotations = new MessageAnnotations(new LinkedHashMap<>());
        }
        return annotations;
    }

    private Footer lazyCreateFooter() throws ClientIllegalStateException {
        checkFooterWritable();
        if (footer == null) {
            footer = new Footer(new LinkedHashMap<>());
        }
        return footer;
    }

    /**
     * Body stream without a preset length: each flush of readable bytes is framed as its
     * own Data section.
     */
    private final class MultipleDataSectionsOutputStream extends StreamMessageOutputStream {

        public MultipleDataSectionsOutputStream(OutputStreamOptions options, ProtonBuffer buffer) {
            super(options, buffer);
        }

        @Override
        protected void doFlushPending(boolean complete) throws IOException {
            if (streamBuffer.isReadable()) {
                // The preamble announces exactly the bytes the base class appends right after it.
                final ProtonBuffer preamble = newDataSectionPreamble(streamBuffer.getReadableBytes());

                try (ProtonBuffer closeable = preamble) {
                    appendDataToBuffer(closeable);
                } catch (ClientException e) {
                    throw new IOException(e);
                }
            }

            super.doFlushPending(complete);
        }
    }

    /**
     * Body stream with a preset length: a single Data section preamble carrying
     * {@code options.bodyLength()} is written when the stream is created.
     */
    private final class SingularDataSectionOutputStream extends StreamMessageOutputStream {

        public SingularDataSectionOutputStream(OutputStreamOptions options, ProtonBuffer buffer) throws ClientException {
            super(options, buffer);

            final ProtonBuffer preamble = newDataSectionPreamble(options.bodyLength());

            try (ProtonBuffer closeable = preamble) {
                appendDataToBuffer(closeable);
            }
        }
    }

    /** Stream that appends the written bytes as-is, using default stream options. */
    private final class SendContextRawBytesOutputStream extends StreamMessageOutputStream {

        public SendContextRawBytesOutputStream(ProtonBuffer buffer) {
            super(new OutputStreamOptions(), buffer);
        }
    }

    /**
     * Common output stream logic: bytes are staged in {@code streamBuffer} and moved into
     * the enclosing message's pending buffer whenever the stage fills up, on flush and on
     * close. While a stream is open the message is in the {@code BODY_WRITING} state.
     */
    private abstract class StreamMessageOutputStream extends OutputStream {

        protected final AtomicBoolean closed = new AtomicBoolean();
        protected final OutputStreamOptions options;
        protected final ProtonBuffer streamBuffer;

        /** Total number of bytes accepted by the write methods so far. */
        protected int bytesWritten;

        public StreamMessageOutputStream(OutputStreamOptions options, ProtonBuffer buffer) {
            this.options = options;
            this.streamBuffer = buffer;
            ClientStreamSenderMessage.this.currentState = StreamState.BODY_WRITING;
        }

        @Override
        public void write(int value) throws IOException {
            checkClosed();
            checkOutputLimitReached(1);

            streamBuffer.writeByte((byte) value);
            // Count the byte before flushing so flush() compares the same total against the
            // body length as the array based write path does.
            ++bytesWritten;

            if (!streamBuffer.isWritable()) {
                flush();
            }
        }

        @Override
        public void write(byte[] bytes) throws IOException {
            write(bytes, 0, bytes.length);
        }

        @Override
        public void write(byte[] bytes, int offset, int length) throws IOException {
            checkClosed();
            checkOutputLimitReached(length);

            if (streamBuffer.getWritableBytes() >= length) {
                streamBuffer.writeBytes(bytes, offset, length);
                bytesWritten += length;
                if (!streamBuffer.isWritable()) {
                    flush();
                }
                return;
            }

            // Larger than the free space: fill the stage, flush it, and repeat.
            int remaining = length;
            while (remaining > 0) {
                final int chunk = Math.min(remaining, streamBuffer.getWritableBytes());
                bytesWritten += chunk;
                streamBuffer.writeBytes(bytes, offset + (length - remaining), chunk);
                if (!streamBuffer.isWritable()) {
                    flush();
                }
                remaining -= chunk;
            }
        }

        /**
         * Pushes staged bytes to the sender. With a preset body length the message is also
         * completed once that many bytes were written and complete-on-close is enabled.
         */
        @Override
        public void flush() throws IOException {
            checkClosed();

            final int bodyLength = options.bodyLength();
            final boolean complete =
                bodyLength > 0 && bytesWritten == bodyLength && options.completeSendOnClose();
            doFlushPending(complete);
        }

        /**
         * Closes the stream once. If a body length was preset but not fully written the
         * message is aborted; otherwise staged bytes are flushed and the message is
         * completed when complete-on-close is enabled. The stage buffer is always closed.
         */
        @Override
        public void close() throws IOException {
            if (!closed.compareAndSet(false, true) || ClientStreamSenderMessage.this.completed()) {
                return;
            }

            ClientStreamSenderMessage.this.currentState = StreamState.BODY_WRITABLE;

            try (ProtonBuffer ignored = streamBuffer) {
                final int bodyLength = options.bodyLength();
                if (bodyLength > 0 && bodyLength != bytesWritten) {
                    try {
                        ClientStreamSenderMessage.this.abort();
                    } catch (ClientException e) {
                        throw new IOException(e);
                    }
                } else {
                    doFlushPending(options.completeSendOnClose());
                }
            }
        }

        private void checkOutputLimitReached(int writeSize) throws IOException {
            final int outputLimit = options.bodyLength();

            if (ClientStreamSenderMessage.this.completed()) {
                throw new IOException("Cannot write to an already completed message output stream");
            }
            // Compared by subtraction so a very large write size cannot overflow the sum.
            if (outputLimit > 0 && writeSize > outputLimit - bytesWritten) {
                throw new IOException("Cannot write beyond configured stream output limit");
            }
        }

        private void checkClosed() throws IOException {
            if (closed.get()) {
                throw new IOException("The OutputStream has already been closed.");
            }
            if (ClientStreamSenderMessage.this.sender.isClosed()) {
                throw new IOException("The parent Sender instance has already been closed.");
            }
        }

        /**
         * Moves staged bytes into the message's pending buffer, then either completes the
         * message (closing the stage) or flushes and makes the stage writable again.
         *
         * @param complete whether the message should be completed after the bytes are moved
         * @throws IOException wrapping any {@link ClientException} raised while sending
         */
        protected void doFlushPending(boolean complete) throws IOException {
            try {
                if (streamBuffer.isReadable()) {
                    ClientStreamSenderMessage.this.appendDataToBuffer(streamBuffer.split());
                }

                if (complete) {
                    ClientStreamSenderMessage.this.complete();
                    streamBuffer.close();
                } else {
                    ClientStreamSenderMessage.this.doFlush();
                    streamBuffer.ensureWritable(
                        ClientStreamSenderMessage.this.writeBufferSize,
                        ClientStreamSenderMessage.this.writeBufferSize,
                        true);
                }
            } catch (ClientException e) {
                throw new IOException(e);
            }
        }
    }

    /** Lifecycle of a streamed message. */
    private enum StreamState {
        /** Nothing encoded yet; header, annotations and properties may still change. */
        PREAMBLE,
        /** Preamble sections encoded; body sections or a stream may be added. */
        BODY_WRITABLE,
        /** An output stream is currently open on the message. */
        BODY_WRITING,
        /** The message was completed. */
        COMPLETE,
        /** The message was aborted. */
        ABORTED
    }
}

