/*
 * Decompiled with CFR 0.152.
 */
package info.michaelwittig.javaq.connector.impl;

import com.google.common.util.concurrent.SettableFuture;
import info.michaelwittig.javaq.connector.QConnectorError;
import info.michaelwittig.javaq.connector.QConnectorException;
import info.michaelwittig.javaq.connector.QConnectorSync;
import info.michaelwittig.javaq.connector.impl.QConnectorImpl;
import info.michaelwittig.javaq.connector.impl.cmd.ConnectorSyncCommand;
import info.michaelwittig.javaq.connector.impl.cmd.ConnectorSyncCommandFunction;
import info.michaelwittig.javaq.connector.impl.cmd.ConnectorSyncCommandQ;
import info.michaelwittig.javaq.connector.impl.cmd.ConnectorSyncCommandSelect;
import info.michaelwittig.javaq.query.Function;
import info.michaelwittig.javaq.query.Result;
import info.michaelwittig.javaq.query.Select;
import java.io.IOException;
import java.util.TimeZone;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import kx.c;

final class QConnectorSyncTSImpl
extends QConnectorImpl
implements QConnectorSync {
    private static final ConnectorSyncCommandQ STOP_COMMAND = new ConnectorSyncCommandQ("");
    private static final ConnectorSyncCommandQ START_COMMAND = new ConnectorSyncCommandQ("");
    private static final Result EMPTY_RESULT = new Result(){};
    private final BlockingQueue<ConnectorSyncCommandWithFutureValue> commands = new LinkedBlockingQueue<ConnectorSyncCommandWithFutureValue>();
    private final AtomicReference<Executor> executor = new AtomicReference<Object>(null);

    protected QConnectorSyncTSImpl(String aHost, int aPort, boolean aReconnectOnError) {
        super(aHost, aPort, aReconnectOnError);
    }

    @Override
    public void connect() throws QConnectorException, QConnectorError {
        if (!this.executor.compareAndSet(null, new Executor())) {
            throw new QConnectorError("Already connected");
        }
        SettableFuture future = SettableFuture.create();
        this.commands.offer(new ConnectorSyncCommandWithFutureValue(START_COMMAND, (SettableFuture<Result>)future));
        new Thread(this.executor.get()).start();
        try {
            future.get();
        }
        catch (ExecutionException e) {
            throw new QConnectorException("KException", e);
        }
        catch (InterruptedException e) {
            throw new QConnectorException("KException", e);
        }
    }

    @Override
    public void disconnect() throws QConnectorError {
        Executor old = this.executor.get();
        if (old == null) {
            throw new QConnectorError("Not connected");
        }
        if (!this.executor.compareAndSet(old, null)) {
            throw new QConnectorError("Already disconnected");
        }
        SettableFuture future = SettableFuture.create();
        this.commands.offer(new ConnectorSyncCommandWithFutureValue(STOP_COMMAND, (SettableFuture<Result>)future));
    }

    @Override
    public boolean isConnected() {
        return this.executor.get() != null;
    }

    private Result cmd(ConnectorSyncCommand cmd) throws QConnectorException {
        SettableFuture future = SettableFuture.create();
        this.commands.offer(new ConnectorSyncCommandWithFutureValue(cmd, (SettableFuture<Result>)future));
        try {
            return (Result)future.get();
        }
        catch (InterruptedException e) {
            throw new QConnectorException(e.getMessage());
        }
        catch (ExecutionException e) {
            throw new QConnectorException(e.getMessage());
        }
    }

    @Override
    public Result execute(String q) throws QConnectorException {
        return this.cmd(new ConnectorSyncCommandQ(q));
    }

    @Override
    public Result select(Select select) throws QConnectorException {
        return this.cmd(new ConnectorSyncCommandSelect(select));
    }

    @Override
    public Result call(Function function) throws QConnectorException {
        return this.cmd(new ConnectorSyncCommandFunction(function));
    }

    private static final class ConnectorSyncCommandWithFutureValue {
        private final ConnectorSyncCommand cmd;
        private final SettableFuture<Result> future;
        private final AtomicBoolean reconnectTry = new AtomicBoolean(false);

        public ConnectorSyncCommandWithFutureValue(ConnectorSyncCommand cmd, SettableFuture<Result> future) {
            this.cmd = cmd;
            this.future = future;
        }

        public ConnectorSyncCommand getCmd() {
            return this.cmd;
        }

        public SettableFuture<Result> getFuture() {
            return this.future;
        }

        public boolean tryReconnect() {
            return this.reconnectTry.compareAndSet(false, true);
        }
    }

    private final class Executor
    implements Runnable {
        private Executor() {
        }

        @Override
        public void run() {
            SettableFuture<Result> future;
            c c2 = null;
            while (true) {
                ConnectorSyncCommandWithFutureValue t;
                try {
                    t = (ConnectorSyncCommandWithFutureValue)QConnectorSyncTSImpl.this.commands.take();
                }
                catch (InterruptedException e) {
                    continue;
                }
                ConnectorSyncCommand cmd = t.getCmd();
                future = t.getFuture();
                if (cmd == START_COMMAND) {
                    if (c2 != null) {
                        future.setException((Throwable)new QConnectorError("Already connected"));
                        continue;
                    }
                    try {
                        c2 = new c(QConnectorSyncTSImpl.this.getHost(), QConnectorSyncTSImpl.this.getPort());
                        c2.tz = TimeZone.getTimeZone("UTC");
                        future.set((Object)EMPTY_RESULT);
                    }
                    catch (c.KException e) {
                        future.setException((Throwable)e);
                    }
                    catch (IOException e) {
                        future.setException((Throwable)e);
                    }
                    continue;
                }
                if (c2 == null) {
                    try {
                        try {
                            Thread.sleep(1000L);
                        }
                        catch (InterruptedException ie) {
                            // empty catch block
                        }
                        c2 = new c(QConnectorSyncTSImpl.this.getHost(), QConnectorSyncTSImpl.this.getPort());
                    }
                    catch (c.KException e) {
                        future.setException((Throwable)e);
                        continue;
                    }
                    catch (IOException e) {
                        future.setException((Throwable)e);
                        continue;
                    }
                }
                if (cmd == STOP_COMMAND) {
                    try {
                        c2.close();
                        c2 = null;
                    }
                    catch (Exception e) {
                        // empty catch block
                    }
                    break;
                }
                try {
                    Result result = cmd.execute(c2);
                    future.set((Object)result);
                }
                catch (QConnectorException e) {
                    future.setException((Throwable)e);
                }
                catch (c.KException e) {
                    future.setException((Throwable)new QConnectorException("Q failed", e));
                }
                catch (IOException e) {
                    if (t.tryReconnect() && QConnectorSyncTSImpl.this.reconnectOnError()) {
                        c2 = null;
                        QConnectorSyncTSImpl.this.commands.offer(t);
                        continue;
                    }
                    future.setException((Throwable)new QConnectorException("Could not talk to " + QConnectorSyncTSImpl.this.getHost() + ":" + QConnectorSyncTSImpl.this.getPort(), e));
                }
            }
            future.set((Object)EMPTY_RESULT);
        }
    }
}

