/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.redis.client.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.SocketAddress;
import io.vertx.redis.client.Command;
import io.vertx.redis.client.Redis;
import io.vertx.redis.client.RedisOptions;
import io.vertx.redis.client.Request;
import io.vertx.redis.client.Response;
import io.vertx.redis.client.ResponseType;
import io.vertx.redis.client.impl.ArrayQueue;
import io.vertx.redis.client.impl.ParserHandler;
import io.vertx.redis.client.impl.RESPParser;
import io.vertx.redis.client.impl.RequestImpl;
import io.vertx.redis.client.impl.types.ErrorType;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class RedisClient
implements Redis,
ParserHandler {
    private static final Logger LOG = LoggerFactory.getLogger(RedisClient.class);
    private static final ErrorType CONNECTION_CLOSED = ErrorType.create("CONNECTION_CLOSED");
    private final ArrayQueue waiting;
    private final NetClient netClient;
    private final SocketAddress socketAddress;
    private final RedisOptions options;
    private Handler<Throwable> onException = t -> LOG.error((Object)"Unhandled Error", t);
    private Handler<Void> onEnd;
    private Handler<Response> onMessage;
    private NetSocket netSocket;
    private boolean connected = false;

    public static Redis create(Vertx vertx, RedisOptions options) {
        return RedisClient.create(vertx, options, options.getEndpoint());
    }

    static Redis create(Vertx vertx, RedisOptions options, SocketAddress address) {
        return new RedisClient(vertx, options, address);
    }

    private static void authenticate(Redis client, RedisOptions options, Handler<AsyncResult<Void>> handler) {
        if (options.getPassword() == null) {
            handler.handle((Object)Future.succeededFuture());
            return;
        }
        client.send(Request.cmd(Command.AUTH).arg(options.getPassword()), (Handler<AsyncResult<Response>>)((Handler)auth -> {
            if (auth.failed()) {
                handler.handle((Object)Future.failedFuture((Throwable)auth.cause()));
            } else {
                handler.handle((Object)Future.succeededFuture());
            }
        }));
    }

    private static void select(Redis client, RedisOptions options, Handler<AsyncResult<Void>> handler) {
        if (options.getSelect() == null) {
            handler.handle((Object)Future.succeededFuture());
            return;
        }
        client.send(Request.cmd(Command.SELECT).arg(options.getSelect()), (Handler<AsyncResult<Response>>)((Handler)auth -> {
            if (auth.failed()) {
                handler.handle((Object)Future.failedFuture((Throwable)auth.cause()));
            } else {
                handler.handle((Object)Future.succeededFuture());
            }
        }));
    }

    private RedisClient(Vertx vertx, RedisOptions options, SocketAddress endpoint) {
        this.netClient = vertx.createNetClient(options.getNetClientOptions());
        this.waiting = new ArrayQueue(options.getMaxWaitingHandlers());
        this.socketAddress = endpoint;
        this.options = options;
    }

    @Override
    public Redis connect(Handler<AsyncResult<Redis>> onConnect) {
        if (this.connected) {
            onConnect.handle((Object)Future.succeededFuture((Object)this));
            return this;
        }
        this.netClient.connect(this.socketAddress, clientConnect -> {
            if (clientConnect.failed()) {
                this.netClient.close();
                onConnect.handle((Object)Future.failedFuture((Throwable)clientConnect.cause()));
                return;
            }
            this.netSocket = (NetSocket)clientConnect.result();
            this.connected = true;
            this.netSocket.handler((Handler)new RESPParser(this, this.options.getMaxNestedArrays())).closeHandler(close -> {
                this.netClient.close();
                this.cleanupQueue(CONNECTION_CLOSED);
                this.connected = false;
                if (this.onEnd != null) {
                    this.onEnd.handle(close);
                }
            }).exceptionHandler(exception -> {
                this.netSocket.close();
                this.netClient.close();
                this.cleanupQueue((Throwable)exception);
                this.connected = false;
                if (this.onException != null) {
                    this.onException.handle(exception);
                }
            });
            RedisClient.authenticate(this, this.options, (Handler<AsyncResult<Void>>)((Handler)authenticate -> {
                if (authenticate.failed()) {
                    onConnect.handle((Object)Future.failedFuture((Throwable)authenticate.cause()));
                    return;
                }
                RedisClient.select(this, this.options, (Handler<AsyncResult<Void>>)((Handler)select -> {
                    if (select.failed()) {
                        onConnect.handle((Object)Future.failedFuture((Throwable)select.cause()));
                        return;
                    }
                    onConnect.handle((Object)Future.succeededFuture((Object)this));
                }));
            }));
        });
        return this;
    }

    @Override
    public void close() {
        if (this.netSocket != null) {
            this.netSocket.close();
        }
        this.connected = false;
    }

    @Override
    public Redis exceptionHandler(Handler<Throwable> handler) {
        this.onException = handler;
        return this;
    }

    @Override
    public Redis endHandler(Handler<Void> handler) {
        this.onEnd = handler;
        return this;
    }

    @Override
    public Redis handler(Handler<Response> handler) {
        this.onMessage = handler;
        return this;
    }

    @Override
    public Redis pause() {
        this.netSocket.pause();
        return this;
    }

    @Override
    public Redis resume() {
        this.netSocket.resume();
        return this;
    }

    @Override
    public Redis fetch(long size) {
        return this;
    }

    private void cleanupQueue(Throwable t) {
        Handler req;
        while ((req = (Handler)this.waiting.poll()) != null) {
            if (t == null) continue;
            try {
                req.handle((Object)Future.failedFuture((Throwable)t));
            }
            catch (RuntimeException e) {
                LOG.warn((Object)"Exception during cleanup", (Throwable)e);
            }
        }
    }

    @Override
    public Redis send(Request request, Handler<AsyncResult<Response>> handler) {
        if (!this.connected) {
            handler.handle((Object)Future.failedFuture((String)"Redis connection is broken."));
            return this;
        }
        if (this.waiting.isFull()) {
            handler.handle((Object)Future.failedFuture((String)"Redis waiting Queue is full"));
            return this;
        }
        try {
            Buffer message = ((RequestImpl)request).encode();
            this.waiting.offer(handler);
            this.netSocket.write(message);
        }
        catch (RuntimeException e) {
            this.onException.handle((Object)e);
        }
        return this;
    }

    @Override
    public Redis batch(List<Request> commands, Handler<AsyncResult<List<Response>>> handler) {
        if (this.waiting.freeSlots() < commands.size()) {
            handler.handle((Object)Future.failedFuture((String)"Redis waiting Queue is full"));
            return this;
        }
        ArrayList<Handler> callbacks = new ArrayList<Handler>(commands.size());
        ArrayList replies = new ArrayList(commands.size());
        AtomicInteger count = new AtomicInteger(commands.size());
        AtomicBoolean failed = new AtomicBoolean(false);
        Buffer messages = Buffer.buffer();
        int i = 0;
        while (i < commands.size()) {
            int index = i++;
            RequestImpl req = (RequestImpl)commands.get(index);
            req.encode(messages);
            callbacks.add(index, command -> {
                if (!failed.get()) {
                    if (command.failed()) {
                        failed.set(true);
                        if (handler != null) {
                            handler.handle((Object)Future.failedFuture((Throwable)command.cause()));
                        }
                        return;
                    }
                    replies.add(index, command.result());
                    if (count.decrementAndGet() == 0 && handler != null) {
                        handler.handle((Object)Future.succeededFuture((Object)replies));
                    }
                }
            });
        }
        this.netSocket.write(messages, write -> {
            if (write.succeeded()) {
                for (Handler callback : callbacks) {
                    this.waiting.offer(callback);
                }
            } else {
                try {
                    handler.handle((Object)Future.failedFuture((Throwable)write.cause()));
                }
                catch (Throwable t) {
                    this.fail(t);
                }
            }
        });
        return this;
    }

    @Override
    public void handle(Response reply) {
        if (this.waiting.isEmpty()) {
            if (this.onMessage != null) {
                this.onMessage.handle((Object)reply);
            } else {
                LOG.warn((Object)("No handler waiting for message: " + reply));
            }
            return;
        }
        Handler req = (Handler)this.waiting.poll();
        if (req != null) {
            if (reply == null) {
                try {
                    req.handle((Object)Future.succeededFuture());
                }
                catch (RuntimeException e) {
                    this.onException.handle((Object)e);
                }
                return;
            }
            if (reply.type() == ResponseType.ERROR) {
                try {
                    req.handle((Object)Future.failedFuture((Throwable)((ErrorType)reply)));
                }
                catch (RuntimeException e) {
                    this.onException.handle((Object)e);
                }
                return;
            }
            try {
                req.handle((Object)Future.succeededFuture((Object)reply));
            }
            catch (RuntimeException e) {
                this.onException.handle((Object)e);
            }
        } else {
            LOG.error((Object)("No handler waiting for message: " + reply));
        }
    }

    @Override
    public SocketAddress socketAddress() {
        return this.socketAddress;
    }

    @Override
    public void fail(Throwable t) {
        if (this.onException != null) {
            this.onException.handle((Object)t);
        } else {
            LOG.error((Object)"External failure", t);
        }
    }

    @Override
    public void fatal(Throwable t) {
        if (this.onException != null) {
            this.onException.handle((Object)t);
        } else {
            LOG.error((Object)"External failure", t);
        }
        this.close();
        this.cleanupQueue(t);
    }
}

