/*
 * Decompiled with CFR 0.152.
 */
package com.buabook.kdb.consumer;

import com.buabook.common.Printers;
import com.buabook.kdb.connection.KdbConnection;
import com.buabook.kdb.connection.KdbProcess;
import com.buabook.kdb.data.KdbDict;
import com.buabook.kdb.data.KdbTable;
import com.buabook.kdb.exceptions.DataConsumerException;
import com.buabook.kdb.exceptions.KdbProcessSubscriptionFailedException;
import com.buabook.kdb.exceptions.KdbTargetProcessUnavailableException;
import com.buabook.kdb.interfaces.IKdbRawDataConsumer;
import com.buabook.kdb.interfaces.IKdbTableConsumer;
import com.google.common.collect.ImmutableList;
import com.kx.c;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KdbConsumer
extends KdbConnection {
    private static final Logger log = LoggerFactory.getLogger(KdbConsumer.class);
    private static final List<String> SUPPORTED_UPD_FUNCTIONS = ImmutableList.builder().add((Object)"upd").add((Object)".u.upd").build();
    private static final String SUB_FUNCTION = ".u.sub";
    private static final Integer UPD_ARRAY_LENGTH = 3;
    private final List<String> subscriptionTables;
    private final KdbDict subscriptionConfiguration;
    private final IKdbRawDataConsumer rawDataConsumer;
    private final IKdbTableConsumer tableConsumer;

    private KdbConsumer(KdbProcess server, List<String> tables, KdbDict subscriptionConfiguration, IKdbRawDataConsumer rawDataConsumer, IKdbTableConsumer tableConsumer) throws KdbTargetProcessUnavailableException {
        super(server);
        if (rawDataConsumer == null && tableConsumer == null) {
            throw new NullPointerException("Must provied either a raw data or KdbTable consuming object, or both to this object");
        }
        this.subscriptionTables = tables;
        this.subscriptionConfiguration = subscriptionConfiguration;
        this.rawDataConsumer = rawDataConsumer;
        this.tableConsumer = tableConsumer;
        this.connect();
        log.info("Connected to kdb process [ Target: " + server.toString() + " ]");
    }

    protected KdbConsumer(KdbProcess server, List<String> tables, IKdbRawDataConsumer rawDataConsumer, IKdbTableConsumer tableConsumer) throws KdbTargetProcessUnavailableException {
        this(server, tables, null, rawDataConsumer, tableConsumer);
        if (tables == null) {
            throw new NullPointerException("Tables for a consumer cannot be null. Provide an empty list for ALL tables.");
        }
    }

    protected KdbConsumer(KdbProcess server, KdbDict subscriptionConfiguration, IKdbRawDataConsumer rawDataConsumer, IKdbTableConsumer tableConsumer) throws KdbTargetProcessUnavailableException {
        this(server, null, subscriptionConfiguration, rawDataConsumer, tableConsumer);
        if (subscriptionConfiguration == null || subscriptionConfiguration.isEmpty().booleanValue()) {
            throw new NullPointerException("No subscription configuration supplied. Cannot subscribe to process");
        }
    }

    public KdbConsumer(KdbProcess server, List<String> tables, IKdbTableConsumer tableConsumer) throws KdbTargetProcessUnavailableException {
        this(server, tables, null, tableConsumer);
    }

    public void subscribeAndListen() throws KdbProcessSubscriptionFailedException {
        Boolean sub = this.subscribe();
        if (!sub.booleanValue()) {
            throw new KdbProcessSubscriptionFailedException("Subscribe result was neither a dictionary nor a boolean true result.");
        }
        log.info("Subscription successful [ Process: {} ]", (Object)this.getRemoteProcess());
        this.listen();
    }

    @Override
    public void reconnect() {
        super.reconnect();
        Boolean resubResult = this.subscribe();
        while (!resubResult.booleanValue()) {
            log.error("Re-subscription to kdb process failed [ Process: {} ]", (Object)this.getRemoteProcess());
            try {
                Thread.sleep(this.reconnectIntervalMs.intValue());
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
            resubResult = this.subscribe();
        }
        log.info("Re-subscription successful [ Process: {} ]", (Object)this.getRemoteProcess());
    }

    private Boolean subscribe() throws UnsupportedOperationException {
        Object subscribeObject = null;
        if (this.subscriptionTables != null) {
            subscribeObject = this.subscriptionTables.isEmpty() ? "" : this.subscriptionTables.toArray();
            log.info("Attempting to subscribe to kdb process [ Process: {} ] [ Standard Table Subscription: {} ]", (Object)this.getRemoteProcess(), (Object)Printers.listToString(this.subscriptionTables));
        } else if (this.subscriptionConfiguration != null) {
            subscribeObject = this.subscriptionConfiguration.convertToDict();
            log.info("Attempting to subscribe to kdb process [ Process: {} ] [ Dict Config Subscription: {} ]", (Object)this.getRemoteProcess(), (Object)this.subscriptionConfiguration);
        } else {
            log.error("No subscription configuration or subscription tables specified. Cannot subscribe to process!");
            throw new UnsupportedOperationException("No subscription configuration or subscription tables");
        }
        Object subscribeResult = null;
        try {
            subscribeResult = this.getConnection().k(SUB_FUNCTION, subscribeObject);
        }
        catch (c.KException | IOException e) {
            log.error("Subscription to kdb process failed [ Process: {} ]. Error - {}", (Object)this.getRemoteProcess(), (Object)e.getMessage());
            return false;
        }
        if (subscribeResult instanceof c.Dict) {
            KdbDict snapshots = KdbDict.fromObject(subscribeResult);
            log.info("Subscription returned snapshots for tables: {}", (Object)Printers.listToString(snapshots.getKeys()));
            for (Object snapshot : snapshots.getKeys()) {
                try {
                    this.tableConsumer.consume(new KdbTable((String)snapshot, snapshots.getAs(snapshot, c.Flip.class)));
                }
                catch (DataConsumerException e) {
                    log.warn(e.getMessage(), (Throwable)e);
                }
            }
        }
        return subscribeResult != null;
    }

    private void listen() {
        log.debug("Commencing listening for updates from kdb process [ Process: {} ]", (Object)this.getRemoteProcess());
        while (this.isConnected()) {
            Object receivedKdbObject = null;
            try {
                receivedKdbObject = this.getConnection().k();
            }
            catch (UnsupportedEncodingException e) {
                log.warn("Unsupported data was received from the kdb process. Ignoring. Error - {}", (Object)e.getMessage());
                continue;
            }
            catch (c.KException e) {
                log.warn("KDB exception has occurred whilst trying to receive data. Ignoring. Error - {}", (Object)e.getMessage());
                continue;
            }
            catch (IOException e) {
                log.error("Low-level I/O exception has occurred. Disconnecting and reconnecting. Error - {}", (Object)e.getMessage());
                this.reconnect();
            }
            if (receivedKdbObject == null) continue;
            if (this.rawDataConsumer != null) {
                try {
                    this.rawDataConsumer.consume(receivedKdbObject);
                }
                catch (DataConsumerException e) {
                    log.warn(e.getMessage(), (Throwable)e);
                }
            }
            if (this.tableConsumer == null) continue;
            Object[] kdbObjectAsList = null;
            try {
                kdbObjectAsList = (Object[])receivedKdbObject;
            }
            catch (ClassCastException e) {
                log.debug("Received kdb object could not be cast into an object array. Not a table update. Error - {}", (Object)e.getMessage());
                continue;
            }
            if (kdbObjectAsList.length != UPD_ARRAY_LENGTH) {
                log.debug("Received kdb object is not of the correct length to be a table update. [ Expected: {} ] [ Actual: {} ]", (Object)UPD_ARRAY_LENGTH, (Object)kdbObjectAsList.length);
                continue;
            }
            String tableName = null;
            c.Flip tableData = null;
            try {
                String updFunc = (String)kdbObjectAsList[0];
                if (!SUPPORTED_UPD_FUNCTIONS.contains(updFunc)) {
                    log.debug("Element 0 of received list is not one of the supported upd function: {}. Not a table update message", (Object)Printers.listToString(SUPPORTED_UPD_FUNCTIONS));
                    continue;
                }
                tableName = (String)kdbObjectAsList[1];
                tableData = (c.Flip)kdbObjectAsList[2];
            }
            catch (ClassCastException e) {
                log.debug("Received kdb object elements could not be cast into a String (for table name) or Flip (for table data). Error - {}", (Object)e.getMessage());
                continue;
            }
            try {
                this.tableConsumer.consume(new KdbTable(tableName, tableData));
            }
            catch (DataConsumerException e) {
                log.warn(e.getMessage(), (Throwable)e);
            }
        }
        log.warn("This consumer has disconnected from the kdb process. Listening has stopped.");
    }
}

