/*
 * Decompiled with CFR 0.152.
 */
package ai.grakn.engine.backgroundtasks.taskstatestorage;

import ai.grakn.engine.TaskStatus;
import ai.grakn.engine.backgroundtasks.TaskState;
import ai.grakn.engine.backgroundtasks.TaskStateStorage;
import ai.grakn.engine.backgroundtasks.distributed.ZookeeperConnection;
import ai.grakn.exception.EngineStorageException;
import java.io.Serializable;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang.SerializationUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TaskStateZookeeperStore
implements TaskStateStorage {
    private static final String ZK_TASK_PATH = "/tasks/%s/state";
    private final Logger LOG = LoggerFactory.getLogger(TaskStateZookeeperStore.class);
    private final CuratorFramework zookeeperConnection;

    public TaskStateZookeeperStore(ZookeeperConnection connection) {
        this.zookeeperConnection = connection.connection();
    }

    @Override
    public String newState(TaskState task) {
        InterProcessMutex mutex = this.mutex(task.getId());
        this.acquire(mutex);
        try {
            this.zookeeperConnection.create().creatingParentContainersIfNeeded().forPath(String.format(ZK_TASK_PATH, task.getId()), SerializationUtils.serialize((Serializable)task));
        }
        catch (Exception exception) {
            throw new EngineStorageException("Could not write state to storage " + ExceptionUtils.getFullStackTrace((Throwable)exception));
        }
        finally {
            this.release(mutex);
        }
        return task.getId();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Boolean updateState(TaskState task) {
        InterProcessMutex mutex = this.mutex(task.getId());
        this.acquire(mutex);
        try {
            this.zookeeperConnection.setData().forPath(String.format(ZK_TASK_PATH, task.getId()), SerializationUtils.serialize((Serializable)task));
        }
        catch (Exception e) {
            this.LOG.error("Could not write to ZooKeeper! - " + e);
            Boolean bl = false;
            return bl;
        }
        finally {
            this.release(mutex);
        }
        return true;
    }

    @Override
    public TaskState getState(String id) {
        InterProcessMutex mutex = this.mutex(id);
        try {
            mutex.acquire();
            byte[] b = (byte[])this.zookeeperConnection.getData().forPath("/tasks/" + id + "/state");
            TaskState taskState = (TaskState)SerializationUtils.deserialize((byte[])b);
            return taskState;
        }
        catch (Exception e) {
            throw new EngineStorageException("Could not get state from storage " + ExceptionUtils.getFullStackTrace((Throwable)e));
        }
        finally {
            this.release(mutex);
        }
    }

    @Override
    public Set<TaskState> getTasks(TaskStatus taskStatus, String taskClassName, String createdBy, int limit, int offset) {
        try {
            Stream<TaskState> stream = ((List)this.zookeeperConnection.getChildren().forPath("/tasks")).stream().map(this::getState);
            if (taskStatus != null) {
                stream = stream.filter(t -> t.status().equals((Object)taskStatus));
            }
            if (taskClassName != null) {
                stream = stream.filter(t -> t.taskClassName().equals(taskClassName));
            }
            if (createdBy != null) {
                stream = stream.filter(t -> t.creator().equals(createdBy));
            }
            stream = stream.skip(offset);
            if (limit > 0) {
                stream = stream.limit(limit);
            }
            return stream.collect(Collectors.toSet());
        }
        catch (Exception e) {
            throw new EngineStorageException("Could not get state from storage " + ExceptionUtils.getFullStackTrace((Throwable)e));
        }
    }

    private InterProcessMutex mutex(String id) {
        return new InterProcessMutex(this.zookeeperConnection, "/tasks" + id + "/lock");
    }

    private void acquire(InterProcessMutex mutex) {
        try {
            mutex.acquire();
        }
        catch (Exception e) {
            throw new EngineStorageException("Error acquiring mutex from zookeeper.");
        }
    }

    private void release(InterProcessMutex mutex) {
        try {
            mutex.release();
        }
        catch (Exception e) {
            throw new EngineStorageException("Error releasing mutex from zookeeper.");
        }
    }
}

