/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.util.filesystem;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.gobblin.broker.iface.ConfigView;
import org.apache.gobblin.broker.iface.NotConfiguredException;
import org.apache.gobblin.broker.iface.ScopeType;
import org.apache.gobblin.broker.iface.SharedResourceKey;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.filesystem.FileSystemInstrumentation;
import org.apache.gobblin.util.filesystem.FileSystemInstrumentationFactory;
import org.apache.gobblin.util.filesystem.FileSystemKey;
import org.apache.gobblin.util.filesystem.FileSystemLimiterKey;
import org.apache.gobblin.util.limiter.Limiter;
import org.apache.gobblin.util.limiter.NotEnoughPermitsException;
import org.apache.gobblin.util.limiter.broker.SharedLimiterFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;

/**
 * A {@link FileSystemInstrumentation} that asks a {@link Limiter} for permits around file system calls.
 *
 * <p>Single-path operations (delete, exists, getFileStatus, mkdirs, rename, append, create, open)
 * acquire one permit <em>before</em> delegating. Listing operations (globStatus, listStatus) delegate
 * first and acquire permits <em>afterwards</em>, because their cost is computed from the number of
 * entries that were listed: one permit, plus one more for every {@link #LISTING_FILES_PER_PERMIT}
 * entries.
 *
 * <p>When the limiter returns {@code null} instead of a permit, or the calling thread is interrupted
 * while waiting, the operation fails with a {@link NotEnoughPermitsException}.
 */
public class ThrottledFileSystem extends FileSystemInstrumentation {
    /** Number of listed entries covered by each additional permit of a listing call. */
    public static final int LISTING_FILES_PER_PERMIT = 100;

    private final Limiter limiter;
    private final String serviceName;

    /**
     * @param fs the file system to wrap
     * @param limiter the limiter that every throttled call requests permits from
     * @param serviceName name reported by {@link #getServiceName()}
     */
    public ThrottledFileSystem(FileSystem fs, Limiter limiter, String serviceName) {
        super(fs);
        this.limiter = limiter;
        this.serviceName = serviceName;
    }

    /** Deletes {@code path} recursively; equivalent to {@code delete(path, true)}. */
    @Override
    public boolean delete(Path path) throws IOException {
        return this.delete(path, true);
    }

    /** Acquires one permit, then delegates the delete. */
    @Override
    public boolean delete(Path path, boolean recursive) throws IOException {
        this.acquirePermit("delete " + path);
        return super.delete(path, recursive);
    }

    /** Acquires one permit, then delegates the existence check. */
    @Override
    public boolean exists(Path path) throws IOException {
        this.acquirePermit("exists " + path);
        return super.exists(path);
    }

    /** Acquires one permit, then delegates the status lookup. */
    @Override
    public FileStatus getFileStatus(Path path) throws IOException {
        this.acquirePermit("getFileStatus " + path);
        return super.getFileStatus(path);
    }

    /**
     * Delegates the glob, then charges permits according to the number of statuses returned.
     * A {@code null} result is charged a single permit and returned as is.
     */
    @Override
    public FileStatus[] globStatus(Path pathPattern) throws IOException {
        FileStatus[] statuses = super.globStatus(pathPattern);
        String op = "globStatus " + pathPattern;
        if (statuses == null) {
            this.acquirePermit(op);
        } else {
            this.acquirePermits(permitsForListing(statuses.length), op);
        }
        return statuses;
    }

    /**
     * Delegates the listing, then charges permits according to the number of statuses returned.
     * A {@code null} result is charged a single permit and returned as is.
     */
    @Override
    public FileStatus[] listStatus(Path path) throws IOException {
        FileStatus[] statuses = super.listStatus(path);
        String op = "listStatus " + path;
        if (statuses == null) {
            this.acquirePermit(op);
        } else {
            this.acquirePermits(permitsForListing(statuses.length), op);
        }
        return statuses;
    }

    /**
     * Delegates the filtered listing, then charges permits according to the number of paths the
     * filter was asked about (not the number it accepted). A {@code null} result is charged a
     * single permit and returned as is.
     */
    @Override
    public FileStatus[] listStatus(Path path, PathFilter filter) throws IOException {
        CountingPathFilterDecorator decoratedFilter = new CountingPathFilterDecorator(filter);
        FileStatus[] statuses = super.listStatus(path, (PathFilter) decoratedFilter);
        String op = "listStatus " + path;
        if (statuses == null) {
            this.acquirePermit(op);
        } else {
            this.acquirePermits(permitsForListing(decoratedFilter.getPathsProcessed().get()), op);
        }
        return statuses;
    }

    /** Acquires one permit, then delegates the directory creation. */
    @Override
    public boolean mkdirs(Path path, FsPermission permission) throws IOException {
        this.acquirePermit("mkdirs " + path);
        return super.mkdirs(path, permission);
    }

