/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.redis.client.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.impl.pool.PoolConnector;
import io.vertx.redis.client.Command;
import io.vertx.redis.client.RedisConnection;
import io.vertx.redis.client.RedisOptions;
import io.vertx.redis.client.Request;
import io.vertx.redis.client.Response;
import io.vertx.redis.client.ResponseType;
import io.vertx.redis.client.impl.ArrayQueue;
import io.vertx.redis.client.impl.ParserHandler;
import io.vertx.redis.client.impl.RedisConnectionInternal;
import io.vertx.redis.client.impl.RequestImpl;
import io.vertx.redis.client.impl.types.ErrorType;
import io.vertx.redis.client.impl.types.Multi;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class RedisStandaloneConnection
implements RedisConnectionInternal,
ParserHandler {
    // Event-bus address prefix. NOTE(review): handle() spells the same prefix out as a
    // literal ("io.vertx.redis."), presumably because the compiler inlined this constant — confirm.
    private static final String BASE_ADDRESS = "io.vertx.redis";
    private static final Logger LOG = LoggerFactory.getLogger(RedisStandaloneConnection.class);
    // Failure handed to every still-pending request when the connection ends (see end()).
    private static final ErrorType CONNECTION_CLOSED = ErrorType.create("CONNECTION_CLOSED");
    // Pool listener; may be null. When null, close() closes the socket directly and
    // taintCheck()/evict() skip their listener-related work.
    private final PoolConnector.Listener listener;
    private final VertxInternal vertx;
    private final ContextInternal context;
    private final EventBus eventBus;
    private final NetSocket netSocket;
    // Wall-clock time (System.currentTimeMillis() based) after which isValid() reports false;
    // -1 when the configured pool recycle timeout is -1.
    private final long expiresAt;
    // Promises of requests that were sent and still await a reply; accessed under synchronized (waiting).
    private final ArrayQueue waiting;
    // User-supplied callbacks; each stays null until the matching setter is called.
    private Handler<Throwable> onException;
    private Handler<Void> onEnd;
    private Handler<Response> onMessage;
    private Runnable onEvict;
    // Set by close() (when a listener is present), end() and fail(); cleared by setValid().
    private boolean closed = false;
    // Set by taintCheck() for pub/sub, SELECT and AUTH commands; makes reset() evict the connection.
    private boolean tainted = false;

    /**
     * Creates a connection wrapper around an already established socket.
     *
     * @param vertx              the Vert.x instance; also supplies the event bus
     * @param context            the context on which user handlers are executed
     * @param connectionListener the pool listener, or {@code null}
     * @param netSocket          the socket connected to the server
     * @param options            supplies the waiting-queue capacity and the pool recycle timeout
     */
    public RedisStandaloneConnection(VertxInternal vertx, ContextInternal context, PoolConnector.Listener connectionListener, NetSocket netSocket, RedisOptions options) {
        this.vertx = vertx;
        this.context = context;
        this.listener = connectionListener;
        this.eventBus = vertx.eventBus();
        this.netSocket = netSocket;
        this.waiting = new ArrayQueue(options.getMaxWaitingHandlers());
        if (options.getPoolRecycleTimeout() == -1) {
            // -1 disables expiry: isValid() treats a non-positive deadline as "never expires".
            this.expiresAt = -1L;
        } else {
            this.expiresAt = System.currentTimeMillis() + (long) options.getPoolRecycleTimeout();
        }
    }

    /**
     * Clears both the {@code tainted} and the {@code closed} flag.
     */
    synchronized void setValid() {
        tainted = false;
        closed = false;
    }

    /**
     * Closes the underlying socket without waiting for the result of the close operation.
     */
    @Override
    public void forceClose() {
        netSocket.close();
    }

    /**
     * Tells whether this connection is neither closed nor past its expiry deadline.
     *
     * @return {@code false} when closed, or when a positive deadline has been reached
     */
    @Override
    public boolean isValid() {
        if (closed) {
            return false;
        }
        if (expiresAt <= 0L) {
            // No deadline configured.
            return true;
        }
        return System.currentTimeMillis() < expiresAt;
    }

    /**
     * Closes this connection.
     *
     * <p>With a pool listener present the socket is left open and the connection is only
     * flagged as closed; without one the socket itself is closed.
     *
     * @return the socket close future, or an already succeeded future when a listener is present
     */
    @Override
    public Future<Void> close() {
        if (listener != null) {
            synchronized (this) {
                closed = true;
            }
            return Future.succeededFuture();
        }
        return netSocket.close();
    }

    /**
     * Tells whether the queue of requests awaiting a reply has reached its capacity.
     *
     * @return {@code true} when the waiting queue reports it is full
     */
    @Override
    public boolean pendingQueueFull() {
        synchronized (waiting) {
            return waiting.isFull();
        }
    }

    /**
     * Registers the handler that is notified of failures (see {@code fail}).
     *
     * @param handler the handler, or {@code null} to remove it
     * @return this connection
     */
    @Override
    public RedisConnection exceptionHandler(Handler<Throwable> handler) {
        onException = handler;
        return this;
    }

    /**
     * Registers the handler that is notified when the connection ends (see {@code end}).
     *
     * @param handler the handler, or {@code null} to remove it
     * @return this connection
     */
    @Override
    public RedisConnection endHandler(Handler<Void> handler) {
        onEnd = handler;
        return this;
    }

    /**
     * Registers the callback that is run whenever this connection is evicted.
     *
     * @param handler the callback, or {@code null} to remove it
     * @return this connection
     */
    RedisConnection evictHandler(Runnable handler) {
        onEvict = handler;
        return this;
    }

    /**
     * Registers the handler that receives push replies and replies that arrive while no
     * request is pending (see {@code handle}).
     *
     * @param handler the handler, or {@code null} to remove it
     * @return this connection
     */
    @Override
    public RedisConnection handler(Handler<Response> handler) {
        onMessage = handler;
        return this;
    }

    /**
     * Pauses the underlying socket.
     *
     * @return this connection
     */
    @Override
    public RedisConnection pause() {
        netSocket.pause();
        return this;
    }

    /**
     * Resumes the underlying socket.
     *
     * @return this connection
     */
    @Override
    public RedisConnection resume() {
        netSocket.resume();
        return this;
    }

    /**
     * Forwards a fetch request to the underlying socket.
     *
     * @param size the amount passed on to the socket's {@code fetch}
     * @return this connection
     */
    @Override
    public RedisConnection fetch(long size) {
        netSocket.fetch(size);
        return this;
    }

    /**
     * Flags the connection as tainted when a pub/sub, SELECT or AUTH command is issued.
     * Nothing is recorded when there is no pool listener.
     *
     * @param cmd the command about to be sent
     */
    private void taintCheck(Command cmd) {
        if (listener == null) {
            return;
        }
        final boolean taints = cmd.isPubSub() || Command.SELECT.equals(cmd) || Command.AUTH.equals(cmd);
        if (taints) {
            tainted = true;
        }
    }

    /**
     * Encodes a single request, writes it to the socket and returns a future for its reply.
     *
     * <p>Non pub/sub requests are queued in {@code waiting} and resolved by {@code handle};
     * pub/sub requests are not queued and are completed as soon as the write succeeds.
     *
     * @param request the request to send
     * @return a future for the reply; already failed when the waiting queue is full
     * @throws IllegalStateException if the connection is closed
     */
    @Override
    public Future<Response> send(Request request) {
        if (this.closed) {
            throw new IllegalStateException("Connection is closed");
        }
        this.context.execute(request.command(), this::taintCheck);
        final boolean voidCmd = request.command().isPubSub();
        final Buffer message = ((RequestImpl) request).encode();
        final PromiseInternal<Response> promise;
        if (voidCmd) {
            promise = this.vertx.promise();
        } else {
            synchronized (this.waiting) {
                if (this.waiting.isFull()) {
                    return Future.failedFuture("Redis waiting Queue is full");
                }
                promise = this.vertx.promise();
                this.waiting.offer(promise);
            }
        }
        try {
            this.netSocket.write(message)
                .onFailure(this::fail)
                .onSuccess(ok -> this.lambda$send$0(voidCmd, promise, ok));
        } catch (RuntimeException err) {
            this.context.execute(err, this::fail);
            // fail(err) drains the waiting queue and may already have failed this promise;
            // Promise.fail would then throw IllegalStateException, so use tryFail.
            promise.tryFail(err);
        }
        return promise.future();
    }

    /**
     * Encodes all requests into one buffer, writes it in a single socket write and returns a
     * future that collects the replies.
     *
     * @param commands the requests to send; pub/sub commands are rejected
     * @return a future for the list of replies; already succeeded with an empty list for an
     *         empty batch, already failed when a pub/sub command is present or when the
     *         waiting queue lacks room for the whole batch
     * @throws IllegalStateException if the connection is closed
     */
    @Override
    public Future<List<Response>> batch(List<Request> commands) {
        if (this.closed) {
            throw new IllegalStateException("Connection is closed");
        }
        if (commands.isEmpty()) {
            LOG.debug("Empty batch");
            return Future.succeededFuture(Collections.emptyList());
        }
        final int size = commands.size();
        final PromiseInternal<List<Response>> promise = this.vertx.promise();
        final List<Promise<Response>> callbacks = new ArrayList<>(size);
        final List<Response> replies = new ArrayList<>(size);
        final AtomicInteger count = new AtomicInteger(size);
        final AtomicBoolean failed = new AtomicBoolean(false);
        final Buffer messages = Buffer.buffer();
        for (int i = 0; i < size; i++) {
            // Captured by the lambda below, hence a dedicated final copy of the loop index.
            final int index = i;
            final RequestImpl req = (RequestImpl) commands.get(index);
            if (req.command().isPubSub()) {
                return Future.failedFuture("PubSub command in batch not allowed");
            }
            req.encode(messages);
            this.taintCheck(req.command());
            final Promise<Response> callback = this.vertx.promise(reply -> this.lambda$batch$1(failed, promise, replies, index, count, reply));
            callbacks.add(callback);
        }
        synchronized (this.waiting) {
            // All or nothing: never enqueue only part of a batch.
            if (this.waiting.freeSlots() < callbacks.size()) {
                return Future.failedFuture("Redis waiting Queue is full");
            }
            for (Promise<Response> callback : callbacks) {
                this.waiting.offer(callback);
            }
        }
        try {
            this.netSocket.write(messages).onFailure(this::fail);
        } catch (RuntimeException err) {
            this.context.execute(err, this::fail);
            // fail(err) drains the waiting queue, which may already have failed the batch
            // promise through its callbacks; Promise.fail would then throw, so use tryFail.
            promise.tryFail(err);
        }
        return promise.future();
    }

    /**
     * Dispatches one parsed reply.
     *
     * <p>Push replies, and any reply that arrives while no request is pending, go to the
     * message handler if one is set. Without a handler, "message" / "pmessage" arrays are
     * forwarded to the event bus and anything else is logged. Every other reply resolves
     * the oldest pending request.
     *
     * @param reply the parsed reply; may be {@code null}
     */
    @Override
    public void handle(Response reply) {
        final boolean push = reply != null && reply.type() == ResponseType.PUSH;
        final boolean empty;
        Promise<Response> req = null;
        synchronized (this.waiting) {
            empty = this.waiting.isEmpty();
            // Only consume a pending request when this reply really answers one. Polling
            // for a push reply would drop that request's promise and misalign later replies.
            if (!push && !empty) {
                req = (Promise<Response>) this.waiting.poll();
            }
        }
        if (push || empty) {
            if (this.onMessage != null) {
                this.context.execute(reply, this.onMessage);
                return;
            }
            if (reply instanceof Multi) {
                if (reply.size() == 3 && "message".equals(reply.get(0).toString())) {
                    final JsonObject value = new JsonObject()
                        .put("channel", reply.get(1).toString())
                        .put("message", reply.get(2).toString());
                    this.eventBus.send("io.vertx.redis." + reply.get(1).toString(), new JsonObject().put("status", "OK").put("value", value));
                    return;
                }
                if (reply.size() == 4 && "pmessage".equals(reply.get(0).toString())) {
                    final JsonObject value = new JsonObject()
                        .put("pattern", reply.get(1).toString())
                        .put("channel", reply.get(2).toString())
                        .put("message", reply.get(3).toString());
                    this.eventBus.send("io.vertx.redis." + reply.get(1).toString(), new JsonObject().put("status", "OK").put("value", value));
                    return;
                }
            }
            LOG.warn("No handler waiting for message: " + reply);
            return;
        }
        if (req == null) {
            LOG.error("No handler waiting for message: " + reply);
            return;
        }
        final boolean resolved;
        if (reply == null) {
            resolved = req.tryComplete();
        } else if (reply.type() == ResponseType.ERROR) {
            resolved = req.tryFail((ErrorType) reply);
        } else {
            resolved = req.tryComplete(reply);
        }
        if (!resolved && this.onException != null) {
            this.context.execute(new IllegalStateException("Result is already complete: [" + req + "]"), this.onException);
        }
    }

    /**
     * Marks the connection closed, evicts it, fails every pending request with
     * {@code CONNECTION_CLOSED} and then notifies the end handler, if any.
     *
     * @param v unused value, passed through to the end handler
     */
    public synchronized void end(Void v) {
        assert !closed;
        closed = true;
        evict();
        cleanupQueue(CONNECTION_CLOSED);
        if (onEnd != null) {
            context.execute(v, onEnd);
        }
    }

    /**
     * Marks the connection closed, evicts it, fails every pending request with the given
     * cause and then notifies the exception handler, if any.
     *
     * @param t the failure cause
     */
    @Override
    public synchronized void fail(Throwable t) {
        assert !closed;
        closed = true;
        evict();
        cleanupQueue(t);
        if (onException != null) {
            context.execute(t, onException);
        }
    }

    /**
     * Checks whether this connection can be reused. A tainted connection is evicted.
     *
     * @return {@code true} only when the connection is neither closed nor tainted
     */
    @Override
    public synchronized boolean reset() {
        if (closed) {
            return false;
        }
        if (!tainted) {
            return true;
        }
        evict();
        return false;
    }

    /**
     * Notifies the pool listener of the removal and then runs the evict callback;
     * each step is skipped when the corresponding field is {@code null}.
     */
    private void evict() {
        final PoolConnector.Listener poolListener = listener;
        if (poolListener != null) {
            poolListener.onRemove();
        }
        final Runnable callback = onEvict;
        if (callback != null) {
            callback.run();
        }
    }

    /**
     * Drains the waiting queue, failing every request that is not complete yet.
     * Runtime exceptions raised while failing a request are logged and do not stop the drain.
     *
     * @param t the cause used to fail the pending requests
     */
    private synchronized void cleanupQueue(Throwable t) {
        synchronized (waiting) {
            for (Promise<?> pending = (Promise<?>) waiting.poll(); pending != null; pending = (Promise<?>) waiting.poll()) {
                final boolean alreadyComplete = pending instanceof PromiseInternal && ((PromiseInternal<?>) pending).isComplete();
                if (alreadyComplete) {
                    continue;
                }
                try {
                    pending.tryFail(t);
                } catch (RuntimeException err) {
                    LOG.warn("Exception while running cleanup", err);
                }
            }
        }
    }

    /**
     * Per-command callback of {@code batch}: records one reply, or fails the whole batch.
     *
     * @param failed  shared flag, set once any command of the batch has failed
     * @param promise the promise of the whole batch
     * @param replies the replies collected so far
     * @param index   position of this command within the batch
     * @param count   number of replies still outstanding
     * @param command the outcome of this command
     */
    private /* synthetic */ void lambda$batch$1(AtomicBoolean failed, Promise promise, List replies, int index, AtomicInteger count, AsyncResult command) {
        if (failed.get()) {
            // The batch has already been failed. A failed command never adds its reply, so
            // later indexes would lie beyond the list size and add(index, ...) would throw
            // IndexOutOfBoundsException. Late results are simply dropped.
            return;
        }
        if (command.failed()) {
            failed.set(true);
            if (!promise.tryFail(command.cause()) && this.onException != null) {
                this.context.execute(new IllegalStateException("Result is already complete: [" + promise + "]"), this.onException);
            }
            return;
        }
        replies.add(index, command.result());
        if (count.decrementAndGet() == 0 && !promise.tryComplete(replies) && this.onException != null) {
            this.context.execute(new IllegalStateException("Result is already complete: [" + promise + "]"), this.onException);
        }
    }

    /**
     * Write-success callback of {@code send}: completes the promise of a pub/sub command and
     * reports to the exception handler, if any, when that promise was already complete.
     *
     * @param voidCmd whether the command sent was a pub/sub command
     * @param promise the promise returned to the caller of {@code send}
     * @param ok      unused write result
     */
    private /* synthetic */ void lambda$send$0(boolean voidCmd, Promise promise, Void ok) {
        if (!voidCmd) {
            // Regular commands are resolved by handle() when their reply arrives.
            return;
        }
        if (promise.tryComplete()) {
            return;
        }
        if (onException != null) {
            context.execute(new IllegalStateException("Result is already complete: [" + promise + "]"), onException);
        }
    }
}

