/*
 * Decompiled with CFR 0.152.
 */
package org.apache.accumulo.tracer;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.nio.charset.StandardCharsets;
import java.util.AbstractQueue;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.fate.zookeeper.ZooReader;
import org.apache.accumulo.tracer.SendSpansViaThrift;
import org.apache.htrace.HTraceConfiguration;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZooTraceClient
extends SendSpansViaThrift
implements Watcher {
    private static final Logger log = LoggerFactory.getLogger(ZooTraceClient.class);
    private static final int DEFAULT_TIMEOUT = 30000;
    ZooReader zoo = null;
    String path;
    boolean pathExists = false;
    final Random random = new Random();
    final List<String> hosts = new ArrayList<String>();
    long retryPause = 5000L;

    ZooTraceClient() {
    }

    public ZooTraceClient(HTraceConfiguration conf) {
        super(conf);
        String keepers = conf.get("tracer.zookeeper.host");
        if (keepers == null) {
            throw new IllegalArgumentException("Must configure tracer.zookeeper.host");
        }
        int timeout = conf.getInt("tracer.zookeeper.timeout", 30000);
        this.zoo = new ZooReader(keepers, timeout);
        this.path = conf.get("tracer.zookeeper.path", "/tracers");
        this.setInitialTraceHosts();
    }

    @VisibleForTesting
    protected void setRetryPause(long pause) {
        this.retryPause = pause;
    }

    @Override
    protected synchronized String getSpanKey(Map<String, String> data) {
        if (this.hosts.size() > 0) {
            String host = this.hosts.get(this.random.nextInt(this.hosts.size()));
            return host;
        }
        return null;
    }

    public void process(WatchedEvent event) {
        log.debug("Processing event for trace server zk watch");
        try {
            this.updateHostsFromZooKeeper();
        }
        catch (Exception ex) {
            log.error("unable to get destination hosts in zookeeper", (Throwable)ex);
        }
    }

    protected void setInitialTraceHosts() {
        final ScheduledExecutorService svc = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).build());
        Runnable task = new Runnable(){

            @Override
            public void run() {
                try {
                    ZooTraceClient.this.updateHostsFromZooKeeper();
                    log.debug("Successfully initialized tracer hosts from ZooKeeper");
                    svc.shutdown();
                }
                catch (Exception e) {
                    log.error("Unabled to get destination tracer hosts in ZooKeeper, will retry in " + ZooTraceClient.this.retryPause + " milliseconds", (Throwable)e);
                    svc.schedule(this, ZooTraceClient.this.retryPause, TimeUnit.MILLISECONDS);
                }
            }
        };
        task.run();
    }

    protected void updateHostsFromZooKeeper() throws KeeperException, InterruptedException {
        if (this.pathExists || this.zoo.exists(this.path)) {
            this.pathExists = true;
            this.updateHosts(this.path, this.zoo.getChildren(this.path, (Watcher)this));
        } else {
            this.zoo.exists(this.path, (Watcher)this);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void sendSpans() {
        if (this.hosts.isEmpty()) {
            if (!this.sendQueue.isEmpty()) {
                log.error("No hosts to send data to, dropping queued spans");
                AbstractQueue abstractQueue = this.sendQueue;
                synchronized (abstractQueue) {
                    this.sendQueue.clear();
                    this.sendQueue.notifyAll();
                }
            }
        } else {
            super.sendSpans();
        }
    }

    private synchronized void updateHosts(String path, List<String> children) {
        log.debug("Scanning trace hosts in zookeeper: " + path);
        try {
            ArrayList<String> hosts = new ArrayList<String>();
            for (String child : children) {
                byte[] data = this.zoo.getData(path + "/" + child, null);
                hosts.add(new String(data, StandardCharsets.UTF_8));
            }
            this.hosts.clear();
            this.hosts.addAll(hosts);
            log.debug("Trace hosts: " + this.hosts);
        }
        catch (Exception ex) {
            log.error("unable to get destination hosts in zookeeper", (Throwable)ex);
        }
    }
}

