/*
 * Decompiled with CFR 0.152.
 */
package dev.miku.r2dbc.mysql.client;

import dev.miku.r2dbc.mysql.message.header.SequenceIdProvider;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.util.annotation.Nullable;

final class WriteSubscriber
implements CoreSubscriber<ByteBuf> {
    private final ChannelHandlerContext ctx;
    private final ChannelPromise promise;
    private final SequenceIdProvider provider;

    private WriteSubscriber(ChannelHandlerContext ctx, ChannelPromise promise, SequenceIdProvider provider) {
        this.ctx = ctx;
        this.promise = promise;
        this.provider = provider;
    }

    public void onSubscribe(Subscription s) {
        s.request(Long.MAX_VALUE);
    }

    public void onNext(ByteBuf buf) {
        this.ctx.write((Object)this.ctx.alloc().buffer(4, 4).writeMediumLE(buf.readableBytes()).writeByte((int)this.provider.next()));
        this.ctx.write((Object)buf);
    }

    public void onError(Throwable cause) {
        try {
            this.ctx.fireExceptionCaught(cause);
        }
        finally {
            this.promise.setSuccess();
        }
    }

    public void onComplete() {
        this.promise.setSuccess();
    }

    static WriteSubscriber create(ChannelHandlerContext ctx, ChannelPromise promise, @Nullable SequenceIdProvider provider) {
        if (provider == null) {
            provider = SequenceIdProvider.unsafe();
        }
        return new WriteSubscriber(ctx, promise, provider);
    }
}

