/*
 * Decompiled with CFR 0.152.
 */
package services;

import clients.SymBotClient;
import clients.symphony.api.FirehoseClient;
import exceptions.SymClientException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import listeners.FirehoseListener;
import model.DatafeedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import services.ActionFirehose;
import services.ActionFirehoseImpl;
import services.Sleeper;

public class FirehoseService {
    private final Logger logger = LoggerFactory.getLogger(FirehoseService.class);
    private SymBotClient botClient;
    private FirehoseClient firehoseClient;
    private List<FirehoseListener> listeners;
    private String firehoseId;
    private ExecutorService pool;
    private AtomicBoolean stop = new AtomicBoolean();
    private ActionFirehose action;

    public FirehoseService(SymBotClient client) {
        this(client, client.getFirehoseClient().createFirehose());
    }

    public FirehoseService(SymBotClient client, String firehoseId) {
        this(client, firehoseId, new ActionFirehoseImpl(client));
    }

    protected FirehoseService(SymBotClient client, String firehoseId, ActionFirehose actionFireHose) {
        this.botClient = client;
        this.listeners = new ArrayList<FirehoseListener>();
        this.firehoseClient = this.botClient.getFirehoseClient();
        this.firehoseId = firehoseId;
        this.action = actionFireHose;
        this.readFirehose();
        this.stop.set(false);
    }

    public void addListener(FirehoseListener listener) {
        this.listeners.add(listener);
    }

    public void removeListener(FirehoseListener listener) {
        this.listeners.remove(listener);
    }

    public void readFirehose() {
        if (this.pool != null) {
            this.pool.shutdown();
        }
        this.pool = Executors.newFixedThreadPool(5);
        Executors.newSingleThreadExecutor().submit(() -> {
            while (!this.stop.get()) {
                CompletableFuture<Object> future = this.getFirehoseHandleEventFuture(this.action, this.pool);
                try {
                    future.get();
                }
                catch (InterruptedException | ExecutionException e) {
                    this.logger.error("Error trying to read firehose ", (Throwable)e);
                }
            }
            return null;
        });
    }

    protected CompletableFuture<Object> getFirehoseHandleEventFuture(ActionFirehose action, Executor pool) {
        return ((CompletableFuture)CompletableFuture.supplyAsync(() -> action.actionReadFirehose(this.firehoseClient, this.firehoseId), pool).exceptionally(ex -> {
            this.handleError((Throwable)ex);
            return null;
        })).thenApply(events -> action.actionHandleEvents((List<DatafeedEvent>)events, this.listeners));
    }

    public void stopDatafeedService() {
        if (!this.stop.get()) {
            this.stop.set(true);
        }
    }

    public void restartDatafeedService() {
        if (this.stop.get()) {
            this.stop.set(false);
        }
        this.firehoseId = this.firehoseClient.createFirehose();
        this.readFirehose();
    }

    private void handleError(Throwable e) {
        Sleeper sleeper = new Sleeper();
        this.logger.error(e.getMessage());
        sleeper.sleep(30);
        try {
            this.firehoseId = this.firehoseClient.createFirehose();
        }
        catch (SymClientException e1) {
            sleeper.sleep(30);
            this.handleError(e);
        }
    }
}