    /**
     * Acquires one permit, then renames through
     * {@link HadoopUtils#renamePathHandleLocalFSRace(FileSystem, Path, Path)} on the wrapped file system.
     */
    @Override
    public boolean rename(Path path0, Path path1) throws IOException {
        this.acquirePermit("rename " + path0);
        return HadoopUtils.renamePathHandleLocalFSRace(this.underlyingFs, path0, path1);
    }

    /** Acquires one permit, then delegates the append. */
    @Override
    public FSDataOutputStream append(Path path, int bufferSize, Progressable progress) throws IOException {
        this.acquirePermit("append " + path);
        return super.append(path, bufferSize, progress);
    }

    /** Acquires one permit, then delegates the create. */
    @Override
    public FSDataOutputStream create(Path path, FsPermission permission, boolean overwrite, int bufferSize,
            short replication, long blockSize, Progressable progress) throws IOException {
        this.acquirePermit("create " + path);
        return super.create(path, permission, overwrite, bufferSize, replication, blockSize, progress);
    }

    /** Acquires one permit, then delegates the open. */
    @Override
    public FSDataInputStream open(Path path, int bufferSize) throws IOException {
        this.acquirePermit("open " + path);
        return super.open(path, bufferSize);
    }

    /** Number of permits charged for a listing that touched {@code entries} paths. */
    private static int permitsForListing(int entries) {
        return entries / LISTING_FILES_PER_PERMIT + 1;
    }

    /** Acquires a single permit for the operation described by {@code op}. */
    private void acquirePermit(String op) throws IOException {
        this.acquirePermits(1, op);
    }

    /**
     * Requests {@code permits} permits from the limiter.
     *
     * @param permits number of permits to request
     * @param op description of the operation, used as the exception message
     * @throws NotEnoughPermitsException if the limiter returns {@code null} or the thread is
     *         interrupted while waiting
     */
    private void acquirePermits(int permits, String op) throws IOException {
        try {
            // NOTE(review): the returned permit is intentionally not closed here, as in the
            // original; confirm against the Limiter contract whether closing would release it.
            Closeable permit = this.getRateLimiter().acquirePermits(permits);
            if (permit == null) {
                throw new NotEnoughPermitsException(op);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new NotEnoughPermitsException(op, e);
        }
    }

    /** Returns the limiter used for throttling. */
    protected Limiter getRateLimiter() {
        return this.limiter;
    }

    /** Returns the service name supplied at construction time. */
    public String getServiceName() {
        return this.serviceName;
    }

    /**
     * Stops the limiter and closes the wrapped file system. The file system is closed even when
     * stopping the limiter throws.
     *
     * <p>NOTE(review): {@link Factory} obtains the limiter from the shared resources broker; confirm
     * that stopping a possibly shared limiter from here is intended.
     */
    @Override
    public void close() throws IOException {
        try {
            this.getRateLimiter().stop();
        } finally {
            super.close();
        }
    }

    /** A {@link PathFilter} that counts how many paths it was asked about before delegating. */
    private static class CountingPathFilterDecorator implements PathFilter {
        private final PathFilter underlying;
        private final AtomicInteger pathsProcessed = new AtomicInteger();

        public CountingPathFilterDecorator(PathFilter underlying) {
            this.underlying = underlying;
        }

        /** Increments the counter, then returns the wrapped filter's decision. */
        @Override
        public boolean accept(Path path) {
            this.pathsProcessed.incrementAndGet();
            return this.underlying.accept(path);
        }

        /** Returns the live counter of paths passed to {@link #accept(Path)}. */
        public AtomicInteger getPathsProcessed() {
            return this.pathsProcessed;
        }
    }

    /**
     * A {@link FileSystemInstrumentationFactory} that wraps file systems in a
     * {@link ThrottledFileSystem}, using a limiter looked up from the broker under a
     * {@link FileSystemLimiterKey} built from the file system URI.
     */
    public static class Factory<S extends ScopeType<S>>
            extends FileSystemInstrumentationFactory<S> {
        /** Configuration key for the service name; defaults to the empty string when absent. */
        private static final String SERVICE_NAME_CONF_KEY = "gobblin.broker.filesystem.limiterServiceName";

        /**
         * Wraps {@code fs} in a {@link ThrottledFileSystem}.
         *
         * @throws RuntimeException wrapping a {@link NotConfiguredException} raised by the broker lookup
         */
        @Override
        public FileSystem instrumentFileSystem(FileSystem fs, SharedResourcesBroker<S> broker, ConfigView<S, FileSystemKey> config) {
            try {
                String serviceName = ConfigUtils.getString(config.getConfig(), SERVICE_NAME_CONF_KEY, "");
                Limiter limiter = (Limiter)broker.getSharedResource(new SharedLimiterFactory(), (SharedResourceKey)new FileSystemLimiterKey(((FileSystemKey)config.getKey()).getUri()));
                return new ThrottledFileSystem(fs, limiter, serviceName);
            }
            catch (NotConfiguredException nce) {
                throw new RuntimeException(nce);
            }
        }
    }
}

