/*
 * Decompiled with CFR 0.152.
 */
package com.buabook.kdb.publisher;

import com.buabook.kdb.connection.KdbProcess;
import com.buabook.kdb.data.KdbTable;
import com.buabook.kdb.exceptions.KdbTargetProcessUnavailableException;
import com.buabook.kdb.publisher.KdbPublisher;
import com.google.common.base.Strings;
import com.kx.c;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KdbPublisherThread
extends Thread {
    private static final Logger log = LoggerFactory.getLogger(KdbPublisherThread.class);
    private static final Long DEFAULT_THREAD_SLEEP_MS = 10L;
    private final Long threadSleepMs;
    private KdbPublisher publisher;
    private ConcurrentLinkedQueue<KdbTable> publishBuffer;

    public KdbPublisherThread(KdbProcess server) throws KdbTargetProcessUnavailableException {
        this(server, DEFAULT_THREAD_SLEEP_MS);
    }

    public KdbPublisherThread(KdbProcess server, Long threadSleepMs) throws KdbTargetProcessUnavailableException {
        this(server, threadSleepMs, null);
    }

    public KdbPublisherThread(KdbProcess server, Long threadSleepMs, Duration resetConnectionDuration) throws KdbTargetProcessUnavailableException {
        this(new KdbPublisher(server, resetConnectionDuration), threadSleepMs);
    }

    public KdbPublisherThread(KdbPublisher publisher, Long threadSleepMs) {
        this.threadSleepMs = threadSleepMs;
        this.publisher = publisher;
        this.publishBuffer = new ConcurrentLinkedQueue();
        this.setName("KdbPublisher-" + publisher.getRemoteProcess().getHostname() + "-" + publisher.getRemoteProcess().getPort());
        this.start();
    }

    @Override
    public void run() {
        while (this.publisher.isConnected()) {
            boolean published;
            if (this.publishBuffer.isEmpty()) {
                try {
                    Thread.sleep(this.threadSleepMs);
                }
                catch (InterruptedException interruptedException) {}
                continue;
            }
            KdbTable toPublish = this.publishBuffer.peek();
            if (log.isDebugEnabled()) {
                log.debug("Publishing table update [ Table Name: {} ] [ Table Size: {} ] [ Queue Size: {} ]", new Object[]{toPublish.getTableName(), toPublish.getRowCount(), this.publishBuffer.size()});
            }
            if (!(published = this.publisher.publish(toPublish).booleanValue())) {
                log.warn("Kdb publishing failed. Will reattempt again. [ Table: {} ] [ Queue Size: {} ]", (Object)toPublish.getTableName(), (Object)this.publishBuffer.size());
                continue;
            }
            this.publishBuffer.remove();
        }
        log.error("KDB publisher thread has disconnected, thread will now exit [ KDB Process: {} ]", (Object)this.publisher);
        this.publishBuffer.clear();
        this.publishBuffer = null;
    }

    public void publish(List<KdbTable> tables) {
        if (tables == null || tables.isEmpty()) {
            return;
        }
        if (tables.contains(null)) {
            int beforeNullSize = tables.size();
            tables = tables.stream().filter(Objects::nonNull).collect(Collectors.toList());
            log.warn("One or more tables to be published are null and will not be published [ Before Size: {} ] [ After Size: {} ]", (Object)beforeNullSize, (Object)tables.size());
        }
        this.publishBuffer.addAll(tables);
    }

    public void publish(KdbTable table) {
        if (table == null) {
            return;
        }
        this.publishBuffer.add(table);
    }

    public void publish(String tableName, c.Flip tableData) {
        if (Strings.isNullOrEmpty((String)tableName) || tableData == null) {
            return;
        }
        this.publish(new KdbTable(tableName, tableData));
    }

    public synchronized void disconnect() {
        this.publisher.disconnect();
    }
}

