/*
 * Decompiled with CFR 0.152.
 */
package info.michaelwittig.javaq.connector.impl;

import info.michaelwittig.javaq.connector.QConnectorAsync;
import info.michaelwittig.javaq.connector.QConnectorDataListener;
import info.michaelwittig.javaq.connector.QConnectorError;
import info.michaelwittig.javaq.connector.QConnectorException;
import info.michaelwittig.javaq.connector.QConnectorListener;
import info.michaelwittig.javaq.connector.impl.CResultHelper;
import info.michaelwittig.javaq.connector.impl.QConnectorImpl;
import info.michaelwittig.javaq.connector.impl.cmd.ConnectorAsyncCommand;
import info.michaelwittig.javaq.connector.impl.cmd.ConnectorAsyncCommandQ;
import info.michaelwittig.javaq.query.Result;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.SocketTimeoutException;
import java.util.Date;
import java.util.TimeZone;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import kx.c;

final class QConnectorAsyncImpl
extends QConnectorImpl
implements QConnectorAsync {
    private static final ConnectorAsyncCommandQ STOP_COMMAND = new ConnectorAsyncCommandQ("");
    private final QConnectorListener listener;
    private final AtomicReference<c> cref = new AtomicReference();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final BlockingQueue<ConnectorAsyncCommand> commands = new LinkedBlockingQueue<ConnectorAsyncCommand>();
    private final Timer timer = new Timer();
    private final CopyOnWriteArrayList<ConnectorAsyncCommandQ> subscribes = new CopyOnWriteArrayList();
    private final AtomicReference<UUID> currentRun = new AtomicReference();

    protected QConnectorAsyncImpl(QConnectorListener aListener, String aHost, int aPort, boolean aReconnectOnError) {
        super(aHost, aPort, aReconnectOnError);
        this.listener = aListener;
    }

    @Override
    public void connect() throws QConnectorException, QConnectorError {
        UUID run = UUID.randomUUID();
        this.currentRun.set(run);
        try {
            if (!this.cref.compareAndSet(null, new c(this.getHost(), this.getPort()))) {
                throw new QConnectorError("Already connected");
            }
            this.cref.get().tz = TimeZone.getTimeZone("UTC");
            new Thread(new Reader(run)).start();
            new Thread(new Executor(run, this.cref.get())).start();
        }
        catch (c.KException e) {
            throw new QConnectorException("KException", e);
        }
        catch (IOException e) {
            if (this.reconnectOnError()) {
                this.throwQError(new QConnectorError("Could not connect to " + this.getHost() + ":" + this.getPort()));
                this.reconnect(run);
            }
            throw new QConnectorException("Could not connect to " + this.getHost() + ":" + this.getPort(), e);
        }
    }

    @Override
    public void disconnect() throws QConnectorError {
        this.disconnect(true);
    }

    private void disconnect(boolean clearSubscriptions) throws QConnectorError {
        c old = this.cref.get();
        if (old == null) {
            throw new QConnectorError("Not connected");
        }
        if (!this.cref.compareAndSet(old, null)) {
            throw new QConnectorError("Already disconnected");
        }
        if (clearSubscriptions) {
            this.subscribes.clear();
        }
        this.commands.offer(STOP_COMMAND);
        try {
            old.close();
        }
        catch (IOException iOException) {
            // empty catch block
        }
    }

    private void disconnectSilent(UUID run, boolean clearSubscriptions) {
        if (this.currentRun.compareAndSet(run, null)) {
            try {
                this.disconnect(clearSubscriptions);
            }
            catch (QConnectorError qConnectorError) {
                // empty catch block
            }
        }
    }

    private void reconnect(UUID run) {
        if (this.currentRun.compareAndSet(run, null)) {
            try {
                this.disconnect(false);
            }
            catch (QConnectorError e) {
                // empty catch block
            }
            long time = System.currentTimeMillis();
            this.timer.schedule((TimerTask)new ReconnectTask(), new Date(time + 1000L));
        }
    }

    @Override
    public void subscribe(QConnectorDataListener aListener, String[] tables, String[] symbols) throws QConnectorException {
        throw new UnsupportedOperationException();
    }

    @Override
    public void subscribe(String handle, String[] tables, String[] symbols) throws QConnectorException {
        StringBuilder t = new StringBuilder();
        if (tables.length > 0) {
            for (String table : tables) {
                t.append("`");
                t.append(table);
            }
        } else {
            t.append("`");
        }
        StringBuilder s = new StringBuilder();
        if (symbols.length > 0) {
            for (String symbol : symbols) {
                s.append("`");
                s.append(symbol);
            }
        } else {
            s.append("`");
        }
        ConnectorAsyncCommandQ q = new ConnectorAsyncCommandQ(".u.sub[" + t.toString() + ";" + s.toString() + "]");
        this.subscribes.add(q);
        this.execute(q);
    }

    @Override
    public void unsubscribe(String handle) {
        throw new UnsupportedOperationException();
    }

    @Override
    public void unsubscribe(QConnectorDataListener aListener) {
        throw new UnsupportedOperationException();
    }

    @Override
    public QConnectorListener getConnectorListener() {
        return this.listener;
    }

    private void execute(ConnectorAsyncCommand cmd) {
        this.commands.offer(cmd);
    }

    private void throwQException(final QConnectorException e) {
        this.executor.execute(new Runnable(){

            @Override
            public void run() {
                QConnectorAsyncImpl.this.listener.exception(e);
            }
        });
    }

    private void throwQError(final QConnectorError e) {
        this.executor.execute(new Runnable(){

            @Override
            public void run() {
                QConnectorAsyncImpl.this.listener.error(e);
            }
        });
    }

    private void throwResult(final Result result) {
        this.executor.execute(new Runnable(){

            @Override
            public void run() {
                QConnectorAsyncImpl.this.listener.resultReceived("", result);
            }
        });
    }

    @Override
    public boolean isConnected() {
        return this.cref.get() != null;
    }

    private final class Reader
    implements Runnable {
        private final UUID run;

        public Reader(UUID aRun) {
            this.run = aRun;
        }

        @Override
        public void run() {
            while (QConnectorAsyncImpl.this.cref.get() != null) {
                try {
                    Result result;
                    Object res = ((c)QConnectorAsyncImpl.this.cref.get()).k();
                    if (res == null) continue;
                    try {
                        result = CResultHelper.convert(res);
                    }
                    catch (QConnectorException e) {
                        QConnectorAsyncImpl.this.throwQException(e);
                        continue;
                    }
                    QConnectorAsyncImpl.this.throwResult(result);
                }
                catch (SocketTimeoutException e) {
                }
                catch (UnsupportedEncodingException e) {
                    QConnectorAsyncImpl.this.throwQException(new QConnectorException("UnsupportedEncodingException", e));
                }
                catch (c.KException e) {
                    QConnectorAsyncImpl.this.throwQException(new QConnectorException("KException", e));
                }
                catch (IOException e) {
                    if (QConnectorAsyncImpl.this.reconnectOnError()) {
                        QConnectorAsyncImpl.this.reconnect(this.run);
                        QConnectorAsyncImpl.this.throwQError(new QConnectorError("Could not read from " + QConnectorAsyncImpl.this.getHost() + ":" + QConnectorAsyncImpl.this.getPort()));
                        break;
                    }
                    QConnectorAsyncImpl.this.disconnectSilent(this.run, true);
                    QConnectorAsyncImpl.this.throwQException(new QConnectorException("Could not read from " + QConnectorAsyncImpl.this.getHost() + ":" + QConnectorAsyncImpl.this.getPort(), e));
                    break;
                }
            }
        }
    }

    private final class Executor
    implements Runnable {
        private final UUID run;
        private final c c;

        public Executor(UUID aRun, c aC) {
            this.run = aRun;
            this.c = aC;
        }

        @Override
        public void run() {
            while (true) {
                ConnectorAsyncCommand cmd;
                try {
                    cmd = (ConnectorAsyncCommand)QConnectorAsyncImpl.this.commands.poll(1L, TimeUnit.SECONDS);
                }
                catch (InterruptedException e) {
                    continue;
                }
                if (cmd == null) {
                    try {
                        this.c.ks("1");
                        continue;
                    }
                    catch (Exception e) {
                        if (QConnectorAsyncImpl.this.reconnectOnError()) {
                            QConnectorAsyncImpl.this.reconnect(this.run);
                            QConnectorAsyncImpl.this.throwQError(new QConnectorError("Could not talk to " + QConnectorAsyncImpl.this.getHost() + ":" + QConnectorAsyncImpl.this.getPort()));
                            break;
                        }
                        QConnectorAsyncImpl.this.disconnectSilent(this.run, true);
                        QConnectorAsyncImpl.this.throwQException(new QConnectorException("Could not talk to " + QConnectorAsyncImpl.this.getHost() + ":" + QConnectorAsyncImpl.this.getPort(), e));
                        break;
                    }
                }
                if (cmd == STOP_COMMAND) break;
                try {
                    cmd.execute(this.c);
                    continue;
                }
                catch (QConnectorException e) {
                    QConnectorAsyncImpl.this.throwQException(e);
                    continue;
                }
                catch (c.KException e) {
                    QConnectorAsyncImpl.this.throwQException(new QConnectorException("KException", e));
                    continue;
                }
                catch (IOException e) {
                    if (QConnectorAsyncImpl.this.reconnectOnError()) {
                        QConnectorAsyncImpl.this.commands.offer(cmd);
                        QConnectorAsyncImpl.this.reconnect(this.run);
                        QConnectorAsyncImpl.this.throwQError(new QConnectorError("Could not talk to " + QConnectorAsyncImpl.this.getHost() + ":" + QConnectorAsyncImpl.this.getPort()));
                        break;
                    }
                    QConnectorAsyncImpl.this.disconnectSilent(this.run, true);
                    QConnectorAsyncImpl.this.throwQException(new QConnectorException("Could not talk to " + QConnectorAsyncImpl.this.getHost() + ":" + QConnectorAsyncImpl.this.getPort(), e));
                }
                break;
            }
        }
    }

    private final class ReconnectTask
    extends TimerTask {
        @Override
        public void run() {
            try {
                QConnectorAsyncImpl.this.connect();
                for (ConnectorAsyncCommandQ q : QConnectorAsyncImpl.this.subscribes) {
                    QConnectorAsyncImpl.this.execute(q);
                }
            }
            catch (QConnectorException e) {
                QConnectorAsyncImpl.this.throwQError(new QConnectorError("Reconnect failed"));
            }
            catch (QConnectorError qConnectorError) {
                // empty catch block
            }
        }
    }
}

