/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.server.hotrod.test;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.handler.codec.ReplayingDecoder;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.infinispan.commons.util.Either;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.server.core.AbstractProtocolServer;
import org.infinispan.server.core.test.ServerTestingUtil;
import org.infinispan.server.hotrod.HotRodServer;
import org.infinispan.server.hotrod.test.HotRodTestingUtil;
import org.infinispan.server.hotrod.transport.ExtendedByteBuf;
import org.infinispan.test.SingleCacheManagerTest;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.test.fwk.TestResourceTracker;
import org.testng.annotations.Test;

@Test(groups={"functional"}, testName="server.hotrod.test.HotRodPipeTest")
public class HotRodPipeTest
extends SingleCacheManagerTest {
    HotRodServer server;

    /**
     * Builds the cache manager used by this test.
     *
     * <p>A fresh {@link GlobalConfigurationBuilder} is first passed to
     * {@code TestCacheManagerFactory.amendTransport} and then used to create a
     * server-mode cache manager.
     *
     * @return the cache manager returned by
     *         {@code TestCacheManagerFactory.createServerModeCacheManager}
     * @throws Exception declared by the signature; nothing in this body throws a
     *         checked exception explicitly
     */
    protected EmbeddedCacheManager createCacheManager() throws Exception {
        GlobalConfigurationBuilder globalConfig = new GlobalConfigurationBuilder();
        TestCacheManagerFactory.amendTransport(globalConfig);
        return TestCacheManagerFactory.createServerModeCacheManager(globalConfig);
    }

    /**
     * Runs the superclass setup, then starts a Hot Rod server on top of
     * {@code cacheManager} and stores it in {@link #server}.
     *
     * @throws Exception if the superclass setup fails
     */
    protected void setup() throws Exception {
        super.setup();
        // The server needs the cache manager, so it is started only after super.setup().
        server = HotRodTestingUtil.startHotRodServer(cacheManager);
    }

    /**
     * Stops the Hot Rod server and then runs the superclass teardown.
     *
     * <p>{@link #setup()} chains to {@code super.setup()}, so teardown must chain
     * to {@code super.teardown()} as well; otherwise whatever the superclass set
     * up is never released. The call sits in a {@code finally} block so that it
     * still runs when stopping the server throws.
     */
    protected void teardown() {
        try {
            log.debug("Killing Hot Rod server");
            ServerTestingUtil.killServer(server);
        } finally {
            // Mirror of super.setup(): release the resources owned by the superclass.
            super.teardown();
        }
    }

    /**
     * Sends a large batch of requests through a single channel write and waits
     * until the client has counted one response per request.
     *
     * <p>The test fails with an {@link AssertionError} carrying the first error
     * message as soon as the client has recorded any error response.
     *
     * @throws InterruptedException if interrupted while stopping the client
     */
    public void testPipeRequests() throws InterruptedException {
        // Single source of truth for the batch size: used both for the write and
        // for the expected response count.
        final int numPipeReqs = 10000;
        BatchingClient client = new BatchingClient(server.getPort());
        try {
            client.start();
            client.writeN(numPipeReqs);
            eventuallyEquals(numPipeReqs, () -> {
                Either<List<String>, Integer> either = client.readN();
                switch (either.type()) {
                    case LEFT:
                        // Left carries the error strings collected by the client.
                        throw new AssertionError(either.left().get(0));
                    case RIGHT:
                        // Right carries the number of responses counted so far.
                        return either.right();
                    default:
                        throw new IllegalStateException("Either can only be left or right");
                }
            });
        } finally {
            // Always release the client's event loop group, even on failure.
            client.stop();
        }
    }

    static final class BatchingClient {
        final EventLoopGroup group;
        final int port;
        Channel ch;

        /**
         * Creates a client for the given port and allocates its event loop group.
         * No connection is opened here; see {@link #start()}.
         *
         * @param port port passed to {@code Bootstrap.connect} by {@link #start()}
         */
        BatchingClient(int port) {
            // Threads are named after the current test, as reported by TestResourceTracker.
            ThreadFactory testNamedThreads =
                new DefaultThreadFactory(TestResourceTracker.getCurrentTestShortName());
            // NOTE(review): a thread count of 0 presumably selects Netty's default — confirm.
            this.group = new NioEventLoopGroup(0, testNamedThreads);
            this.port = port;
        }

        /**
         * Connects to {@code HotRodTestingUtil.host()} on {@link #port} and stores
         * the resulting channel in {@link #ch}.
         *
         * <p>The channel pipeline is, in order: {@code BatchingDecoder},
         * {@code BatchingEncoder}, {@code BatchingClientHandler}.
         *
         * @throws AssertionError if the calling thread is interrupted while waiting
         *         for the connection; the thread's interrupt flag is restored first
         */
        void start() {
            Bootstrap b = new Bootstrap();
            b.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<Channel>() {
                    protected void initChannel(Channel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        p.addLast(new BatchingDecoder());
                        p.addLast(new BatchingEncoder());
                        // Must stay last: readN() fetches it through pipeline().last().
                        p.addLast(new BatchingClientHandler());
                    }
                });
            try {
                // Block until the connection attempt has completed.
                ChannelFuture connected = b.connect(HotRodTestingUtil.host(), port).sync();
                ch = connected.channel();
            } catch (InterruptedException e) {
                // Preserve the interrupt for code further up the stack before failing the test.
                Thread.currentThread().interrupt();
                throw new AssertionError(e);
            }
        }

        /**
         * Starts a graceful shutdown of the event loop group and waits up to ten
         * seconds for it. The boolean result of the wait is ignored.
         *
         * @throws InterruptedException if interrupted while waiting
         */
        void stop() throws InterruptedException {
            group.shutdownGracefully()
                .await(10L, TimeUnit.SECONDS);
        }

        /**
         * Writes {@code n} to the channel as a single boxed {@link Integer} message
         * and flushes it. The pipeline's {@code BatchingEncoder} casts the message
         * back to {@code Integer} and emits that many requests.
         *
         * @param n number of requests to send
         */
        void writeN(int n) {
            Integer requestCount = n;
            ch.writeAndFlush(requestCount);
        }

        /**
         * Reports the state of the last handler in the channel pipeline.
         *
         * @return a left value holding the handler's error list when that list is
         *         non-empty; otherwise a right value holding the handler's counter
         */
        Either<List<String>, Integer> readN() {
            // The last pipeline entry is cast to BatchingClientHandler, which start() adds last.
            BatchingClientHandler tail = (BatchingClientHandler) ch.pipeline().last();
            if (!tail.errors.isEmpty()) {
                return Either.newLeft(tail.errors);
            }
            return Either.newRight(tail.n);
        }

        /**
         * Terminal inbound handler that tallies decoded messages: {@link String}
         * messages are collected in {@link #errors}, every other message
         * increments {@link #n}.
         *
         * <p>Both fields are updated in {@link #channelRead0} and read by
         * {@code BatchingClient.readN()}, which the test calls from its own polling
         * thread. They are therefore published safely: {@code n} is
         * {@code volatile} and {@code errors} is a {@link CopyOnWriteArrayList}.
         */
        private static final class BatchingClientHandler
        extends SimpleChannelInboundHandler<Object> {
            /**
             * Number of non-String messages seen so far.
             * NOTE(review): the plain increment below relies on Netty calling
             * channelRead0 from a single event loop thread per channel — confirm.
             */
            volatile int n;

            /** Error strings received so far; safe to read while being appended to. */
            final List<String> errors = new CopyOnWriteArrayList<>();

            private BatchingClientHandler() {
            }

            /**
             * Records one decoded message.
             *
             * @param ctx handler context (unused)
             * @param msg decoded message; a {@code String} is treated as an error
             */
            protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
                if (msg instanceof String) {
                    errors.add((String) msg);
                } else {
                    // Volatile write publishes the new count to the polling thread.
                    n = n + 1;
                }
            }
        }

        /**
         * Decodes server responses into one message each: the response id as a
         * {@code Long} for operation code 2, or an error {@code String} for
         * operation code 80. Responses with any other operation code produce no
         * message.
         */
        private static final class BatchingDecoder
        extends ReplayingDecoder<Void> {
            /** NOTE(review): presumably the Hot Rod put-response opcode (0x02) — confirm. */
            private static final short COUNTED_RESPONSE_OP = 2;

            /** NOTE(review): presumably the Hot Rod error-response opcode (0x50) — confirm. */
            private static final short ERROR_RESPONSE_OP = 80;

            private BatchingDecoder() {
            }

            /**
             * Reads one response header and emits at most one message to {@code out}.
             *
             * @param ctx handler context (unused)
             * @param in  buffer positioned at the start of a response
             * @param out receives the decoded id or error string
             */
            protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
                // Header layout as consumed here: one byte, a variable-length id,
                // the operation code, then two more bytes. Only the id and the
                // operation code are used.
                in.readUnsignedByte();
                long responseId = ExtendedByteBuf.readUnsignedLong(in);
                short opCode = in.readUnsignedByte();
                in.readUnsignedByte();
                in.readUnsignedByte();

                if (opCode == COUNTED_RESPONSE_OP) {
                    out.add(responseId);
                } else if (opCode == ERROR_RESPONSE_OP) {
                    // Error responses carry a string after the header.
                    out.add(ExtendedByteBuf.readString(in));
                }
            }
        }

        /**
         * Expands a single {@code Integer} message {@code n} into {@code n}
         * back-to-back requests written to the same output buffer. The requests
         * are identical except for their id, which runs from 0 to {@code n - 1}.
         */
        private static final class BatchingEncoder
        extends MessageToByteEncoder<Object> {
            /**
             * Fixed bytes that follow the id in every request.
             * NOTE(review): presumably version 25, opcode 1 (put), empty cache name,
             * no flags, client intelligence 3, topology id 0, then key "100",
             * default time units (0x77) and value "100" — confirm against the
             * Hot Rod protocol specification.
             */
            private static final byte[] BYTES_AFTER_ID = {
                25, 1, 0, 0, 3, 0,
                3, 49, 48, 48, 119, 3, 49, 48, 48
            };

            private BatchingEncoder() {
            }

            /**
             * Writes {@code n} requests to {@code out}.
             *
             * @param ctx handler context (unused)
             * @param msg number of requests to write; must be an {@code Integer}
             * @param out buffer that receives the encoded requests
             */
            protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
                int requestCount = (Integer) msg;
                for (int requestId = 0; requestId < requestCount; requestId++) {
                    // First byte of each request is 160 (0xA0).
                    out.writeByte(160);
                    ExtendedByteBuf.writeUnsignedLong((long) requestId, out);
                    out.writeBytes(BYTES_AFTER_ID);
                }
            }
        }
    }
}

