/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.federation.store.driver.impl;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreSerializableImpl;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.apache.hadoop.hdfs.server.federation.store.records.Query;
import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.curator.ZKCuratorManager;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * State Store driver that keeps its records in ZooKeeper.
 *
 * <p>Records live under a configurable parent znode (see
 * {@link #FEDERATION_STORE_ZK_PARENT_PATH}). Each record type gets a child
 * znode named after {@code StateStoreUtils.getRecordName(clazz)}, and each
 * record is stored in a znode named after its primary key beneath it:
 *
 * <pre>
 *   &lt;parent-path&gt;/&lt;record name&gt;/&lt;primary key&gt;
 * </pre>
 *
 * <p>All ZooKeeper access goes through a {@link ZKCuratorManager} created in
 * {@link #initDriver()}.
 */
public class StateStoreZooKeeperImpl
extends StateStoreSerializableImpl {
    private static final Logger LOG = LoggerFactory.getLogger(StateStoreZooKeeperImpl.class);

    /** Common prefix of the configuration keys read by this driver. */
    public static final String FEDERATION_STORE_ZK_DRIVER_PREFIX = "dfs.federation.router.store.driver.zk.";
    /** Configuration key for the parent znode under which records are stored. */
    public static final String FEDERATION_STORE_ZK_PARENT_PATH = "dfs.federation.router.store.driver.zk.parent-path";
    /** Parent znode used when {@link #FEDERATION_STORE_ZK_PARENT_PATH} is not set. */
    public static final String FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT = "/hdfs-federation";

    /** Parent znode for every record type; assigned in {@link #initDriver()}. */
    private String baseZNode;
    /** ZooKeeper client wrapper; {@code null} until {@link #initDriver()} runs. */
    private ZKCuratorManager zkManager;

    /**
     * Reads the parent path from the configuration, then creates and starts
     * the ZooKeeper manager.
     *
     * @return {@code true} if the manager started; {@code false} if creating
     *         or starting it raised an {@link IOException}.
     */
    @Override
    public boolean initDriver() {
        LOG.info("Initializing ZooKeeper connection");
        Configuration conf = getConf();
        baseZNode = conf.get(FEDERATION_STORE_ZK_PARENT_PATH, FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT);
        try {
            zkManager = new ZKCuratorManager(conf);
            zkManager.start();
        } catch (IOException e) {
            LOG.error("Cannot initialize the ZK connection", e);
            return false;
        }
        return true;
    }

    /**
     * Ensures that the znode {@code <parent-path>/<className>} exists.
     *
     * @param className name of the znode to create under the parent path.
     * @param clazz record class; not used by this implementation.
     * @return {@code true} on success; {@code false} if ZooKeeper raised any
     *         exception.
     */
    @Override
    public <T extends BaseRecord> boolean initRecordStorage(String className, Class<T> clazz) {
        try {
            String checkPath = ZKCuratorManager.getNodePath(baseZNode, className);
            zkManager.createRootDirRecursively(checkPath);
            return true;
        } catch (Exception e) {
            LOG.error("Cannot initialize ZK node for {}: {}", className, e.getMessage());
            return false;
        }
    }

    /**
     * Closes the ZooKeeper manager if one was created.
     *
     * @throws Exception if closing the manager fails.
     */
    @Override
    public void close() throws Exception {
        if (zkManager != null) {
            zkManager.close();
        }
    }

    /**
     * @return {@code true} only if the manager exists, exposes a Curator
     *         client, and that client is in the {@code STARTED} state.
     */
    @Override
    public boolean isDriverReady() {
        if (zkManager == null) {
            return false;
        }
        CuratorFramework curator = zkManager.getCurator();
        if (curator == null) {
            return false;
        }
        return curator.getState() == CuratorFrameworkState.STARTED;
    }

    /**
     * Reads every record of the given type.
     *
     * <p>A child znode whose data is {@code null}, empty, or cannot be
     * turned into a record is logged and deleted. Any other failure on a
     * single child is logged and that child is skipped.
     *
     * @param clazz record type to read.
     * @return the records that could be read, with the current store time.
     * @throws IOException if the driver is not ready or the children of the
     *         record-type znode cannot be listed; the underlying failure is
     *         attached as the cause.
     */
    @Override
    public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz) throws IOException {
        verifyDriverReady();
        long start = Time.monotonicNow();
        List<T> ret = new ArrayList<>();
        String znode = getZNodeForClass(clazz);
        try {
            List<String> children = zkManager.getChildren(znode);
            for (String child : children) {
                try {
                    String path = ZKCuratorManager.getNodePath(znode, child);
                    Stat stat = new Stat();
                    String data = zkManager.getStringData(path, stat);
                    boolean corrupted = false;
                    if (data == null || data.isEmpty()) {
                        corrupted = true;
                    } else {
                        try {
                            T record = createRecord(data, stat, clazz);
                            ret.add(record);
                        } catch (IOException e) {
                            LOG.error("Cannot create record type \"{}\" from \"{}\": {}",
                                    clazz.getSimpleName(), data, e.getMessage());
                            corrupted = true;
                        }
                    }
                    if (corrupted) {
                        LOG.error("Cannot get data for {} at {}, cleaning corrupted data", child, path);
                        zkManager.delete(path);
                    }
                } catch (Exception e) {
                    // One unreadable child must not fail the whole read.
                    LOG.error("Cannot get data for {}: {}", child, e.getMessage());
                }
            }
        } catch (Exception e) {
            getMetrics().addFailure(Time.monotonicNow() - start);
            String msg = "Cannot get children for \"" + znode + "\": " + e.getMessage();
            LOG.error(msg);
            // Keep the original exception as the cause so callers can diagnose it.
            throw new IOException(msg, e);
        }
        long end = Time.monotonicNow();
        getMetrics().addRead(end - start);
        return new QueryResult<>(ret, getTime());
    }

    /**
     * Writes each record to the znode named after its primary key.
     *
     * <p>The record-type znode is derived from the class of the first
     * record. A failed record does not stop the remaining writes.
     *
     * @param records records to write; an empty list is a successful no-op.
     * @param update passed to the node writer; see {@code error}.
     * @param error when {@code true} and {@code update} is {@code false}, a
     *        record whose znode already exists counts as a failure.
     * @return {@code true} only if every record was written.
     * @throws IOException if the driver is not ready.
     */
    @Override
    public <T extends BaseRecord> boolean putAll(List<T> records, boolean update, boolean error) throws IOException {
        verifyDriverReady();
        if (records.isEmpty()) {
            return true;
        }
        T record0 = records.get(0);
        // Keep the BaseRecord bound so the class can be passed to getZNodeForClass.
        Class<? extends BaseRecord> recordClass = record0.getClass();
        String znode = getZNodeForClass(recordClass);
        long start = Time.monotonicNow();
        boolean status = true;
        for (T record : records) {
            String primaryKey = getPrimaryKey(record);
            String recordZNode = ZKCuratorManager.getNodePath(znode, primaryKey);
            byte[] data = serialize(record);
            if (!writeNode(recordZNode, data, update, error)) {
                status = false;
            }
        }
        long end = Time.monotonicNow();
        if (status) {
            getMetrics().addWrite(end - start);
        } else {
            getMetrics().addFailure(end - start);
        }
        return status;
    }

    /**
     * Deletes the records of the given type that match the query.
     *
     * <p>All records of the type are read first and filtered with
     * {@code StateStoreUtils.filterMultiple}.
     *
     * @param clazz record type to remove from.
     * @param query selects the records to delete; {@code null} removes
     *        nothing.
     * @return the number of records deleted; {@code 0} if the existing
     *         records cannot be read.
     * @throws IOException if the driver is not ready.
     */
    @Override
    public <T extends BaseRecord> int remove(Class<T> clazz, Query<T> query) throws IOException {
        verifyDriverReady();
        if (query == null) {
            return 0;
        }
        long start = Time.monotonicNow();
        List<T> records = null;
        try {
            QueryResult<T> result = get(clazz);
            records = result.getRecords();
        } catch (IOException ex) {
            LOG.error("Cannot get existing records", ex);
            getMetrics().addFailure(Time.monotonicNow() - start);
            return 0;
        }
        String znode = getZNodeForClass(clazz);
        List<T> recordsToRemove = StateStoreUtils.filterMultiple(query, records);
        int removed = 0;
        for (T existingRecord : recordsToRemove) {
            LOG.info("Removing \"{}\"", existingRecord);
            try {
                String primaryKey = getPrimaryKey(existingRecord);
                String path = ZKCuratorManager.getNodePath(znode, primaryKey);
                if (zkManager.delete(path)) {
                    removed++;
                } else {
                    LOG.error("Did not remove \"{}\"", existingRecord);
                }
            } catch (Exception e) {
                LOG.error("Cannot remove \"{}\"", existingRecord, e);
                getMetrics().addFailure(Time.monotonicNow() - start);
            }
        }
        long end = Time.monotonicNow();
        if (removed > 0) {
            getMetrics().addRemove(end - start);
        }
        return removed;
    }

    /**
     * Deletes every child znode of the given record type.
     *
     * @param clazz record type whose records are deleted.
     * @return {@code true} if listing and deleting raised no exception;
     *         {@code false} otherwise. The first exception stops the loop.
     * @throws IOException declared by the overridden method; this
     *         implementation reports failures through the return value.
     */
    @Override
    public <T extends BaseRecord> boolean removeAll(Class<T> clazz) throws IOException {
        long start = Time.monotonicNow();
        boolean status = true;
        String znode = getZNodeForClass(clazz);
        LOG.info("Deleting all children under {}", znode);
        try {
            List<String> children = zkManager.getChildren(znode);
            for (String child : children) {
                String path = ZKCuratorManager.getNodePath(znode, child);
                LOG.info("Deleting {}", path);
                zkManager.delete(path);
            }
        } catch (Exception e) {
            LOG.error("Cannot remove {}: {}", znode, e.getMessage());
            status = false;
        }
        long time = Time.monotonicNow() - start;
        if (status) {
            getMetrics().addRemove(time);
        } else {
            getMetrics().addFailure(time);
        }
        return status;
    }

    /**
     * Creates the znode if needed and stores the bytes in it.
     *
     * @param znode full path of the record znode.
     * @param bytes serialized record.
     * @param update see {@code error}.
     * @param error when {@code true} and {@code update} is {@code false}, a
     *        znode that was not newly created is reported as a failure and
     *        left untouched.
     * @return {@code true} if the data was set; {@code false} on the
     *         already-exists case above or on any ZooKeeper exception.
     */
    private boolean writeNode(String znode, byte[] bytes, boolean update, boolean error) {
        try {
            boolean created = zkManager.create(znode);
            if (!update && !created && error) {
                LOG.info("Cannot write record \"{}\", it already exists", znode);
                return false;
            }
            // Version -1 is passed through to the manager unchanged.
            zkManager.setData(znode, bytes, -1);
            return true;
        } catch (Exception e) {
            LOG.error("Cannot write record \"{}\": {}", znode, e.getMessage());
            return false;
        }
    }

    /**
     * Builds the znode path that holds all records of a type.
     *
     * @param clazz record type.
     * @return {@code <parent-path>/<record name>}.
     */
    private <T extends BaseRecord> String getZNodeForClass(Class<T> clazz) {
        String className = StateStoreUtils.getRecordName(clazz);
        return ZKCuratorManager.getNodePath(baseZNode, className);
    }

    /**
     * Builds a record from its stored string and stamps it with the
     * creation and modification times of its znode.
     *
     * @param data serialized record.
     * @param stat znode stat supplying the ctime and mtime.
     * @param clazz record type to create.
     * @return the populated record.
     * @throws IOException if the record cannot be created from the data.
     */
    private <T extends BaseRecord> T createRecord(String data, Stat stat, Class<T> clazz) throws IOException {
        T record = newRecord(data, clazz, false);
        record.setDateCreated(stat.getCtime());
        record.setDateModified(stat.getMtime());
        return record;
    }
}

