/*
 * Decompiled with CFR 0.152.
 */
package org.apache.accumulo.tracer;

import com.google.common.base.Charsets;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.channels.ServerSocketChannel;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.AgeOffFilter;
import org.apache.accumulo.core.trace.TraceFormatter;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.server.Accumulo;
import org.apache.accumulo.server.ServerOpts;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.fs.VolumeManagerImpl;
import org.apache.accumulo.server.security.SecurityUtil;
import org.apache.accumulo.server.util.time.SimpleTimer;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
import org.apache.accumulo.trace.thrift.RemoteSpan;
import org.apache.accumulo.trace.thrift.SpanReceiver;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
import org.apache.thrift.TByteArrayOutputStream;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

public class TraceServer
implements Watcher {
    private static final Logger log = Logger.getLogger(TraceServer.class);
    private final ServerConfiguration serverConfiguration;
    private final TServer server;
    private final AtomicReference<BatchWriter> writer;
    private final Connector connector;
    final String table;

    private static void put(Mutation m, String cf, String cq, byte[] bytes, int len) {
        m.put(new Text(cf), new Text(cq), new Value(bytes, 0, len));
    }

    public TraceServer(ServerConfiguration serverConfiguration, String hostname) throws Exception {
        this.serverConfiguration = serverConfiguration;
        log.info((Object)"Version 1.6.5");
        log.info((Object)("Instance " + serverConfiguration.getInstance().getInstanceID()));
        AccumuloConfiguration conf = serverConfiguration.getConfiguration();
        this.table = conf.get(Property.TRACE_TABLE);
        Connector connector = null;
        while (true) {
            try {
                PasswordToken at;
                String principal = conf.get(Property.TRACE_USER);
                Map loginMap = conf.getAllPropertiesWithPrefix(Property.TRACE_TOKEN_PROPERTY_PREFIX);
                if (loginMap.isEmpty()) {
                    Property p = Property.TRACE_PASSWORD;
                    at = new PasswordToken(conf.get(p).getBytes(Charsets.UTF_8));
                } else {
                    AuthenticationToken.Properties props = new AuthenticationToken.Properties();
                    AuthenticationToken token = AccumuloVFSClassLoader.getClassLoader().loadClass(conf.get(Property.TRACE_TOKEN_TYPE)).asSubclass(AuthenticationToken.class).newInstance();
                    int prefixLength = Property.TRACE_TOKEN_PROPERTY_PREFIX.getKey().length();
                    for (Map.Entry entry : loginMap.entrySet()) {
                        props.put(((String)entry.getKey()).substring(prefixLength), (CharSequence)entry.getValue());
                    }
                    token.init(props);
                    at = token;
                }
                connector = serverConfiguration.getInstance().getConnector(principal, (AuthenticationToken)at);
                if (!connector.tableOperations().exists(this.table)) {
                    connector.tableOperations().create(this.table);
                    IteratorSetting setting = new IteratorSetting(10, "ageoff", AgeOffFilter.class.getName());
                    AgeOffFilter.setTTL((IteratorSetting)setting, (Long)604800000L);
                    connector.tableOperations().attachIterator(this.table, setting);
                }
                connector.tableOperations().setProperty(this.table, Property.TABLE_FORMATTER_CLASS.getKey(), TraceFormatter.class.getName());
            }
            catch (Exception ex) {
                log.info((Object)"Waiting to checking/create the trace table.", (Throwable)ex);
                UtilWaitThread.sleep((long)1000L);
                continue;
            }
            break;
        }
        this.connector = connector;
        connector = null;
        int port = conf.getPort(Property.TRACE_PORT);
        ServerSocket sock = ServerSocketChannel.open().socket();
        sock.setReuseAddress(true);
        sock.bind(new InetSocketAddress(hostname, port));
        TServerSocket transport = new TServerSocket(sock);
        TThreadPoolServer.Args options = new TThreadPoolServer.Args((TServerTransport)transport);
        options.processor((TProcessor)new SpanReceiver.Processor((SpanReceiver.Iface)new Receiver()));
        this.server = new TThreadPoolServer(options);
        this.registerInZooKeeper(sock.getInetAddress().getHostAddress() + ":" + sock.getLocalPort());
        this.writer = new AtomicReference<BatchWriter>(this.connector.createBatchWriter(this.table, new BatchWriterConfig().setMaxLatency(5L, TimeUnit.SECONDS)));
    }

    public void run() throws Exception {
        SimpleTimer.getInstance().schedule(new Runnable(){

            @Override
            public void run() {
                TraceServer.this.flush();
            }
        }, 1000L, 1000L);
        this.server.serve();
    }

    private void flush() {
        try {
            BatchWriter writer = this.writer.get();
            if (null != writer) {
                writer.flush();
            } else if (this.connector.tableOperations().exists(this.table)) {
                this.resetWriter();
            }
        }
        catch (MutationsRejectedException exception) {
            log.warn((Object)("Problem flushing traces, resetting writer. Set log level to DEBUG to see stacktrace. cause: " + (Object)((Object)exception)));
            log.debug((Object)"flushing traces failed due to exception", (Throwable)exception);
            this.resetWriter();
        }
        catch (RuntimeException exception) {
            log.warn((Object)("Problem flushing traces, resetting writer. Set log level to DEBUG to see stacktrace. cause: " + exception));
            log.debug((Object)"flushing traces failed due to exception", (Throwable)exception);
            this.resetWriter();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     */
    private void resetWriter() {
        block13: {
            BatchWriter writer = null;
            writer = this.connector.createBatchWriter(this.table, new BatchWriterConfig().setMaxLatency(5L, TimeUnit.SECONDS));
            writer = this.writer.getAndSet(writer);
            try {
                if (null != writer) {
                    writer.close();
                }
                break block13;
            }
            catch (Exception ex) {
                log.warn((Object)("Problem closing batch writer. Set log level to DEBUG to see stacktrace. cause: " + ex));
                log.debug((Object)"batch writer close failed with exception", (Throwable)ex);
            }
            break block13;
            catch (Exception ex) {
                try {
                    log.warn((Object)("Unable to create a batch writer, will retry. Set log level to DEBUG to see stacktrace. cause: " + ex));
                    log.debug((Object)"batch writer creation failed with exception.", (Throwable)ex);
                    writer = this.writer.getAndSet(writer);
                }
                catch (Throwable throwable) {
                    writer = this.writer.getAndSet(writer);
                    try {
                        if (null != writer) {
                            writer.close();
                        }
                    }
                    catch (Exception ex2) {
                        log.warn((Object)("Problem closing batch writer. Set log level to DEBUG to see stacktrace. cause: " + ex2));
                        log.debug((Object)"batch writer close failed with exception", (Throwable)ex2);
                    }
                    throw throwable;
                }
                try {
                    if (null != writer) {
                        writer.close();
                    }
                }
                catch (Exception ex3) {
                    log.warn((Object)("Problem closing batch writer. Set log level to DEBUG to see stacktrace. cause: " + ex3));
                    log.debug((Object)"batch writer close failed with exception", (Throwable)ex3);
                }
            }
        }
    }

    private void registerInZooKeeper(String name) throws Exception {
        String root = ZooUtil.getRoot((Instance)this.serverConfiguration.getInstance()) + "/tracers";
        ZooReaderWriter zoo = ZooReaderWriter.getInstance();
        String path = zoo.putEphemeralSequential(root + "/trace-", name.getBytes(Charsets.UTF_8));
        zoo.exists(path, (Watcher)this);
    }

    public static void main(String[] args) throws Exception {
        SecurityUtil.serverLogin((AccumuloConfiguration)ServerConfiguration.getSiteConfiguration());
        ServerOpts opts = new ServerOpts();
        String app = "tracer";
        opts.parseArgs("tracer", args, new Object[0]);
        Accumulo.setupLogging((String)"tracer");
        Instance instance = HdfsZooInstance.getInstance();
        ServerConfiguration conf = new ServerConfiguration(instance);
        VolumeManager fs = VolumeManagerImpl.get();
        Accumulo.init((VolumeManager)fs, (ServerConfiguration)conf, (String)"tracer");
        String hostname = opts.getAddress();
        TraceServer server = new TraceServer(conf, hostname);
        Accumulo.enableTracing((String)hostname, (String)"tracer");
        server.run();
        log.info((Object)"tracer stopping");
    }

    public void process(WatchedEvent event) {
        log.debug((Object)("event " + event.getPath() + " " + event.getType() + " " + event.getState()));
        if (event.getState() == Watcher.Event.KeeperState.Expired) {
            log.warn((Object)("Trace server lost zookeeper registration at " + event.getPath()));
            this.server.stop();
        } else if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
            log.warn((Object)("Trace server zookeeper entry lost " + event.getPath()));
            this.server.stop();
        }
        if (event.getPath() != null) {
            try {
                if (ZooReaderWriter.getInstance().exists(event.getPath(), (Watcher)this)) {
                    return;
                }
            }
            catch (Exception ex) {
                log.error((Object)ex, (Throwable)ex);
            }
            log.warn((Object)"Trace server unable to reset watch on zookeeper registration");
            this.server.stop();
        }
    }

    class Receiver
    implements SpanReceiver.Iface {
        Receiver() {
        }

        public void span(RemoteSpan s) throws TException {
            String idString = Long.toHexString(s.traceId);
            String startString = Long.toHexString(s.start);
            Mutation spanMutation = new Mutation(new Text(idString));
            Mutation indexMutation = new Mutation(new Text("idx:" + s.svc + ":" + startString));
            long diff = s.stop - s.start;
            indexMutation.put(new Text(s.description), new Text(s.sender), new Value((idString + ":" + Long.toHexString(diff)).getBytes(Charsets.UTF_8)));
            ByteArrayTransport transport = new ByteArrayTransport();
            TCompactProtocol protocol = new TCompactProtocol((TTransport)transport);
            s.write((TProtocol)protocol);
            String parentString = Long.toHexString(s.parentId);
            if (s.parentId == 0L) {
                parentString = "";
            }
            TraceServer.put(spanMutation, "span", parentString + ":" + Long.toHexString(s.spanId), transport.get(), transport.len());
            Mutation timeMutation = null;
            if (s.parentId == 0L) {
                timeMutation = new Mutation(new Text("start:" + startString));
                TraceServer.put(timeMutation, "id", idString, transport.get(), transport.len());
            }
            try {
                BatchWriter writer = (BatchWriter)TraceServer.this.writer.get();
                if (null == writer) {
                    log.warn((Object)"writer is not ready; discarding span.");
                    return;
                }
                writer.addMutation(spanMutation);
                writer.addMutation(indexMutation);
                if (timeMutation != null) {
                    writer.addMutation(timeMutation);
                }
            }
            catch (MutationsRejectedException exception) {
                log.warn((Object)("Unable to write mutation to table; discarding span. set log level to DEBUG for span information and stacktrace. cause: " + (Object)((Object)exception)));
                if (log.isDebugEnabled()) {
                    log.debug((Object)("discarded span due to rejection of mutation: " + spanMutation), (Throwable)exception);
                }
            }
            catch (RuntimeException exception) {
                log.warn((Object)("Unable to write mutation to table; discarding span. set log level to DEBUG for stacktrace. cause: " + exception));
                log.debug((Object)"unable to write mutation to table due to exception.", (Throwable)exception);
            }
        }
    }

    static class ByteArrayTransport
    extends TTransport {
        TByteArrayOutputStream out = new TByteArrayOutputStream();

        ByteArrayTransport() {
        }

        public boolean isOpen() {
            return true;
        }

        public void open() throws TTransportException {
        }

        public void close() {
        }

        public int read(byte[] buf, int off, int len) {
            return 0;
        }

        public void write(byte[] buf, int off, int len) throws TTransportException {
            this.out.write(buf, off, len);
        }

        public byte[] get() {
            return this.out.get();
        }

        public int len() {
            return this.out.len();
        }
    }
}

