/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.timeline;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.AnnotationIntrospector;
import com.fasterxml.jackson.databind.MappingJsonFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.MalformedURLException;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.service.ServiceOperations;
import org.apache.hadoop.util.ApplicationClassLoader;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.timeline.DomainLogInfo;
import org.apache.hadoop.yarn.server.timeline.EntityCacheItem;
import org.apache.hadoop.yarn.server.timeline.EntityGroupFSTimelineStoreMetrics;
import org.apache.hadoop.yarn.server.timeline.EntityLogInfo;
import org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore;
import org.apache.hadoop.yarn.server.timeline.LogInfo;
import org.apache.hadoop.yarn.server.timeline.NameValuePair;
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
import org.apache.hadoop.yarn.server.timeline.TimelineEntityGroupPlugin;
import org.apache.hadoop.yarn.server.timeline.TimelineReader;
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
import org.apache.hadoop.yarn.util.Apps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EntityGroupFSTimelineStore
extends CompositeService
implements TimelineStore {
    // Name prefixes for the three kinds of log files; presumably matched against
    // file names when app logs are parsed (the consumers are outside this view).
    static final String DOMAIN_LOG_PREFIX = "domainlog-";
    static final String SUMMARY_LOG_PREFIX = "summarylog-";
    static final String ENTITY_LOG_PREFIX = "entitylog-";
    // Caller-context tag installed by serviceInit and cleared again by serviceStop.
    static final String ATS_V15_SERVER_DFS_CALLER_CTXT = "yarn_ats_server_v1_5";
    private static final Logger LOG = LoggerFactory.getLogger(EntityGroupFSTimelineStore.class);
    // 1023 decimal == 01777 octal; applied to the active root when serviceStart creates it.
    private static final FsPermission ACTIVE_DIR_PERMISSION = new FsPermission(1023);
    // 448 decimal == 0700 octal; applied to the done root when serviceStart creates it.
    private static final FsPermission DONE_DIR_PERMISSION = new FsPermission(448);
    // Layout under the done root: <clusterTimestamp>/<bucket1>/<bucket2>/<appId>/ (see getDoneAppPath).
    private static final String APP_DONE_DIR_PREFIX_FORMAT = "%d/%04d/%03d/%s/";
    // Not referenced in the visible part of this class.
    private static final int CACHE_ITEM_OVERFLOW_FACTOR = 2;
    // Client used by getAppState to ask for application reports.
    private YarnClient yarnClient;
    // Store that serves domains, puts, and reads for which no cached detail store is found.
    private TimelineStore summaryStore;
    private TimelineACLsManager aclManager;
    private TimelineDataManager summaryTdm;
    // Per-application log bookkeeping, shared by the scanner, the parser tasks and readers.
    private ConcurrentMap<ApplicationId, AppLogs> appIdLogMap = new ConcurrentHashMap<ApplicationId, AppLogs>();
    // Runs the periodic scanner/cleaner and the per-application parser tasks.
    private ScheduledThreadPoolExecutor executor;
    // Set by serviceStop; makes StoppableRemoteIterator report "no more entries".
    private AtomicBoolean stopExecutors = new AtomicBoolean(false);
    private FileSystem fs;
    private ObjectMapper objMapper;
    private JsonFactory jsonFactory;
    private Path activeRootPath;
    private Path doneRootPath;
    // Retention period for done logs, in milliseconds (configured in seconds).
    private long logRetainMillis;
    // How long an app unknown to YARN is still treated as active, in milliseconds.
    private long unknownActiveMillis;
    private int appCacheMaxSize = 0;
    private List<TimelineEntityGroupPlugin> cacheIdPlugins;
    // Access-ordered, size-bounded cache of per-group items; built in serviceInit.
    private Map<TimelineEntityGroupId, EntityCacheItem> cachedLogs;
    @InterfaceAudience.Private
    @VisibleForTesting
    EntityGroupFSTimelineStoreMetrics metrics;

    /**
     * Creates the store, using this class's simple name as the service name.
     */
    public EntityGroupFSTimelineStore() {
        super(EntityGroupFSTimelineStore.class.getSimpleName());
    }

    /**
     * Initializes the store: creates metrics and the summary store, reads the
     * retention / unknown-app / cache-size settings, builds the cache of
     * per-group items, loads the group-id plugins, creates the YARN client and
     * resolves the active and done root paths and their file system.
     *
     * @param conf configuration to read settings from
     * @throws Exception if any step of the initialization fails
     */
    protected void serviceInit(Configuration conf) throws Exception {
        this.metrics = EntityGroupFSTimelineStoreMetrics.create();
        this.summaryStore = this.createSummaryStore();
        this.addService((Service)this.summaryStore);
        // Settings are configured in seconds and kept in milliseconds.
        long logRetainSecs = conf.getLong("yarn.timeline-service.entity-group-fs-store.retain-seconds", 604800L);
        this.logRetainMillis = logRetainSecs * 1000L;
        LOG.info("Cleaner set to delete logs older than {} seconds", (Object)logRetainSecs);
        long unknownActiveSecs = conf.getLong("yarn.timeline-service.entity-group-fs-store.unknown-active-seconds", 86400L);
        this.unknownActiveMillis = unknownActiveSecs * 1000L;
        LOG.info("Unknown apps will be treated as complete after {} seconds", (Object)unknownActiveSecs);
        this.appCacheMaxSize = conf.getInt("yarn.timeline-service.entity-group-fs-store.app-cache-size", 10);
        LOG.info("Application cache size is {}", (Object)this.appCacheMaxSize);
        // Access-ordered LinkedHashMap (third constructor argument is true) wrapped in a
        // synchronized view; the eldest entry is evicted once the size limit is exceeded.
        this.cachedLogs = Collections.synchronizedMap(new LinkedHashMap<TimelineEntityGroupId, EntityCacheItem>(this.appCacheMaxSize + 1, 0.75f, true){

            @Override
            protected boolean removeEldestEntry(Map.Entry<TimelineEntityGroupId, EntityCacheItem> eldest) {
                if (super.size() > EntityGroupFSTimelineStore.this.appCacheMaxSize) {
                    TimelineEntityGroupId groupId = eldest.getKey();
                    LOG.debug("Evicting {} due to space limitations", (Object)groupId);
                    EntityCacheItem cacheItem = eldest.getValue();
                    LOG.debug("Force release cache {}.", (Object)groupId);
                    cacheItem.forceRelease();
                    // Drop the application's bookkeeping too once its logs are done.
                    if (cacheItem.getAppLogs().isDone()) {
                        EntityGroupFSTimelineStore.this.appIdLogMap.remove(groupId.getApplicationId());
                    }
                    EntityGroupFSTimelineStore.this.metrics.incrCacheEvicts();
                    return true;
                }
                return false;
            }
        });
        this.cacheIdPlugins = this.loadPlugIns(conf);
        this.yarnClient = this.createAndInitYarnClient(conf);
        this.addIfService(this.yarnClient);
        this.activeRootPath = new Path(conf.get("yarn.timeline-service.entity-group-fs-store.active-dir", "/tmp/entity-file-history/active"));
        this.doneRootPath = new Path(conf.get("yarn.timeline-service.entity-group-fs-store.done-dir", "/tmp/entity-file-history/done"));
        // The file system is resolved from the active root path only.
        this.fs = this.activeRootPath.getFileSystem(conf);
        CallerContext.setCurrent((CallerContext)new CallerContext.Builder(ATS_V15_SERVER_DFS_CALLER_CTXT).build());
        super.serviceInit(conf);
    }

    /**
     * Instantiates every configured group-id plugin.
     *
     * <p>When a plugin classpath is configured the classes are resolved through
     * a dedicated class loader; if that loader cannot be created a warning is
     * logged and the configuration's own class loading is used instead.
     *
     * @param conf configuration naming the plugin classes, the optional plugin
     *             classpath and the system classes for the plugin class loader
     * @return the instantiated plugins, in configuration order
     * @throws RuntimeException if a plugin class cannot be loaded, is not a
     *                          {@link TimelineEntityGroupPlugin}, or cannot be instantiated
     */
    private List<TimelineEntityGroupPlugin> loadPlugIns(Configuration conf) throws RuntimeException {
        // Typed collection: a raw Collection cannot be iterated as String.
        Collection<String> pluginNames = conf.getTrimmedStringCollection("yarn.timeline-service.entity-group-fs-store.group-id-plugin-classes");
        String pluginClasspath = conf.getTrimmed("yarn.timeline-service.entity-group-fs-store.group-id-plugin-classpath");
        String[] systemClasses = conf.getTrimmedStrings("yarn.timeline-service.entity-group-fs-store.group-id-plugin-system-classes");
        List<TimelineEntityGroupPlugin> pluginList = new LinkedList<TimelineEntityGroupPlugin>();
        ClassLoader customClassLoader = null;
        if (pluginClasspath != null && pluginClasspath.length() > 0) {
            try {
                customClassLoader = EntityGroupFSTimelineStore.createPluginClassLoader(pluginClasspath, systemClasses);
            }
            catch (IOException ioe) {
                // Best effort: fall back to the configuration's class loading below.
                LOG.warn("Error loading classloader", ioe);
            }
        }
        for (String name : pluginNames) {
            LOG.debug("Trying to load plugin class {}", name);
            TimelineEntityGroupPlugin cacheIdPlugin;
            try {
                Class<?> clazz;
                if (customClassLoader != null) {
                    LOG.debug("Load plugin {} with classpath: {}", name, pluginClasspath);
                    clazz = Class.forName(name, true, customClassLoader);
                } else {
                    LOG.debug("Load plugin class with system classpath");
                    clazz = conf.getClassByName(name);
                }
                // Verify the type before instantiating so that a wrong class is
                // rejected without running its constructor, on both paths.
                Class<? extends TimelineEntityGroupPlugin> pluginClass = clazz.asSubclass(TimelineEntityGroupPlugin.class);
                cacheIdPlugin = ReflectionUtils.newInstance(pluginClass, conf);
            }
            catch (Exception e) {
                LOG.warn("Error loading plugin " + name, e);
                throw new RuntimeException("No class defined for " + name, e);
            }
            LOG.info("Load plugin class {}", cacheIdPlugin.getClass().getName());
            pluginList.add(cacheIdPlugin);
        }
        return pluginList;
    }

    /**
     * Instantiates the configured summary store class, defaulting to
     * {@link LeveldbTimelineStore}.
     *
     * @return a new, not yet initialized summary store
     */
    private TimelineStore createSummaryStore() {
        Class<? extends TimelineStore> storeClass = this.getConfig().getClass(
                "yarn.timeline-service.entity-group-fs-store.summary-store",
                LeveldbTimelineStore.class,
                TimelineStore.class);
        TimelineStore created = ReflectionUtils.newInstance(storeClass, this.getConfig());
        return created;
    }

    /**
     * Starts the store: starts the summary store, wires the ACL manager and the
     * summary data manager, makes sure the active and done root directories
     * exist, sets up JSON parsing, and schedules the periodic scanner and
     * cleaner tasks.
     *
     * @throws Exception if a child service or a file system operation fails
     */
    protected void serviceStart() throws Exception {
        super.serviceStart();
        LOG.info("Starting {}", (Object)this.getName());
        this.summaryStore.start();
        Configuration conf = this.getConfig();
        this.aclManager = new TimelineACLsManager(conf);
        this.aclManager.setTimelineStore(this.summaryStore);
        this.summaryTdm = new TimelineDataManager(this.summaryStore, this.aclManager);
        this.summaryTdm.init(conf);
        this.addService((Service)this.summaryTdm);
        // NOTE(review): super.serviceStart() is invoked a second time after summaryTdm
        // was added, presumably to start the newly added child; confirm that starting
        // already-started children again is a no-op.
        super.serviceStart();
        // Create the root directories on first use; permissions are set only on creation.
        if (!this.fs.exists(this.activeRootPath)) {
            this.fs.mkdirs(this.activeRootPath);
            this.fs.setPermission(this.activeRootPath, ACTIVE_DIR_PERMISSION);
        }
        if (!this.fs.exists(this.doneRootPath)) {
            this.fs.mkdirs(this.doneRootPath);
            this.fs.setPermission(this.doneRootPath, DONE_DIR_PERMISSION);
        }
        this.objMapper = new ObjectMapper();
        this.objMapper.setAnnotationIntrospector((AnnotationIntrospector)new JaxbAnnotationIntrospector(TypeFactory.defaultInstance()));
        this.jsonFactory = new MappingJsonFactory(this.objMapper);
        long scanIntervalSecs = conf.getLong("yarn.timeline-service.entity-group-fs-store.scan-interval-seconds", 60L);
        long cleanerIntervalSecs = conf.getLong("yarn.timeline-service.entity-group-fs-store.cleaner-interval-seconds", 3600L);
        int numThreads = conf.getInt("yarn.timeline-service.entity-group-fs-store.threads", 16);
        LOG.info("Scanning active directory {} every {} seconds", (Object)this.activeRootPath, (Object)scanIntervalSecs);
        LOG.info("Cleaning logs every {} seconds", (Object)cleanerIntervalSecs);
        this.executor = new ScheduledThreadPoolExecutor(numThreads, new ThreadFactoryBuilder().setNameFormat("EntityLogPluginWorker #%d").build());
        // The scanner runs immediately; the first cleaner run is delayed by one full interval.
        this.executor.scheduleAtFixedRate(new EntityLogScanner(), 0L, scanIntervalSecs, TimeUnit.SECONDS);
        this.executor.scheduleAtFixedRate(new EntityLogCleaner(), cleanerIntervalSecs, cleanerIntervalSecs, TimeUnit.SECONDS);
    }

    /**
     * Stops the store: signals running directory listings to end, shuts down
     * the executor (waiting up to ten seconds for in-flight tasks), stops the
     * store of every cached item, clears the caller context and stops the
     * child services.
     *
     * <p>The method tolerates being called when initialization never got as
     * far as creating the executor or the cache.
     *
     * @throws Exception if waiting for the executor is interrupted or a child
     *                   service fails to stop
     */
    protected void serviceStop() throws Exception {
        LOG.info("Stopping {}", this.getName());
        this.stopExecutors.set(true);
        if (this.executor != null) {
            this.executor.shutdown();
            if (this.executor.isTerminating()) {
                LOG.info("Waiting for executor to terminate");
                boolean terminated = this.executor.awaitTermination(10L, TimeUnit.SECONDS);
                if (terminated) {
                    LOG.info("Executor terminated");
                } else {
                    LOG.warn("Executor did not terminate");
                    this.executor.shutdownNow();
                }
            }
        }
        // cachedLogs is only assigned in serviceInit; synchronizing on a null
        // reference would throw and skip the remaining cleanup below.
        Map<TimelineEntityGroupId, EntityCacheItem> logs = this.cachedLogs;
        if (logs != null) {
            // Iterating a synchronized map requires holding its lock manually.
            synchronized (logs) {
                for (EntityCacheItem cacheItem : logs.values()) {
                    ServiceOperations.stopQuietly((Service)cacheItem.getStore());
                }
            }
        }
        CallerContext.setCurrent(null);
        super.serviceStop();
    }

    /**
     * Scans the whole active root directory and records how long the scan took.
     *
     * @return the number of application log directories handed to the parser
     * @throws IOException if listing the active directory fails
     */
    @InterfaceAudience.Private
    @VisibleForTesting
    int scanActiveLogs() throws IOException {
        final long begin = Time.monotonicNow();
        final int scheduled = this.scanActiveLogs(this.activeRootPath);
        final long elapsed = Time.monotonicNow() - begin;
        this.metrics.addActiveLogDirScanTime(elapsed);
        return scheduled;
    }

    /**
     * Walks {@code dir} depth first. Every entry whose name parses as an
     * application id is registered as an active log and handed to an
     * {@link ActiveLogParser} on the executor; every other directory is
     * descended into.
     *
     * <p>Entries that are neither application-named nor directories are
     * ignored. Descending into a plain file would list that same file again
     * and recurse without end.
     *
     * @param dir directory to scan
     * @return the number of application log entries scheduled for parsing
     * @throws IOException if listing a directory fails
     */
    int scanActiveLogs(Path dir) throws IOException {
        RemoteIterator<FileStatus> iter = this.list(dir);
        int logsToScanCount = 0;
        while (iter.hasNext()) {
            FileStatus stat = iter.next();
            Path path = stat.getPath();
            ApplicationId appId = EntityGroupFSTimelineStore.parseApplicationId(path.getName());
            if (appId != null) {
                LOG.debug("scan logs for {} in {}", appId, path);
                ++logsToScanCount;
                AppLogs logs = this.getAndSetActiveLog(appId, path);
                this.executor.execute(new ActiveLogParser(logs));
            } else if (stat.isDirectory()) {
                logsToScanCount += this.scanActiveLogs(path);
            } else {
                // A stray file: nothing to scan and nothing to descend into.
                LOG.debug("Ignoring unexpected file in active directory {}", path);
            }
        }
        return logsToScanCount;
    }

    /**
     * Lists {@code path} through an iterator that stops yielding entries once
     * the store has been asked to stop.
     *
     * @param path path to list
     * @return a stoppable iterator over the entries of {@code path}
     * @throws IOException if the file system cannot open the listing
     */
    private RemoteIterator<FileStatus> list(Path path) throws IOException {
        RemoteIterator<FileStatus> rawListing = this.fs.listStatusIterator(path);
        return new StoppableRemoteIterator(rawListing);
    }

    /**
     * Registers a new {@code AppLogs} for {@code appId} unless one is already
     * registered, and returns whichever instance ends up in the map.
     *
     * @param appId      application the logs belong to
     * @param appDirPath directory holding the application's logs
     * @param appState   state to create the new instance with
     * @return the previously registered instance if present, otherwise the new one
     */
    private AppLogs createAndPutAppLogsIfAbsent(ApplicationId appId, Path appDirPath, AppState appState) {
        AppLogs candidate = new AppLogs(appId, appDirPath, appState);
        AppLogs existing = this.appIdLogMap.putIfAbsent(appId, candidate);
        return existing != null ? existing : candidate;
    }

    /**
     * Returns the registered {@code AppLogs} for {@code appId}, registering a
     * new one in the ACTIVE state when none exists yet.
     *
     * @param appId      application to look up
     * @param appDirPath directory used when a new instance has to be created
     * @return the registered instance for the application
     */
    private AppLogs getAndSetActiveLog(ApplicationId appId, Path appDirPath) {
        AppLogs known = this.appIdLogMap.get(appId);
        if (known != null) {
            return known;
        }
        return this.createAndPutAppLogsIfAbsent(appId, appDirPath, AppState.ACTIVE);
    }

    /**
     * Finds the {@code AppLogs} for an application, locating its directory on
     * the file system when it is not registered yet.
     *
     * <p>Lookup order: the done directory, the application directory directly
     * under the active root, then the application directory one level below
     * each child of the active root.
     *
     * @param applicationId application to look up
     * @return the registered or newly registered logs, or {@code null} when no
     *         directory for the application is found
     * @throws IOException if a file system call fails
     */
    private AppLogs getAndSetAppLogs(ApplicationId applicationId) throws IOException {
        LOG.debug("Looking for app logs mapped for app id {}", applicationId);
        AppLogs registered = this.appIdLogMap.get(applicationId);
        if (registered != null) {
            return registered;
        }

        AppState state = AppState.UNKNOWN;
        Path location = this.getDoneAppPath(applicationId);
        if (this.fs.exists(location)) {
            state = AppState.COMPLETED;
        } else {
            location = this.getActiveAppPath(applicationId);
            if (this.fs.exists(location)) {
                state = AppState.ACTIVE;
            } else {
                // Probe <activeRoot>/<child>/<appId> for every child of the active root.
                RemoteIterator<FileStatus> children = this.list(this.activeRootPath);
                while (children.hasNext()) {
                    String childName = children.next().getPath().getName();
                    Path relative = new Path(childName, applicationId.toString());
                    location = new Path(this.activeRootPath, relative);
                    if (this.fs.exists(location)) {
                        state = AppState.ACTIVE;
                        break;
                    }
                }
            }
        }

        if (state == AppState.UNKNOWN) {
            return null;
        }
        LOG.debug("Create and try to add new appLogs to appIdLogMap for {}", applicationId);
        return this.createAndPutAppLogsIfAbsent(applicationId, location, state);
    }

    /**
     * Depth-first cleaner. When {@code dirpath} itself is named like an
     * application and contains at least one sub-directory, the whole directory
     * is deleted if nothing inside it was modified within the retention
     * period. Otherwise every sub-directory is cleaned recursively.
     *
     * <p>The application-directory decision is made once per directory. It
     * used to be repeated for every sub-directory, which re-listed
     * {@code dirpath} after it had already been deleted and aborted the whole
     * cleaning pass with an {@link IOException}.
     *
     * @param dirpath      directory to examine
     * @param fs           file system used for the retention check and the delete
     * @param retainMillis retention period in milliseconds
     * @throws IOException if listing a directory fails
     */
    @InterfaceAudience.Private
    @VisibleForTesting
    void cleanLogs(Path dirpath, FileSystem fs, long retainMillis) throws IOException {
        long now = Time.now();
        // Depends only on dirpath, so resolve it once instead of once per child.
        ApplicationId appId = EntityGroupFSTimelineStore.parseApplicationId(dirpath.getName());
        RemoteIterator<FileStatus> iter = this.list(dirpath);
        while (iter.hasNext()) {
            FileStatus stat = iter.next();
            if (!stat.isDirectory()) {
                continue;
            }
            if (appId == null) {
                // Not an application directory: keep cleaning inside.
                this.cleanLogs(stat.getPath(), fs, retainMillis);
                continue;
            }
            if (EntityGroupFSTimelineStore.shouldCleanAppLogDir(dirpath, now, fs, retainMillis)) {
                try {
                    LOG.info("Deleting {}", dirpath);
                    if (!fs.delete(dirpath, true)) {
                        LOG.error("Unable to remove " + dirpath);
                    }
                    this.metrics.incrLogsDirsCleaned();
                }
                catch (IOException e) {
                    LOG.error("Unable to remove " + dirpath, e);
                }
            }
            // The verdict covers the whole application directory; the remaining
            // children would only repeat it, possibly against a deleted path.
            return;
        }
    }

    /**
     * Decides whether an application log directory is old enough to delete:
     * it is, unless some entry at any depth below it was modified within the
     * retention period.
     *
     * @param appLogPath      directory to inspect
     * @param now             reference time in milliseconds
     * @param fs              file system holding the directory
     * @param logRetainMillis retention period in milliseconds
     * @return {@code true} if every entry under the directory is older than the
     *         retention period
     * @throws IOException if listing a directory fails
     */
    private static boolean shouldCleanAppLogDir(Path appLogPath, long now, FileSystem fs, long logRetainMillis) throws IOException {
        RemoteIterator<FileStatus> entries = fs.listStatusIterator(appLogPath);
        while (entries.hasNext()) {
            FileStatus entry = entries.next();
            long age = now - entry.getModificationTime();
            if (age <= logRetainMillis) {
                LOG.debug("{} not being cleaned due to {}", appLogPath, entry.getPath());
                return false;
            }
            boolean freshContentBelow = entry.isDirectory()
                    && !EntityGroupFSTimelineStore.shouldCleanAppLogDir(entry.getPath(), now, fs, logRetainMillis);
            if (freshContentBelow) {
                return false;
            }
        }
        return true;
    }

    /**
     * Parses an application id from a path name.
     *
     * @param appIdStr candidate string
     * @return the parsed id, or {@code null} when the string does not start
     *         with {@code "application"} or is rejected by the parser
     */
    private static ApplicationId parseApplicationId(String appIdStr) {
        if (!appIdStr.startsWith("application")) {
            return null;
        }
        try {
            return ApplicationId.fromString(appIdStr);
        }
        catch (IllegalArgumentException malformed) {
            return null;
        }
    }

    /**
     * Builds, inside a privileged action, an {@link ApplicationClassLoader} for
     * the plugin classpath whose parent is this class's own class loader.
     *
     * @param appClasspath  classpath the plugins are loaded from
     * @param systemClasses classes passed to the loader as system classes
     * @return the plugin class loader
     * @throws IOException a {@link MalformedURLException} as raised by the
     *                     loader, or a wrapper around any other checked failure
     */
    private static ClassLoader createPluginClassLoader(final String appClasspath, final String[] systemClasses) throws IOException {
        PrivilegedExceptionAction<ClassLoader> loaderFactory = new PrivilegedExceptionAction<ClassLoader>(){

            @Override
            public ClassLoader run() throws MalformedURLException {
                ClassLoader parent = EntityGroupFSTimelineStore.class.getClassLoader();
                return new ApplicationClassLoader(appClasspath, parent, Arrays.asList(systemClasses));
            }
        };
        try {
            return AccessController.doPrivileged(loaderFactory);
        }
        catch (PrivilegedActionException wrapped) {
            Throwable cause = wrapped.getCause();
            if (!(cause instanceof MalformedURLException)) {
                throw new IOException(wrapped);
            }
            throw (MalformedURLException)cause;
        }
    }

    /**
     * @param appId application id
     * @return the application's directory directly under the active root
     */
    private Path getActiveAppPath(ApplicationId appId) {
        String appDirName = appId.toString();
        return new Path(this.activeRootPath, appDirName);
    }

    /**
     * Computes the application's directory under the done root. The numeric id
     * is divided by 1000 and the quotient is split into two buckets
     * (quotient / 1000 and quotient % 1000), giving
     * {@code <clusterTimestamp>/<bucket1>/<bucket2>/<appId>/}.
     *
     * @param appId application id
     * @return the application's done directory
     */
    private Path getDoneAppPath(ApplicationId appId) {
        int thousands = appId.getId() / 1000;
        int bucket1 = thousands / 1000;
        int bucket2 = thousands % 1000;
        String relativeDir = String.format(APP_DONE_DIR_PREFIX_FORMAT,
                appId.getClusterTimestamp(), bucket1, bucket2, appId.toString());
        return new Path(this.doneRootPath, relativeDir);
    }

    /**
     * Creates a YARN client and initializes it with the given configuration.
     * Protected so that tests can substitute their own client.
     *
     * @param conf configuration to initialize the client with
     * @return the initialized (not started) client
     */
    @VisibleForTesting
    protected YarnClient createAndInitYarnClient(Configuration conf) {
        final YarnClient initialized = YarnClient.createYarnClient();
        initialized.init(conf);
        return initialized;
    }

    /**
     * Looks up the state of an application using this store's YARN client.
     *
     * @param appId application id
     * @return the application's state
     * @throws IOException if the lookup fails
     */
    @VisibleForTesting
    protected AppState getAppState(ApplicationId appId) throws IOException {
        AppState resolved = EntityGroupFSTimelineStore.getAppState(appId, this.yarnClient);
        return resolved;
    }

    /**
     * @return the plugin list built by {@code loadPlugIns} during
     *         initialization; the internal list itself, not a copy
     */
    @VisibleForTesting
    List<TimelineEntityGroupPlugin> getPlugins() {
        return this.cacheIdPlugins;
    }

    /**
     * Maps the application report from YARN to an {@code AppState}: COMPLETED
     * for a final YARN state, ACTIVE otherwise, and UNKNOWN when YARN does not
     * know the application.
     *
     * @param appId      application id
     * @param yarnClient client used to fetch the application report
     * @return the derived state
     * @throws IOException if the client fails, including any other
     *                     {@link YarnException}, which is wrapped
     */
    private static synchronized AppState getAppState(ApplicationId appId, YarnClient yarnClient) throws IOException {
        try {
            ApplicationReport report = yarnClient.getApplicationReport(appId);
            YarnApplicationState yarnState = report.getYarnApplicationState();
            return Apps.isApplicationFinalState(yarnState) ? AppState.COMPLETED : AppState.ACTIVE;
        }
        catch (ApplicationNotFoundException notFound) {
            return AppState.UNKNOWN;
        }
        catch (YarnException failure) {
            throw new IOException(failure);
        }
    }

    /**
     * Unwraps an {@link UndeclaredThrowableException} that carries a cause.
     *
     * @param e exception to inspect
     * @return the cause when {@code e} is an {@code UndeclaredThrowableException}
     *         with a non-null cause, otherwise {@code e} itself
     */
    private Throwable extract(Exception e) {
        boolean wrapsCause = e instanceof UndeclaredThrowableException && e.getCause() != null;
        return wrapsCause ? e.getCause() : e;
    }

    /**
     * Replaces the file system used by this store (test hook).
     *
     * @param incomingFs file system to use from now on
     */
    @InterfaceAudience.Private
    @VisibleForTesting
    void setFs(FileSystem incomingFs) {
        this.fs = incomingFs;
    }

    /**
     * Puts an item directly into the cache (test hook). Because the cache is
     * size-bounded, the insertion may evict the eldest entry.
     *
     * @param groupId   key of the cache entry
     * @param cacheItem item to cache
     */
    @InterfaceAudience.Private
    @VisibleForTesting
    void setCachedLogs(TimelineEntityGroupId groupId, EntityCacheItem cacheItem) {
        this.cachedLogs.put(groupId, cacheItem);
    }

    /**
     * Resolves the detail stores for a set of group ids, falling back to the
     * summary store when none of the groups yields a store.
     *
     * @param groupIds   group ids to resolve
     * @param entityType entity type of the query (used for logging only)
     * @param cacheItems receives every cache item touched during resolution
     * @return a non-empty list of stores to query
     * @throws IOException if resolving a cached store fails
     */
    private List<TimelineStore> getTimelineStoresFromCacheIds(Set<TimelineEntityGroupId> groupIds, String entityType, List<EntityCacheItem> cacheItems) throws IOException {
        List<TimelineStore> resolved = new LinkedList<TimelineStore>();
        for (TimelineEntityGroupId groupId : groupIds) {
            TimelineStore detailStore = this.getCachedStore(groupId, cacheItems);
            if (detailStore != null) {
                LOG.debug("Adding {} as a store for the query", detailStore.getName());
                resolved.add(detailStore);
                this.metrics.incrGetEntityToDetailOps();
            }
        }
        if (resolved.isEmpty()) {
            LOG.debug("Using summary store for {}", entityType);
            resolved.add(this.summaryStore);
            this.metrics.incrGetEntityToSummaryOps();
        }
        return resolved;
    }

    /**
     * Asks every plugin which entity groups a single entity belongs to and
     * resolves the stores for the union of the answers.
     *
     * @param entityId   id of the entity being read
     * @param entityType type of the entity being read
     * @param cacheItems receives every cache item touched during resolution
     * @return the stores to query
     * @throws IOException if resolving a cached store fails
     */
    protected List<TimelineStore> getTimelineStoresForRead(String entityId, String entityType, List<EntityCacheItem> cacheItems) throws IOException {
        Set<TimelineEntityGroupId> collected = new HashSet<TimelineEntityGroupId>();
        for (TimelineEntityGroupPlugin plugin : this.cacheIdPlugins) {
            String pluginName = plugin.getClass().getName();
            LOG.debug("Trying plugin {} for id {} and type {}", new Object[]{pluginName, entityId, entityType});
            Set<TimelineEntityGroupId> answer = plugin.getTimelineEntityGroupId(entityId, entityType);
            if (answer == null) {
                LOG.debug("Plugin returned null " + pluginName);
                continue;
            }
            LOG.debug("Plugin returned ids: " + answer);
            collected.addAll(answer);
            LOG.debug("plugin {} returns a non-null value on query", pluginName);
        }
        return this.getTimelineStoresFromCacheIds(collected, entityType, cacheItems);
    }

    /**
     * Asks every plugin which entity groups a filtered query touches and
     * resolves the stores for the union of the answers.
     *
     * @param entityType       entity type of the query
     * @param primaryFilter    primary filter of the query
     * @param secondaryFilters secondary filters of the query
     * @param cacheItems       receives every cache item touched during resolution
     * @return the stores to query
     * @throws IOException if resolving a cached store fails
     */
    private List<TimelineStore> getTimelineStoresForRead(String entityType, NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters, List<EntityCacheItem> cacheItems) throws IOException {
        Set<TimelineEntityGroupId> collected = new HashSet<TimelineEntityGroupId>();
        for (TimelineEntityGroupPlugin plugin : this.cacheIdPlugins) {
            Set<TimelineEntityGroupId> answer = plugin.getTimelineEntityGroupId(entityType, primaryFilter, secondaryFilters);
            if (answer != null) {
                LOG.debug("plugin {} returns a non-null value on query {}", plugin.getClass().getName(), answer);
                collected.addAll(answer);
            }
        }
        return this.getTimelineStoresFromCacheIds(collected, entityType, cacheItems);
    }

    /**
     * Returns the refreshed detail store for a group id, creating the cache
     * item on first use.
     *
     * <p>The lookup and insertion run under the cache's lock; the refresh runs
     * outside it. A new item is cached only when the application's logs can be
     * located.
     *
     * @param groupId    group to resolve
     * @param cacheItems receives the cache item when it has app logs
     * @return the refreshed store, or {@code null} when the item has no app logs
     * @throws IOException if locating the logs or refreshing the cache fails
     */
    private TimelineStore getCachedStore(TimelineEntityGroupId groupId, List<EntityCacheItem> cacheItems) throws IOException {
        EntityCacheItem item;
        Map<TimelineEntityGroupId, EntityCacheItem> cache = this.cachedLogs;
        synchronized (cache) {
            item = this.cachedLogs.get(groupId);
            if (item == null) {
                LOG.debug("Set up new cache item for id {}", groupId);
                item = new EntityCacheItem(groupId, this.getConfig());
                AppLogs located = this.getAndSetAppLogs(groupId.getApplicationId());
                if (located == null) {
                    LOG.warn("AppLogs for groupId {} is set to null!", groupId);
                } else {
                    LOG.debug("Set applogs {} for group id {}", located, groupId);
                    item.setAppLogs(located);
                    this.cachedLogs.put(groupId, item);
                }
            }
        }

        if (item.getAppLogs() == null) {
            LOG.warn("AppLogs for group id {} is null", groupId);
            return null;
        }
        AppLogs itemLogs = item.getAppLogs();
        LOG.debug("try refresh cache {} {}", groupId, itemLogs.getAppId());
        cacheItems.add(item);
        return item.refreshCache(this.aclManager, this.metrics);
    }

    /**
     * Runs an entity query against every store selected for it and merges the
     * results into one response.
     *
     * @return the merged entities; never {@code null}
     * @throws IOException if store resolution or a store query fails
     */
    public TimelineEntities getEntities(String entityType, Long limit, Long windowStart, Long windowEnd, String fromId, Long fromTs, NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters, EnumSet<TimelineReader.Field> fieldsToRetrieve, TimelineDataManager.CheckAcl checkAcl) throws IOException {
        LOG.debug("getEntities type={} primary={}", entityType, primaryFilter);
        List<EntityCacheItem> touchedCacheItems = new ArrayList<EntityCacheItem>();
        List<TimelineStore> candidates = this.getTimelineStoresForRead(entityType, primaryFilter, secondaryFilters, touchedCacheItems);
        TimelineEntities merged = new TimelineEntities();
        for (TimelineStore candidate : candidates) {
            LOG.debug("Try timeline store {} for the request", candidate.getName());
            TimelineEntities partial = candidate.getEntities(entityType, limit, windowStart, windowEnd, fromId, fromTs, primaryFilter, secondaryFilters, fieldsToRetrieve, checkAcl);
            if (partial != null) {
                merged.addEntities(partial.getEntities());
            }
        }
        return merged;
    }

    /**
     * Looks an entity up in every store selected for it and returns the first
     * hit.
     *
     * @return the entity, or {@code null} when no store has it
     * @throws IOException if store resolution or a store lookup fails
     */
    public TimelineEntity getEntity(String entityId, String entityType, EnumSet<TimelineReader.Field> fieldsToRetrieve) throws IOException {
        LOG.debug("getEntity type={} id={}", entityType, entityId);
        List<EntityCacheItem> touchedCacheItems = new ArrayList<EntityCacheItem>();
        List<TimelineStore> candidates = this.getTimelineStoresForRead(entityId, entityType, touchedCacheItems);
        for (TimelineStore candidate : candidates) {
            LOG.debug("Try timeline store {}:{} for the request", candidate.getName(), candidate.toString());
            TimelineEntity found = candidate.getEntity(entityId, entityType, fieldsToRetrieve);
            if (found != null) {
                return found;
            }
        }
        LOG.debug("getEntity: Found nothing");
        return null;
    }

    /**
     * Collects the event timelines of several entities. Each entity id is
     * resolved to its own stores and queried individually; all events are
     * merged into one response.
     *
     * @return the merged events; never {@code null}
     * @throws IOException if store resolution or a store query fails
     */
    public TimelineEvents getEntityTimelines(String entityType, SortedSet<String> entityIds, Long limit, Long windowStart, Long windowEnd, Set<String> eventTypes) throws IOException {
        LOG.debug("getEntityTimelines type={} ids={}", entityType, entityIds);
        TimelineEvents merged = new TimelineEvents();
        List<EntityCacheItem> touchedCacheItems = new ArrayList<EntityCacheItem>();
        for (String entityId : entityIds) {
            LOG.debug("getEntityTimeline type={} id={}", entityType, entityId);
            List<TimelineStore> candidates = this.getTimelineStoresForRead(entityId, entityType, touchedCacheItems);
            for (TimelineStore candidate : candidates) {
                LOG.debug("Try timeline store {}:{} for the request", candidate.getName(), candidate.toString());
                // Each store is queried with a fresh single-element id set.
                SortedSet<String> singleId = new TreeSet<String>();
                singleId.add(entityId);
                TimelineEvents partial = candidate.getEntityTimelines(entityType, singleId, limit, windowStart, windowEnd, eventTypes);
                if (partial != null) {
                    merged.addEvents(partial.getAllEvents());
                }
            }
        }
        return merged;
    }

    /**
     * Delegates the domain lookup to the summary store.
     *
     * @param domainId id of the domain
     * @return whatever the summary store returns for the id
     * @throws IOException if the summary store fails
     */
    public TimelineDomain getDomain(String domainId) throws IOException {
        return this.summaryStore.getDomain(domainId);
    }

    /**
     * Delegates the per-owner domain lookup to the summary store.
     *
     * @param owner owner of the domains
     * @return whatever the summary store returns for the owner
     * @throws IOException if the summary store fails
     */
    public TimelineDomains getDomains(String owner) throws IOException {
        return this.summaryStore.getDomains(owner);
    }

    /**
     * Writes entities to the summary store; this class adds no handling of its own.
     *
     * @param data entities to store
     * @return the summary store's response
     * @throws IOException if the summary store fails
     */
    public TimelinePutResponse put(TimelineEntities data) throws IOException {
        return this.summaryStore.put(data);
    }

    /**
     * Writes a domain to the summary store; this class adds no handling of its own.
     *
     * @param domain domain to store
     * @throws IOException if the summary store fails
     */
    public void put(TimelineDomain domain) throws IOException {
        this.summaryStore.put(domain);
    }

    /**
     * Iterator wrapper that reports "no more entries" as soon as the enclosing
     * store's stop flag is set, so that long directory walks end promptly on
     * shutdown.
     */
    private class StoppableRemoteIterator
    implements RemoteIterator<FileStatus> {
        private final RemoteIterator<FileStatus> delegate;

        public StoppableRemoteIterator(RemoteIterator<FileStatus> remote) {
            this.delegate = remote;
        }

        /** @return {@code false} once stopping was requested, else the delegate's answer */
        public boolean hasNext() throws IOException {
            if (EntityGroupFSTimelineStore.this.stopExecutors.get()) {
                return false;
            }
            return this.delegate.hasNext();
        }

        /** @return the delegate's next entry; the stop flag is not consulted here */
        public FileStatus next() throws IOException {
            FileStatus entry = this.delegate.next();
            return entry;
        }
    }

    /**
     * Periodic task that cleans expired logs under the done root and records
     * how long the pass took, whether or not it succeeded.
     */
    private class EntityLogCleaner
    implements Runnable {
        private EntityLogCleaner() {
        }

        @Override
        public void run() {
            LOG.debug("Cleaner starting");
            final EntityGroupFSTimelineStore store = EntityGroupFSTimelineStore.this;
            final long begin = Time.monotonicNow();
            try {
                store.cleanLogs(store.doneRootPath, store.fs, store.logRetainMillis);
            }
            catch (Exception e) {
                // An interruption is expected on shutdown; anything else is an error.
                boolean interrupted = store.extract(e) instanceof InterruptedException;
                if (!interrupted) {
                    LOG.error("Error cleaning files", (Throwable)e);
                } else {
                    LOG.info("Cleaner interrupted");
                }
            }
            finally {
                long elapsed = Time.monotonicNow() - begin;
                store.metrics.addLogCleanTime(elapsed);
            }
            LOG.debug("Cleaner finished");
        }
    }

    private class ActiveLogParser
    implements Runnable {
        private AppLogs appLogs;

        public ActiveLogParser(AppLogs logs) {
            this.appLogs = logs;
        }

        @Override
        public void run() {
            try {
                LOG.debug("Begin parsing summary logs. ");
                this.appLogs.parseSummaryLogs();
                if (this.appLogs.isDone()) {
                    this.appLogs.moveToDone();
                    EntityGroupFSTimelineStore.this.appIdLogMap.remove(this.appLogs.getAppId());
                }
                LOG.debug("End parsing summary logs. ");
            }
            catch (Exception e) {
                Throwable t = EntityGroupFSTimelineStore.this.extract(e);
                if (t instanceof InterruptedException) {
                    LOG.info("Log parser interrupted");
                }
                LOG.error("Error processing logs for " + this.appLogs.getAppId(), t);
            }
        }
    }

    private class EntityLogScanner
    implements Runnable {
        private EntityLogScanner() {
        }

        @Override
        public void run() {
            LOG.debug("Active scan starting");
            try {
                int scanned = EntityGroupFSTimelineStore.this.scanActiveLogs();
                LOG.debug("Scanned {} active applications", (Object)scanned);
            }
            catch (Exception e) {
                Throwable t = EntityGroupFSTimelineStore.this.extract(e);
                if (t instanceof InterruptedException) {
                    LOG.info("File scanner interrupted");
                }
                LOG.error("Error scanning active files", t);
            }
            LOG.debug("Active scan complete");
        }
    }

    /**
     * Tracks the log files found under one application's directory, split
     * into "summary" logs (domain and summary files) and "detail" logs
     * (entity files), together with the application's last known state.
     *
     * <p>All methods that read or modify the two log lists or the mutable
     * state synchronize on this instance. {@link #scanForLogs()} itself is
     * not synchronized, but the list mutations it performs go through the
     * synchronized {@code addSummaryLog}/{@code addDetailLog} helpers.
     */
    class AppLogs {
        private ApplicationId appId;
        private Path appDirPath;
        private AppState appState;
        private List<LogInfo> summaryLogs = new ArrayList<LogInfo>();
        private List<LogInfo> detailLogs = new ArrayList<LogInfo>();

        /**
         * @param appId   application these logs belong to
         * @param appPath directory currently holding the application's logs
         * @param state   initial application state
         */
        public AppLogs(ApplicationId appId, Path appPath, AppState state) {
            this.appId = appId;
            this.appDirPath = appPath;
            this.appState = state;
        }

        /** @return {@code true} once the tracked state is {@link AppState#COMPLETED} */
        public synchronized boolean isDone() {
            return this.appState == AppState.COMPLETED;
        }

        /** @return the application id given at construction */
        public synchronized ApplicationId getAppId() {
            return this.appId;
        }

        /** @return the current log directory; changes after a successful {@link #moveToDone()} */
        public synchronized Path getAppDirPath() {
            return this.appDirPath;
        }

        /** @return the live (not copied) list of summary logs */
        synchronized List<LogInfo> getSummaryLogs() {
            return this.summaryLogs;
        }

        /** @return the live (not copied) list of detail logs */
        synchronized List<LogInfo> getDetailLogs() {
            return this.detailLogs;
        }

        /**
         * Parses the summary logs into the enclosing store's summary
         * {@code TimelineDataManager}.
         *
         * @throws IOException if scanning or parsing fails
         */
        public synchronized void parseSummaryLogs() throws IOException {
            this.parseSummaryLogs(summaryTdm);
        }

        /**
         * Refreshes the application state (unless already done), then parses
         * every known summary log that still exists and forgets those that
         * have disappeared from the file system.
         *
         * <p>While the application is not done, its state is re-read and the
         * directory is rescanned. If the state is {@link AppState#UNKNOWN} and
         * the newest log modification is older than
         * {@code unknownActiveMillis}, the application is assumed completed.
         *
         * @param tdm data manager that receives the parsed entities
         * @throws IOException if scanning or parsing fails
         */
        @InterfaceAudience.Private
        @VisibleForTesting
        synchronized void parseSummaryLogs(TimelineDataManager tdm) throws IOException {
            long startTime = Time.monotonicNow();
            if (!this.isDone()) {
                LOG.debug("Try to parse summary log for log {} in {}", this.appId, this.appDirPath);
                this.appState = getAppState(this.appId);
                long recentLogModTime = this.scanForLogs();
                if (this.appState == AppState.UNKNOWN
                        && Time.now() - recentLogModTime > unknownActiveMillis) {
                    LOG.info("{} state is UNKNOWN and logs are stale, assuming COMPLETED", this.appId);
                    this.appState = AppState.COMPLETED;
                }
            }
            // Missing files are collected first and removed after the loop so
            // the list is not modified while it is being iterated.
            List<LogInfo> removeList = new ArrayList<LogInfo>();
            for (LogInfo log : this.summaryLogs) {
                if (fs.exists(log.getPath(this.appDirPath))) {
                    long summaryEntityParsed = log.parseForStore(tdm, this.appDirPath, this.isDone(), jsonFactory, objMapper, fs);
                    metrics.incrEntitiesReadToSummary(summaryEntityParsed);
                } else {
                    removeList.add(log);
                    LOG.info("File {} no longer exists, remove it from log list", log.getPath(this.appDirPath));
                }
            }
            this.summaryLogs.removeAll(removeList);
            metrics.addSummaryLogReadTime(Time.monotonicNow() - startTime);
        }

        /**
         * Walks the application directory and registers every recognised log
         * file found inside its attempt sub-directories.
         *
         * <p>Only directories whose name starts with {@code "appattempt"} are
         * descended into. Within them, regular files are classified by
         * filename prefix as domain, summary or entity logs; other files are
         * ignored.
         *
         * @return the newest modification time among the recognised log
         *         files, or the modification time of the application
         *         directory itself when no such file was found
         * @throws IOException if listing or stat-ing fails
         */
        @InterfaceAudience.Private
        @VisibleForTesting
        long scanForLogs() throws IOException {
            LOG.debug("scanForLogs on {}", this.appDirPath);
            long newestModTime = 0L;
            RemoteIterator iterAttempt = list(this.appDirPath);
            while (iterAttempt.hasNext()) {
                FileStatus statAttempt = (FileStatus) iterAttempt.next();
                LOG.debug("scanForLogs on {}", statAttempt.getPath().getName());
                if (!statAttempt.isDirectory() || !statAttempt.getPath().getName().startsWith("appattempt")) {
                    LOG.debug("Scanner skips for unknown dir/file {}", statAttempt.getPath());
                    continue;
                }
                String attemptDirName = statAttempt.getPath().getName();
                RemoteIterator iterCache = list(statAttempt.getPath());
                while (iterCache.hasNext()) {
                    FileStatus statCache = (FileStatus) iterCache.next();
                    if (!statCache.isFile()) {
                        continue;
                    }
                    String filename = statCache.getPath().getName();
                    // Only recognised log files contribute to the newest
                    // modification time.
                    boolean shouldSetTime = true;
                    LOG.debug("scan for log file: {}", filename);
                    if (filename.startsWith(EntityGroupFSTimelineStore.DOMAIN_LOG_PREFIX)) {
                        this.addSummaryLog(attemptDirName, filename, statCache.getOwner(), true);
                    } else if (filename.startsWith(EntityGroupFSTimelineStore.SUMMARY_LOG_PREFIX)) {
                        this.addSummaryLog(attemptDirName, filename, statCache.getOwner(), false);
                    } else if (filename.startsWith(EntityGroupFSTimelineStore.ENTITY_LOG_PREFIX)) {
                        this.addDetailLog(attemptDirName, filename, statCache.getOwner());
                    } else {
                        shouldSetTime = false;
                    }
                    if (shouldSetTime) {
                        newestModTime = Math.max(statCache.getModificationTime(), newestModTime);
                    }
                }
            }
            // No log files were seen: fall back to the directory's own
            // modification time.
            if (newestModTime == 0L) {
                newestModTime = fs.getFileStatus(this.appDirPath).getModificationTime();
            }
            return newestModTime;
        }

        /**
         * Registers a summary-type log unless one with the same filename and
         * attempt directory is already known. Domain logs are inserted at the
         * head of the list, other summary logs are appended.
         *
         * <p>Synchronized, like {@code addDetailLog}, because
         * {@link #scanForLogs()} can be invoked without the instance lock
         * being held.
         *
         * @param attemptDirName name of the attempt directory holding the file
         * @param filename       name of the log file
         * @param owner          owner reported by the file system for the file
         * @param isDomainLog    {@code true} for a domain log, {@code false}
         *                       for an entity summary log
         */
        private synchronized void addSummaryLog(String attemptDirName, String filename, String owner, boolean isDomainLog) {
            for (LogInfo known : this.summaryLogs) {
                if (known.getFilename().equals(filename) && known.getAttemptDirName().equals(attemptDirName)) {
                    return;
                }
            }
            LOG.debug("Incoming log {} not present in my summaryLogs list, add it", filename);
            if (isDomainLog) {
                // Domain logs go to the front so they are visited before the
                // other summary logs (presumably because entities depend on
                // their domains - confirm).
                this.summaryLogs.add(0, new DomainLogInfo(attemptDirName, filename, owner));
            } else {
                this.summaryLogs.add(new EntityLogInfo(attemptDirName, filename, owner));
            }
        }

        /**
         * Registers a detail (entity) log unless one with the same filename
         * and attempt directory is already known.
         *
         * @param attemptDirName name of the attempt directory holding the file
         * @param filename       name of the log file
         * @param owner          owner reported by the file system for the file
         */
        private synchronized void addDetailLog(String attemptDirName, String filename, String owner) {
            for (LogInfo known : this.detailLogs) {
                if (known.getFilename().equals(filename) && known.getAttemptDirName().equals(attemptDirName)) {
                    return;
                }
            }
            this.detailLogs.add(new EntityLogInfo(attemptDirName, filename, owner));
        }

        /**
         * Parses every detail log that matches {@code groupId} into
         * {@code tdm}; matching logs whose file no longer exists are removed
         * from the detail list.
         *
         * @param tdm     data manager that receives the parsed entities
         * @param groupId entity group whose logs should be (re)loaded
         * @throws IOException if the existence check or parsing fails
         */
        synchronized void loadDetailLog(TimelineDataManager tdm, TimelineEntityGroupId groupId) throws IOException {
            // Missing files are collected first and removed after the loop so
            // the list is not modified while it is being iterated.
            List<LogInfo> removeList = new ArrayList<LogInfo>();
            for (LogInfo log : this.detailLogs) {
                LOG.debug("Try refresh logs for {}", log.getFilename());
                if (!log.matchesGroupId(groupId)) {
                    continue;
                }
                Path dirPath = this.getAppDirPath();
                if (fs.exists(log.getPath(dirPath))) {
                    LOG.debug("Refresh logs for cache id {}", groupId);
                    log.parseForStore(tdm, dirPath, this.isDone(), jsonFactory, objMapper, fs);
                } else {
                    removeList.add(log);
                    LOG.info("File {} no longer exists, removing it from log list", log.getPath(dirPath));
                }
            }
            this.detailLogs.removeAll(removeList);
        }

        /**
         * Renames the application directory to its "done" location, creating
         * the parent directory first when it is missing, and then updates the
         * tracked directory path. Does nothing if the directory is already at
         * the done location.
         *
         * @throws IOException if the rename reports failure or a file system
         *                     call throws
         */
        public synchronized void moveToDone() throws IOException {
            Path doneAppPath = getDoneAppPath(this.appId);
            if (doneAppPath.equals(this.appDirPath)) {
                return;
            }
            Path donePathParent = doneAppPath.getParent();
            if (!fs.exists(donePathParent)) {
                fs.mkdirs(donePathParent);
            }
            LOG.debug("Application {} is done, trying to move to done dir {}", this.appId, doneAppPath);
            if (!fs.rename(this.appDirPath, doneAppPath)) {
                throw new IOException("Rename " + this.appDirPath + " to " + doneAppPath + " failed");
            }
            LOG.info("Moved {} to {}", this.appDirPath, doneAppPath);
            this.appDirPath = doneAppPath;
        }
    }

    /**
     * Application state as tracked by this store for each {@code AppLogs}.
     */
    @InterfaceAudience.Private
    @VisibleForTesting
    public enum AppState {
        /** Presumably an application that is still running - confirm against {@code getAppState}. */
        ACTIVE,
        /**
         * State could not be classified as one of the other two.
         * {@code AppLogs.parseSummaryLogs} promotes it to {@link #COMPLETED}
         * once the logs are older than {@code unknownActiveMillis}.
         */
        UNKNOWN,
        /** Terminal state; {@code AppLogs.isDone()} returns {@code true} for it. */
        COMPLETED
    }
}

