/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.stram;

import com.datatorrent.api.StreamCodec;
import com.datatorrent.common.codec.JsonStreamCodec;
import com.datatorrent.netlet.util.Slice;
import com.datatorrent.stram.EventRecorder;
import com.datatorrent.stram.api.StramEvent;
import com.datatorrent.stram.client.EventsAgent;
import com.datatorrent.stram.util.FSPartFileCollection;
import com.datatorrent.stram.util.SharedPubSubWebSocketClient;
import com.google.common.base.Throwables;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException;
import net.engio.mbassy.listener.Handler;
import org.apache.commons.beanutils.BeanUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Event recorder that queues {@link StramEvent}s and writes them, on a dedicated
 * background thread, to an {@link FSPartFileCollection}. When a websocket client has
 * been supplied and the gateway reports at least one subscriber on this application's
 * events topic, each written event is also published through that client.
 *
 * <p>Lifecycle: optionally call {@link #setWebSocketClient} and {@link #setBasePath},
 * then {@link #setup()}; events are handed over via {@link #recordEventAsync}; call
 * {@link #teardown()} to stop the background thread and tear down the storage.
 */
public class FSEventRecorder
implements EventRecorder {
    public static final String VERSION = "1.0";
    private static final Logger LOG = LoggerFactory.getLogger(FSEventRecorder.class);
    /** Hand-off queue between {@link #recordEventAsync} callers and the recorder thread. */
    private final BlockingQueue<StramEvent> queue = new LinkedBlockingQueue<StramEvent>();
    private FSPartFileCollection storage;
    private String basePath = ".";
    private transient StreamCodec<Object> streamCodec;
    /** Never assigned in this class, so it is always {@code null}; only used in a log message. */
    private final URI pubSubUrl = null;
    /**
     * Subscriber count last reported on {@link #pubSubTopic}. Written from the websocket
     * handler callback and read by the recorder thread, hence {@code volatile}.
     */
    private volatile int numSubscribers = 0;
    private SharedPubSubWebSocketClient wsClient;
    private final String pubSubTopic;
    private final EventRecorderThread eventRecorderThread = new EventRecorderThread();

    /**
     * Creates a recorder whose pub/sub topic is {@code applications.<appid>.events}.
     *
     * @param appid application id used to build the pub/sub topic name
     */
    public FSEventRecorder(String appid) {
        LOG.debug("Event recorder created for {}", appid);
        this.pubSubTopic = "applications." + appid + ".events";
    }

    /**
     * Sets the websocket client used for publishing. Must be called before
     * {@link #setup()} for the subscriber-count handler to be registered.
     *
     * @param wsClient shared pub/sub websocket client, or {@code null} to disable publishing
     */
    public void setWebSocketClient(SharedPubSubWebSocketClient wsClient) {
        this.wsClient = wsClient;
    }

    /**
     * Sets the base path handed to the storage in {@link #setup()}. Defaults to {@code "."}.
     *
     * @param basePath base path for the part-file collection
     */
    public void setBasePath(String basePath) {
        this.basePath = basePath;
    }

    /**
     * Creates the codec and storage, writes the version line as storage meta data,
     * registers the websocket handler (if a client was set) and starts the recorder thread.
     *
     * <p>A failure to register the websocket handler is logged and otherwise ignored;
     * any other failure is rethrown via {@link Throwables#propagate(Throwable)}.
     */
    public void setup() {
        try {
            this.streamCodec = new JsonStreamCodec();
            this.storage = new FSPartFileCollection();
            this.storage.setBasePath(this.basePath);
            this.storage.setup();
            this.storage.writeMetaData((VERSION + "\n").getBytes(StandardCharsets.UTF_8));
            if (this.wsClient != null) {
                try {
                    this.setupWsClient();
                }
                catch (IOException | InterruptedException | ExecutionException | TimeoutException ex) {
                    // Best effort: recording to storage still works without the gateway.
                    // The exception is passed as the trailing argument so its stack trace is logged.
                    LOG.error("Cannot connect to gateway at {}", this.pubSubUrl, ex);
                    if (ex instanceof InterruptedException) {
                        // Do not hide the interruption from the caller.
                        Thread.currentThread().interrupt();
                    }
                }
            }
            this.eventRecorderThread.start();
        }
        catch (Exception ex) {
            throw Throwables.propagate(ex);
        }
    }

    /**
     * Interrupts the recorder thread, waits for it to finish and tears down the storage
     * if it was created. If the wait itself is interrupted, the storage is still torn
     * down and the caller's interrupt status is restored afterwards.
     */
    public void teardown() {
        this.eventRecorderThread.interrupt();
        boolean interrupted = false;
        try {
            this.eventRecorderThread.join();
        }
        catch (InterruptedException ex) {
            LOG.warn("Event recorder thread join interrupted");
            interrupted = true;
        }
        try {
            if (this.storage != null) {
                this.storage.teardown();
            }
        }
        finally {
            // Restored only after the storage teardown so the teardown itself is not
            // attempted on a thread that is already flagged as interrupted.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Enqueues the event for the recorder thread; does not write anything itself.
     *
     * @param event event to record
     */
    @Override
    @Handler
    public void recordEventAsync(StramEvent event) {
        LOG.debug("Adding event {} to the queue", event.getType());
        this.queue.add(event);
    }

    /**
     * Writes one event to the storage as a single line of the form
     * {@code <timestamp>:<type>:<encoded properties>\n}, where the properties are the
     * bean properties of the event minus {@code timestamp}, {@code class} and
     * {@code type}, encoded by the stream codec. If the last reported subscriber count
     * is positive, the event is also published on the pub/sub topic (with {@code id}
     * removed from the data map, since it is carried in its own field).
     *
     * @param event event to write
     * @throws Exception if describing, encoding, storing or publishing the event fails
     */
    public void writeEvent(StramEvent event) throws Exception {
        LOG.debug("Writing event {} to the storage", event.getType());
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        // Explicit charset so the file format does not depend on the JVM's default encoding.
        bos.write((event.getTimestamp() + ":").getBytes(StandardCharsets.UTF_8));
        bos.write((event.getType() + ":").getBytes(StandardCharsets.UTF_8));
        Map data = BeanUtils.describe(event);
        // These are either already written in the line prefix or not useful in the record.
        data.remove("timestamp");
        data.remove("class");
        data.remove("type");
        Slice f = this.streamCodec.toByteArray(data);
        bos.write(f.buffer, f.offset, f.length);
        bos.write("\n".getBytes(StandardCharsets.UTF_8));
        this.storage.writeDataItem(bos.toByteArray(), true);
        if (this.numSubscribers > 0) {
            LOG.debug("Publishing event {} through websocket to gateway", event.getType());
            EventsAgent.EventInfo eventInfo = new EventsAgent.EventInfo();
            eventInfo.id = event.getId();
            eventInfo.timestamp = event.getTimestamp();
            eventInfo.type = event.getType();
            eventInfo.data = data;
            eventInfo.data.remove("id");
            this.wsClient.publish(this.pubSubTopic, eventInfo);
        }
    }

    /**
     * Registers a handler on {@link #pubSubTopic} that tracks the subscriber count:
     * each message is parsed as an integer count, and the count is reset to zero when
     * the connection closes.
     */
    private void setupWsClient() throws ExecutionException, IOException, InterruptedException, TimeoutException {
        this.wsClient.addHandler(this.pubSubTopic, true, new SharedPubSubWebSocketClient.Handler(){

            @Override
            public void onMessage(String type, String topic, Object data) {
                int count = Integer.parseInt((String)data);
                FSEventRecorder.this.numSubscribers = count;
                LOG.info("Number of subscribers is now {}", count);
            }

            @Override
            public void onClose() {
                FSEventRecorder.this.numSubscribers = 0;
            }
        });
    }

    /**
     * Forwards a sync request to the storage. Requires {@link #setup()} to have been called.
     */
    public void requestSync() {
        this.storage.requestSync();
    }

    /**
     * Background thread that drains the queue: it blocks for the next event, writes it,
     * and repeats until interrupted. Failures while handling an event are logged and
     * the loop continues with the next event.
     */
    private class EventRecorderThread
    extends Thread {
        private EventRecorderThread() {
        }

        @Override
        public void run() {
            while (true) {
                try {
                    FSEventRecorder.this.writeEvent(FSEventRecorder.this.queue.take());
                    Thread.yield();
                    // Once the queue has drained, flush the storage; when flushData()
                    // returns false and a websocket client is present, publish the
                    // storage's latest index line on the internal last-index topic.
                    // NOTE(review): the meaning of flushData()'s return value is not
                    // visible here - confirm against FSPartFileCollection.
                    if (FSEventRecorder.this.queue.isEmpty()
                            && !FSEventRecorder.this.storage.flushData()
                            && FSEventRecorder.this.wsClient != null) {
                        String topic = "_internal.lastIndex.event." + FSEventRecorder.this.storage.getBasePath();
                        FSEventRecorder.this.wsClient.publish(topic, FSEventRecorder.this.storage.getLatestIndexLine());
                    }
                }
                catch (InterruptedException ex) {
                    // teardown() interrupts this thread to request shutdown.
                    return;
                }
                catch (Exception ex) {
                    LOG.error("Caught Exception", ex);
                }
            }
        }
    }
}

