/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.storage.hdfs;

import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.Exceptions;
import io.pravega.common.LoggerHelpers;
import io.pravega.common.Timer;
import io.pravega.common.util.RetriesExhaustedException;
import io.pravega.common.util.Retry;
import io.pravega.segmentstore.contracts.BadOffsetException;
import io.pravega.segmentstore.contracts.SegmentProperties;
import io.pravega.segmentstore.contracts.StreamSegmentException;
import io.pravega.segmentstore.contracts.StreamSegmentExistsException;
import io.pravega.segmentstore.contracts.StreamSegmentInformation;
import io.pravega.segmentstore.contracts.StreamSegmentSealedException;
import io.pravega.segmentstore.storage.SegmentHandle;
import io.pravega.segmentstore.storage.StorageNotPrimaryException;
import io.pravega.segmentstore.storage.SyncStorage;
import io.pravega.storage.hdfs.FileNameFormatException;
import io.pravega.storage.hdfs.HDFSExceptionHelpers;
import io.pravega.storage.hdfs.HDFSMetrics;
import io.pravega.storage.hdfs.HDFSSegmentHandle;
import io.pravega.storage.hdfs.HDFSStorageConfig;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class HDFSStorage
implements SyncStorage {
    @SuppressFBWarnings(justification="generated code")
    private static final Logger log = LoggerFactory.getLogger(HDFSStorage.class);
    private static final String PART_SEPARATOR = "_";
    private static final String NAME_FORMAT = "%s_%s";
    private static final String SEALED = "sealed";
    private static final String SUFFIX_GLOB_REGEX = "{[0-9]*,sealed}";
    private static final String EXAMPLE_NAME_FORMAT = String.format("%s_%s", "<segment-name>", "<epoch>");
    private static final FsPermission READWRITE_PERMISSION = new FsPermission(FsAction.READ_WRITE, FsAction.NONE, FsAction.NONE);
    private static final FsPermission READONLY_PERMISSION = new FsPermission(FsAction.READ, FsAction.READ, FsAction.READ);
    private static final int MAX_ATTEMPT_COUNT = 3;
    private static final long MAX_EPOCH = Long.MAX_VALUE;
    private static final Retry.RetryAndThrowExceptionally<FileNotFoundException, IOException> HDFS_RETRY = Retry.withExpBackoff((long)1L, (int)5, (int)3).retryingOn(FileNotFoundException.class).throwingOn(IOException.class);
    private final HDFSStorageConfig config;
    private final AtomicBoolean closed;
    private long epoch;
    private FileSystem fileSystem;

    HDFSStorage(HDFSStorageConfig config) {
        Preconditions.checkNotNull((Object)config, (Object)"config");
        this.config = config;
        this.closed = new AtomicBoolean(false);
    }

    public void close() {
        if (!this.closed.getAndSet(true) && this.fileSystem != null) {
            try {
                this.fileSystem.close();
                this.fileSystem = null;
            }
            catch (IOException e) {
                log.warn("Could not close the HDFS filesystem: {}.", (Throwable)e);
            }
        }
    }

    public void initialize(long epoch) {
        Exceptions.checkNotClosed((boolean)this.closed.get(), (Object)this);
        Preconditions.checkState((this.fileSystem == null ? 1 : 0) != 0, (Object)"HDFSStorage has already been initialized.");
        Preconditions.checkArgument((epoch > 0L ? 1 : 0) != 0, (String)"epoch must be a positive number. Given %s.", (long)epoch);
        Configuration conf = new Configuration();
        conf.set("fs.default.name", this.config.getHdfsHostURL());
        conf.set("fs.default.fs", this.config.getHdfsHostURL());
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        conf.set("fs.hdfs.impl.disable.cache", "true");
        if (!this.config.isReplaceDataNodesOnFailure()) {
            conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
        }
        this.epoch = epoch;
        this.fileSystem = this.openFileSystem(conf);
        log.info("Initialized (HDFSHost = '{}', Epoch = {}).", (Object)this.config.getHdfsHostURL(), (Object)epoch);
    }

    public SegmentProperties getStreamSegmentInfo(String streamSegmentName) throws StreamSegmentException {
        this.ensureInitializedAndNotClosed();
        long traceId = LoggerHelpers.traceEnter((Logger)log, (String)"getStreamSegmentInfo", (Object[])new Object[]{streamSegmentName});
        try {
            return (SegmentProperties)HDFS_RETRY.run(() -> {
                FileStatus last = this.findStatusForSegment(streamSegmentName, true);
                boolean isSealed = this.isSealed(last.getPath());
                StreamSegmentInformation result = StreamSegmentInformation.builder().name(streamSegmentName).length(last.getLen()).sealed(isSealed).build();
                LoggerHelpers.traceLeave((Logger)log, (String)"getStreamSegmentInfo", (long)traceId, (Object[])new Object[]{streamSegmentName, result});
                return result;
            });
        }
        catch (IOException e) {
            throw HDFSExceptionHelpers.convertException(streamSegmentName, e);
        }
        catch (RetriesExhaustedException e) {
            throw HDFSExceptionHelpers.convertException(streamSegmentName, e.getCause());
        }
    }

    public boolean exists(String streamSegmentName) {
        this.ensureInitializedAndNotClosed();
        long traceId = LoggerHelpers.traceEnter((Logger)log, (String)"exists", (Object[])new Object[]{streamSegmentName});
        FileStatus status = null;
        try {
            status = this.findStatusForSegment(streamSegmentName, false);
        }
        catch (IOException e) {
            log.warn("Got exception checking if file exists", (Throwable)e);
        }
        boolean exists = status != null;
        LoggerHelpers.traceLeave((Logger)log, (String)"exists", (long)traceId, (Object[])new Object[]{streamSegmentName, exists});
        return exists;
    }

    private boolean isSealed(Path path) throws FileNameFormatException {
        return this.getEpochFromPath(path) == Long.MAX_VALUE;
    }

    FileSystem openFileSystem(Configuration conf) throws IOException {
        return FileSystem.get((Configuration)conf);
    }

    public int read(SegmentHandle handle, long offset, byte[] buffer, int bufferOffset, int length) throws StreamSegmentException {
        this.ensureInitializedAndNotClosed();
        long traceId = LoggerHelpers.traceEnter((Logger)log, (String)"read", (Object[])new Object[]{handle, offset, length});
        if (offset < 0L || bufferOffset < 0 || length < 0 || buffer.length < bufferOffset + length) {
            throw new ArrayIndexOutOfBoundsException(String.format("Offset (%s) must be non-negative, and bufferOffset (%s) and length (%s) must be valid indices into buffer of size %s.", offset, bufferOffset, length, buffer.length));
        }
        Timer timer = new Timer();
        try {
            return (Integer)HDFS_RETRY.run(() -> {
                int totalBytesRead = this.readInternal(handle, buffer, offset, bufferOffset, length);
                HDFSMetrics.READ_LATENCY.reportSuccessEvent(timer.getElapsed());
                HDFSMetrics.READ_BYTES.add((long)totalBytesRead);
                LoggerHelpers.traceLeave((Logger)log, (String)"read", (long)traceId, (Object[])new Object[]{handle, offset, totalBytesRead});
                return totalBytesRead;
            });
        }
        catch (IOException e) {
            throw HDFSExceptionHelpers.convertException(handle.getSegmentName(), e);
        }
        catch (RetriesExhaustedException e) {
            throw HDFSExceptionHelpers.convertException(handle.getSegmentName(), e.getCause());
        }
    }

    public SegmentHandle openRead(String streamSegmentName) throws StreamSegmentException {
        this.ensureInitializedAndNotClosed();
        long traceId = LoggerHelpers.traceEnter((Logger)log, (String)"openRead", (Object[])new Object[]{streamSegmentName});
        try {
            this.findStatusForSegment(streamSegmentName, true);
            LoggerHelpers.traceLeave((Logger)log, (String)"openRead", (long)traceId, (Object[])new Object[]{streamSegmentName});
            return HDFSSegmentHandle.read(streamSegmentName);
        }
        catch (IOException e) {
            throw HDFSExceptionHelpers.convertException(streamSegmentName, e);
        }
    }

    public void seal(SegmentHandle handle) throws StreamSegmentException {
        this.ensureInitializedAndNotClosed();
        long traceId = LoggerHelpers.traceEnter((Logger)log, (String)"seal", (Object[])new Object[]{handle});
        handle = this.asWritableHandle(handle);
        try {
            FileStatus status = this.findStatusForSegment(handle.getSegmentName(), true);
            if (!this.isSealed(status.getPath())) {
                if (this.getEpoch(status) > this.epoch) {
                    throw new StorageNotPrimaryException(handle.getSegmentName());
                }
                this.makeReadOnly(status);
                Path sealedPath = this.getSealedFilePath(handle.getSegmentName());
                this.fileSystem.rename(status.getPath(), sealedPath);
            }
        }
        catch (IOException e) {
            throw HDFSExceptionHelpers.convertException(handle.getSegmentName(), e);
        }
        LoggerHelpers.traceLeave((Logger)log, (String)"seal", (long)traceId, (Object[])new Object[]{handle});
    }

    public void unseal(SegmentHandle handle) throws StreamSegmentException {
        this.ensureInitializedAndNotClosed();
        long traceId = LoggerHelpers.traceEnter((Logger)log, (String)"seal", (Object[])new Object[]{handle});
        try {
            FileStatus status = this.findStatusForSegment(handle.getSegmentName(), true);
            this.makeWrite(status);
            this.fileSystem.rename(status.getPath(), this.getFilePath(handle.getSegmentName(), this.epoch));
        }
        catch (IOException e) {
            throw HDFSExceptionHelpers.convertException(handle.getSegmentName(), e);
        }
        LoggerHelpers.traceLeave((Logger)log, (String)"unseal", (long)traceId, (Object[])new Object[]{handle});
    }

    public void concat(SegmentHandle target, long offset, String sourceSegment) throws StreamSegmentException {
        this.ensureInitializedAndNotClosed();
        long traceId = LoggerHelpers.traceEnter((Logger)log, (String)"concat", (Object[])new Object[]{target, offset, sourceSegment});
        target = this.asWritableHandle(target);
        FileStatus fileStatus = null;
        try {
            fileStatus = this.findStatusForSegment(target.getSegmentName(), true);
            if (this.isSealed(fileStatus.getPath())) {
                throw new StreamSegmentSealedException(target.getSegmentName());
            }
            if (this.getEpoch(fileStatus) > this.epoch) {
                throw new StorageNotPrimaryException(target.getSegmentName());
            }
            if (fileStatus.getLen() != offset) {
                throw new BadOffsetException(target.getSegmentName(), fileStatus.getLen(), offset);
            }
        }
        catch (IOException ex) {
            throw HDFSExceptionHelpers.convertException(target.getSegmentName(), ex);
        }
        try {
            FileStatus sourceFile = this.findStatusForSegment(sourceSegment, true);
            Preconditions.checkState((boolean)this.isSealed(sourceFile.getPath()), (String)"Cannot concat segment '%s' into '%s' because it is not sealed.", (Object)sourceSegment, (Object)target.getSegmentName());
            this.fileSystem.concat(fileStatus.getPath(), new Path[]{sourceFile.getPath()});
        }
        catch (IOException ex) {
            throw HDFSExceptionHelpers.convertException(sourceSegment, ex);
        }
        LoggerHelpers.traceLeave((Logger)log, (String)"concat", (long)traceId, (Object[])new Object[]{target, offset, sourceSegment});
    }

    public void delete(SegmentHandle handle) throws StreamSegmentException {
        this.ensureInitializedAndNotClosed();
        long traceId = LoggerHelpers.traceEnter((Logger)log, (String)"delete", (Object[])new Object[]{handle});
        handle = this.asWritableHandle(handle);
        try {
            FileStatus statusForSegment = this.findStatusForSegment(handle.getSegmentName(), true);
            if (this.getEpoch(statusForSegment) > this.epoch && !this.isSealed(statusForSegment.getPath())) {
                throw new StorageNotPrimaryException(handle.getSegmentName());
            }
            this.fileSystem.delete(statusForSegment.getPath(), true);
        }
        catch (IOException e) {
            throw HDFSExceptionHelpers.convertException(handle.getSegmentName(), e);
        }
        LoggerHelpers.traceLeave((Logger)log, (String)"delete", (long)traceId, (Object[])new Object[]{handle});
    }

    public void truncate(SegmentHandle handle, long offset) {
        throw new UnsupportedOperationException(this.getClass().getName() + " does not support Segment truncation.");
    }

    public boolean supportsTruncation() {
        this.ensureInitializedAndNotClosed();
        return false;
    }

    public void write(SegmentHandle handle, long offset, InputStream data, int length) throws StreamSegmentException {
        this.ensureInitializedAndNotClosed();
        long traceId = LoggerHelpers.traceEnter((Logger)log, (String)"write", (Object[])new Object[]{handle, offset, length});
        handle = this.asWritableHandle(handle);
        FileStatus status = null;
        try {
            status = this.findStatusForSegment(handle.getSegmentName(), true);
            if (this.isSealed(status.getPath())) {
                throw new StreamSegmentSealedException(handle.getSegmentName());
            }
            if (this.getEpochFromPath(status.getPath()) > this.epoch) {
                throw new StorageNotPrimaryException(handle.getSegmentName());
            }
        }
        catch (IOException e) {
            throw HDFSExceptionHelpers.convertException(handle.getSegmentName(), e);
        }
        Timer timer = new Timer();
        try (FSDataOutputStream stream = this.fileSystem.append(status.getPath());){
            if (offset != status.getLen()) {
                throw new BadOffsetException(handle.getSegmentName(), status.getLen(), offset);
            }
            if (stream.getPos() != offset) {
                log.warn("File changed detected for '{}'. Expected length = {}, actual length = {}.", new Object[]{status, status.getLen(), stream.getPos()});
                throw new BadOffsetException(handle.getSegmentName(), status.getLen(), offset);
            }
            if (length == 0) {
                return;
            }
            IOUtils.copyBytes((InputStream)data, (OutputStream)stream, (long)length, (boolean)false);
            stream.flush();
        }
        catch (IOException ex) {
            throw HDFSExceptionHelpers.convertException(handle.getSegmentName(), ex);
        }
        HDFSMetrics.WRITE_LATENCY.reportSuccessEvent(timer.getElapsed());
        HDFSMetrics.WRITE_BYTES.add((long)length);
        LoggerHelpers.traceLeave((Logger)log, (String)"write", (long)traceId, (Object[])new Object[]{handle, offset, length});
    }

    public SegmentHandle openWrite(String streamSegmentName) throws StreamSegmentException {
        this.ensureInitializedAndNotClosed();
        long traceId = LoggerHelpers.traceEnter((Logger)log, (String)"openWrite", (Object[])new Object[]{streamSegmentName});
        long fencedCount = 0L;
        do {
            try {
                FileStatus fileStatus = this.findStatusForSegment(streamSegmentName, true);
                if (!this.isSealed(fileStatus.getPath())) {
                    if (this.getEpochFromPath(fileStatus.getPath()) > this.epoch) {
                        throw new StorageNotPrimaryException(streamSegmentName);
                    }
                    Path targetPath = this.getFilePath(streamSegmentName, this.epoch);
                    if (!targetPath.equals((Object)fileStatus.getPath())) {
                        try {
                            this.fileSystem.rename(fileStatus.getPath(), targetPath);
                        }
                        catch (FileNotFoundException e) {
                            log.warn("Race in fencing. More than two hosts trying to own the segment. Retrying");
                            continue;
                        }
                    }
                }
                this.findStatusForSegment(streamSegmentName, true);
                return HDFSSegmentHandle.write(streamSegmentName);
            }
            catch (IOException e) {
                throw HDFSExceptionHelpers.convertException(streamSegmentName, e);
            }
        } while (++fencedCount <= this.epoch);
        LoggerHelpers.traceLeave((Logger)log, (String)"openWrite", (long)traceId, (Object[])new Object[]{this.epoch});
        throw new StorageNotPrimaryException("Not able to fence out other writers.");
    }

    public SegmentHandle create(String streamSegmentName) throws StreamSegmentException {
        this.ensureInitializedAndNotClosed();
        long traceId = LoggerHelpers.traceEnter((Logger)log, (String)"create", (Object[])new Object[]{streamSegmentName});
        FileStatus[] status = null;
        try {
            status = this.findAllRaw(streamSegmentName);
        }
        catch (IOException e) {
            throw HDFSExceptionHelpers.convertException(streamSegmentName, e);
        }
        if (status != null && status.length > 0) {
            throw HDFSExceptionHelpers.convertException(streamSegmentName, (Throwable)HDFSExceptionHelpers.segmentExistsException(streamSegmentName));
        }
        Path fullPath = this.getFilePath(streamSegmentName, 0L);
        try {
            this.fileSystem.create(fullPath, READWRITE_PERMISSION, false, 0, this.config.getReplication(), this.config.getBlockSize(), null).close();
            HDFSMetrics.CREATE_COUNT.inc();
            log.debug("Created '{}'.", (Object)fullPath);
        }
        catch (IOException e) {
            throw HDFSExceptionHelpers.convertException(streamSegmentName, e);
        }
        try {
            status = this.findAllRaw(streamSegmentName);
            if (status != null && status.length > 1) {
                this.fileSystem.delete(fullPath, true);
                throw new StreamSegmentExistsException(streamSegmentName);
            }
        }
        catch (IOException e) {
            log.warn("Exception while deleting a file with epoch 0.", (Throwable)e);
        }
        LoggerHelpers.traceLeave((Logger)log, (String)"create", (long)traceId, (Object[])new Object[]{streamSegmentName});
        return HDFSSegmentHandle.write(streamSegmentName);
    }

    private HDFSSegmentHandle asWritableHandle(SegmentHandle handle) {
        Preconditions.checkArgument((!handle.isReadOnly() ? 1 : 0) != 0, (Object)"handle must not be read-only.");
        return this.asReadableHandle(handle);
    }

    private HDFSSegmentHandle asReadableHandle(SegmentHandle handle) {
        Preconditions.checkArgument((boolean)(handle instanceof HDFSSegmentHandle), (Object)"handle must be of type HDFSSegmentHandle.");
        return (HDFSSegmentHandle)handle;
    }

    private void ensureInitializedAndNotClosed() {
        Exceptions.checkNotClosed((boolean)this.closed.get(), (Object)this);
        Preconditions.checkState((this.fileSystem != null ? 1 : 0) != 0, (Object)"HDFSStorage is not initialized.");
    }

    private FileStatus[] findAllRaw(String segmentName) throws IOException {
        assert (segmentName != null && segmentName.length() > 0) : "segmentName must be non-null and non-empty";
        String pattern = String.format(NAME_FORMAT, this.getPathPrefix(segmentName), SUFFIX_GLOB_REGEX);
        FileStatus[] files = this.fileSystem.globStatus(new Path(pattern));
        if (files.length > 1) {
            throw new IllegalArgumentException("More than one file");
        }
        return files;
    }

    private String getPathPrefix(String segmentName) {
        return this.config.getHdfsRoot() + "/" + segmentName;
    }

    private Path getFilePath(String segmentName, long epoch) {
        Preconditions.checkState((segmentName != null && segmentName.length() > 0 ? 1 : 0) != 0, (Object)"segmentName must be non-null and non-empty");
        Preconditions.checkState((epoch >= 0L ? 1 : 0) != 0, (Object)("epoch must be non-negative " + epoch));
        return new Path(String.format(NAME_FORMAT, this.getPathPrefix(segmentName), epoch));
    }

    private Path getSealedFilePath(String segmentName) {
        Preconditions.checkState((segmentName != null && segmentName.length() > 0 ? 1 : 0) != 0, (Object)"segmentName must be non-null and non-empty");
        return new Path(String.format(NAME_FORMAT, this.getPathPrefix(segmentName), SEALED));
    }

    private FileStatus findStatusForSegment(String segmentName, boolean enforceExistence) throws IOException {
        FileStatus[] rawFiles = this.findAllRaw(segmentName);
        if (rawFiles == null || rawFiles.length == 0) {
            if (enforceExistence) {
                throw HDFSExceptionHelpers.segmentNotExistsException(segmentName);
            }
            return null;
        }
        List result = Arrays.stream(rawFiles).sorted(this::compareFileStatus).collect(Collectors.toList());
        return (FileStatus)result.get(result.size() - 1);
    }

    private int compareFileStatus(FileStatus f1, FileStatus f2) {
        try {
            return Long.compare(this.getEpoch(f1), this.getEpoch(f2));
        }
        catch (FileNameFormatException e) {
            throw new IllegalStateException(e);
        }
    }

    private long getEpoch(FileStatus status) throws FileNameFormatException {
        return this.getEpochFromPath(status.getPath());
    }

    private long getEpochFromPath(Path path) throws FileNameFormatException {
        String fileName = path.toString();
        int pos2 = fileName.lastIndexOf(PART_SEPARATOR);
        if (pos2 <= 0) {
            throw new FileNameFormatException(fileName, "File must be in the following format: " + EXAMPLE_NAME_FORMAT);
        }
        if (pos2 == fileName.length() - 1 || fileName.regionMatches(pos2 + 1, SEALED, 0, SEALED.length())) {
            return Long.MAX_VALUE;
        }
        try {
            return Long.parseLong(fileName.substring(pos2 + 1));
        }
        catch (NumberFormatException nfe) {
            throw new FileNameFormatException(fileName, "Could not extract offset or epoch.", nfe);
        }
    }

    private boolean isReadOnly(FileStatus fs) {
        return fs.getPermission().getUserAction() == FsAction.READ;
    }

    private boolean makeReadOnly(FileStatus file) throws IOException {
        if (this.isReadOnly(file)) {
            return false;
        }
        this.fileSystem.setPermission(file.getPath(), READONLY_PERMISSION);
        log.debug("MakeReadOnly '{}'.", (Object)file.getPath());
        return true;
    }

    private boolean makeWrite(FileStatus file) throws IOException {
        this.fileSystem.setPermission(file.getPath(), READWRITE_PERMISSION);
        log.debug("MakeReadOnly '{}'.", (Object)file.getPath());
        return true;
    }

    private int readInternal(SegmentHandle handle, byte[] buffer, long offset, int bufferOffset, int length) throws IOException {
        FileStatus currentFile = this.findStatusForSegment(handle.getSegmentName(), true);
        try (FSDataInputStream stream = this.fileSystem.open(currentFile.getPath());){
            stream.readFully(offset, buffer, bufferOffset, length);
        }
        catch (EOFException e) {
            throw new IllegalArgumentException(String.format("Reading at offset (%d) which is beyond the current size of segment.", offset));
        }
        return length;
    }
}

