/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.mledger.offload.filesystem.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.netty.util.Recycler;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.bookkeeper.mledger.offload.OffloadUtils;
import org.apache.bookkeeper.mledger.offload.filesystem.FileSystemLedgerOffloaderFactory;
import org.apache.bookkeeper.mledger.offload.filesystem.impl.FileStoreBackedReadHandleImpl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FileSystemManagedLedgerOffloader
implements LedgerOffloader {
    // Class logger; the nested LedgerReader task logs through it as well.
    private static final Logger log = LoggerFactory.getLogger(FileSystemManagedLedgerOffloader.class);
    // Key under which the base path is published by getOffloadDriverMetadata().
    private static final String STORAGE_BASE_PATH = "storageBasePath";
    // Driver name accepted by driverSupported().
    private static final String DRIVER_NAMES = "filesystem";
    // Metadata-map key from which offload/read/delete look up the managed ledger name.
    private static final String MANAGED_LEDGER_NAME = "ManagedLedgerName";
    // MapFile key of the record holding the serialized ledger metadata; LedgerReader
    // writes that record under key -1 before any entry.
    static final long METADATA_KEY_INDEX = -1L;
    // Hadoop configuration built in the constructors and used to open MapFile readers/writers.
    private final Configuration configuration;
    // Value of conf.getManagedLedgerOffloadDriver() captured at construction time.
    private final String driverName;
    // Directory prefix for offloaded data: "hadoop.tmp.dir" from the configuration, or the
    // explicit baseDir of the test constructor. May be null (see getStoragePath()).
    private final String storageBasePath;
    // File system obtained from FileSystem.get(configuration); used for deletes and closed in close().
    private final FileSystem fileSystem;
    // Caller-supplied scheduler; offload and read-open work is dispatched on the thread
    // chosen for the ledger id.
    private OrderedScheduler scheduler;
    // Number of entries LedgerReader requests per read round.
    private static final long ENTRIES_PER_READ = 100L;
    // Scheduler named "offload-assignment", created by the constructors; LedgerReader
    // submits its FileSystemWriter tasks to it.
    private OrderedScheduler assignmentScheduler;
    // Policies this offloader was created with; returned by getOffloadPolicies().
    private OffloadPoliciesImpl offloadPolicies;
    // Sink for offload metrics (read latency, bytes, errors, delete ops).
    private final LedgerOffloaderStats offloaderStats;

    /**
     * Tells whether the given offload driver name is the one handled by this offloader.
     *
     * @param driver driver name to test; {@code null} yields {@code false}
     * @return {@code true} only when {@code driver} equals the supported driver name
     */
    public static boolean driverSupported(String driver) {
        if (driver == null) {
            return false;
        }
        return driver.equals(DRIVER_NAMES);
    }

    /**
     * Returns the driver name captured from the offload policies at construction time.
     *
     * @return the configured offload driver name
     */
    public String getOffloadDriverName() {
        return driverName;
    }

    /**
     * Static factory delegating to the private constructor.
     *
     * @param conf offload policies describing the file system to use
     * @param scheduler scheduler on which offload and read tasks are dispatched
     * @param offloaderStats metrics sink
     * @return a newly constructed offloader
     * @throws IOException if the constructor fails to obtain the Hadoop file system
     */
    public static FileSystemManagedLedgerOffloader create(OffloadPoliciesImpl conf, OrderedScheduler scheduler, LedgerOffloaderStats offloaderStats) throws IOException {
        FileSystemManagedLedgerOffloader offloader =
                new FileSystemManagedLedgerOffloader(conf, scheduler, offloaderStats);
        return offloader;
    }

    private FileSystemManagedLedgerOffloader(OffloadPoliciesImpl conf, OrderedScheduler scheduler, LedgerOffloaderStats offloaderStats) throws IOException {
        this.offloadPolicies = conf;
        this.configuration = new Configuration();
        if (conf.getFileSystemProfilePath() != null) {
            String[] paths = conf.getFileSystemProfilePath().split(",");
            for (int i = 0; i < paths.length; ++i) {
                this.configuration.addResource(new Path(paths[i]));
            }
        }
        if (!"".equals(conf.getFileSystemURI()) && conf.getFileSystemURI() != null) {
            this.configuration.set("fs.defaultFS", conf.getFileSystemURI());
        }
        if (this.configuration.get("fs.hdfs.impl") == null) {
            this.configuration.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        }
        if (this.configuration.get("fs.file.impl") == null) {
            this.configuration.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");
        }
        this.configuration.setClassLoader(FileSystemLedgerOffloaderFactory.class.getClassLoader());
        this.driverName = conf.getManagedLedgerOffloadDriver();
        this.storageBasePath = this.configuration.get("hadoop.tmp.dir");
        this.scheduler = scheduler;
        this.fileSystem = FileSystem.get((Configuration)this.configuration);
        this.assignmentScheduler = (OrderedScheduler)OrderedScheduler.newSchedulerBuilder().numThreads(conf.getManagedLedgerOffloadMaxThreads().intValue()).name("offload-assignment").build();
        this.offloaderStats = offloaderStats;
    }

    @VisibleForTesting
    public FileSystemManagedLedgerOffloader(OffloadPoliciesImpl conf, OrderedScheduler scheduler, String testHDFSPath, String baseDir, LedgerOffloaderStats offloaderStats) throws IOException {
        this.offloadPolicies = conf;
        this.configuration = new Configuration();
        this.configuration.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        this.configuration.set("fs.defaultFS", testHDFSPath);
        this.configuration.setClassLoader(FileSystemLedgerOffloaderFactory.class.getClassLoader());
        this.driverName = conf.getManagedLedgerOffloadDriver();
        this.configuration.set("hadoop.tmp.dir", baseDir);
        this.storageBasePath = baseDir;
        this.scheduler = scheduler;
        this.fileSystem = FileSystem.get((Configuration)this.configuration);
        this.assignmentScheduler = (OrderedScheduler)OrderedScheduler.newSchedulerBuilder().numThreads(conf.getManagedLedgerOffloadMaxThreads().intValue()).name("offload-assignment").build();
        this.offloaderStats = offloaderStats;
    }

    /**
     * Describes where this offloader stores data.
     *
     * @return an immutable single-entry map from {@code "storageBasePath"} to the base path,
     *         or to the literal string {@code "null"} when no base path is configured
     *         (ImmutableMap rejects null values)
     */
    public Map<String, String> getOffloadDriverMetadata() {
        String path = this.storageBasePath == null ? "null" : this.storageBasePath;
        // No (Object) casts here: they would force ImmutableMap<Object, Object>, which is not
        // assignable to Map<String, String>.
        return ImmutableMap.of(STORAGE_BASE_PATH, path);
    }

    /**
     * Schedules the copy of a ledger to the file system.
     *
     * <p>The work is done by a {@code LedgerReader} executed on the scheduler thread chosen
     * for the ledger id; this method only submits it.
     *
     * @param readHandle handle of the ledger to offload
     * @param uuid identifier that becomes part of the data file name
     * @param extraMetadata metadata map; the reader looks up {@code "ManagedLedgerName"} in it
     * @return a future the reader completes when the offload finishes or fails
     */
    public CompletableFuture<Void> offload(ReadHandle readHandle, UUID uuid, Map<String, String> extraMetadata) {
        CompletableFuture<Void> result = new CompletableFuture<Void>();
        this.scheduler.chooseThread(readHandle.getId()).execute(
                new LedgerReader(
                        readHandle,
                        uuid,
                        extraMetadata,
                        result,
                        this.storageBasePath,
                        this.configuration,
                        this.assignmentScheduler,
                        this.offloadPolicies.getManagedLedgerOffloadPrefetchRounds(),
                        this.offloaderStats));
        return result;
    }

    /**
     * Opens a previously offloaded ledger for reading.
     *
     * <p>The data file path is derived from the storage base path, the managed ledger name
     * found under {@code "ManagedLedgerName"} in {@code offloadDriverMetadata}, the ledger id
     * and the UUID. The file is opened on the scheduler thread chosen for the ledger id.
     *
     * @param ledgerId id of the offloaded ledger
     * @param uuid UUID that was used when the ledger was offloaded
     * @param offloadDriverMetadata metadata map containing the managed ledger name
     * @return a future completed with the read handle, or completed exceptionally with whatever
     *         was thrown while opening it
     */
    public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uuid, Map<String, String> offloadDriverMetadata) {
        String ledgerName = offloadDriverMetadata.get(MANAGED_LEDGER_NAME);
        CompletableFuture<ReadHandle> promise = new CompletableFuture<ReadHandle>();
        String storagePath = FileSystemManagedLedgerOffloader.getStoragePath(this.storageBasePath, ledgerName);
        String dataFilePath = FileSystemManagedLedgerOffloader.getDataFilePath(storagePath, ledgerId, uuid);
        this.scheduler.chooseThread(ledgerId).execute(() -> {
            MapFile.Reader reader = null;
            try {
                reader = new MapFile.Reader(new Path(dataFilePath), this.configuration, new SequenceFile.Reader.Option[0]);
                promise.complete(FileStoreBackedReadHandleImpl.open((ScheduledExecutorService)this.scheduler.chooseThread(ledgerId), reader, ledgerId, this.offloaderStats, ledgerName));
            }
            catch (Throwable t) {
                // If the reader was opened but the handle could not be built, nobody else owns
                // the reader: close it here so its streams are not leaked. closeStream accepts
                // null and suppresses close failures.
                IOUtils.closeStream(reader);
                log.error("Failed to open FileStoreBackedReadHandleImpl: ManagerLedgerName: {}, LegerId: {}, UUID: {}", new Object[]{ledgerName, ledgerId, uuid, t});
                promise.completeExceptionally(t);
            }
        });
        return promise;
    }

    /**
     * Builds the directory prefix (with trailing slash) for a managed ledger's offloaded files.
     *
     * @param storageBasePath base directory, or {@code null} for a path relative to the ledger name
     * @param managedLedgerName managed ledger name used as the directory name
     * @return {@code "<base>/<name>/"}, or {@code "<name>/"} when no base path is given
     */
    private static String getStoragePath(String storageBasePath, String managedLedgerName) {
        if (storageBasePath == null) {
            return managedLedgerName + "/";
        }
        return storageBasePath + "/" + managedLedgerName + "/";
    }

    /**
     * Builds the data file path for one offloaded ledger.
     *
     * @param storagePath directory prefix, expected to already end with a separator
     * @param ledgerId ledger id
     * @param uuid offload UUID; must not be {@code null}
     * @return {@code storagePath + ledgerId + "-" + uuid}
     */
    private static String getDataFilePath(String storagePath, long ledgerId, UUID uuid) {
        StringBuilder path = new StringBuilder();
        path.append(storagePath);
        path.append(ledgerId);
        path.append("-");
        path.append(uuid.toString());
        return path.toString();
    }

    /**
     * Deletes the offloaded data of one ledger.
     *
     * <p>The delete is performed synchronously on the calling thread; the returned future is
     * already complete when this method returns. The outcome is reported to the offloader
     * stats via {@code recordDeleteOffloadOps}.
     *
     * @param ledgerId id of the offloaded ledger
     * @param uid UUID that was used when the ledger was offloaded
     * @param offloadDriverMetadata metadata map containing {@code "ManagedLedgerName"}
     * @return a future completed normally after the delete call returns, or exceptionally
     *         with the {@link IOException} it threw
     */
    public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid, Map<String, String> offloadDriverMetadata) {
        String ledgerName = offloadDriverMetadata.get(MANAGED_LEDGER_NAME);
        String storagePath = FileSystemManagedLedgerOffloader.getStoragePath(this.storageBasePath, ledgerName);
        String dataFilePath = FileSystemManagedLedgerOffloader.getDataFilePath(storagePath, ledgerId, uid);
        String topicName = TopicName.fromPersistenceNamingEncoding((String)ledgerName);
        // Parameterized (the raw type made the returned future an unchecked conversion).
        CompletableFuture<Void> promise = new CompletableFuture<Void>();
        try {
            // Recursive delete: the MapFile is a directory holding data and index files.
            this.fileSystem.delete(new Path(dataFilePath), true);
            promise.complete(null);
        }
        catch (IOException e) {
            log.error("Failed to delete Offloaded: ", (Throwable)e);
            promise.completeExceptionally(e);
        }
        return promise.whenComplete((__, t) -> this.offloaderStats.recordDeleteOffloadOps(topicName, t == null));
    }

    /**
     * Returns the offload policies this offloader was constructed with.
     *
     * @return the offload policies
     */
    public OffloadPoliciesImpl getOffloadPolicies() {
        return offloadPolicies;
    }

    /**
     * Releases the resources owned by this offloader: the Hadoop file system and the
     * "offload-assignment" scheduler created in the constructor. The scheduler passed in by
     * the caller is not touched.
     *
     * <p>Failures are logged and not propagated.
     */
    public void close() {
        if (this.fileSystem != null) {
            try {
                this.fileSystem.close();
            }
            catch (Exception e) {
                log.error("FileSystemManagedLedgerOffloader close failed!", (Throwable)e);
            }
        }
        // This class creates the assignment scheduler, so it must also stop it; otherwise its
        // worker threads outlive the offloader. shutdown() lets already-submitted tasks finish.
        if (this.assignmentScheduler != null) {
            try {
                this.assignmentScheduler.shutdown();
            }
            catch (Exception e) {
                log.error("FileSystemManagedLedgerOffloader failed to shut down the assignment scheduler", (Throwable)e);
            }
        }
    }

    /**
     * Task that copies one closed ledger into a Hadoop {@code MapFile}.
     *
     * <p>The reader writes the serialized ledger metadata under key
     * {@code METADATA_KEY_INDEX}, then reads the ledger in batches of at most
     * {@code ENTRIES_PER_READ} entries and hands each batch to a {@link FileSystemWriter}
     * running on the assignment scheduler. A semaphore with
     * {@code managedLedgerOffloadPrefetchRounds} permits bounds how many batches may be
     * waiting to be written.
     */
    private static class LedgerReader
    implements Runnable {
        private final ReadHandle readHandle;
        private final UUID uuid;
        private final Map<String, String> extraMetadata;
        private final CompletableFuture<Void> promise;
        private final String storageBasePath;
        private final Configuration configuration;
        // Set by FileSystemWriter when a write fails; checked here to stop reading and fail the offload.
        volatile Exception fileSystemWriteException = null;
        private OrderedScheduler assignmentScheduler;
        private int managedLedgerOffloadPrefetchRounds = 1;
        private final LedgerOffloaderStats offloaderStats;

        private LedgerReader(ReadHandle readHandle, UUID uuid, Map<String, String> extraMetadata, CompletableFuture<Void> promise, String storageBasePath, Configuration configuration, OrderedScheduler assignmentScheduler, int managedLedgerOffloadPrefetchRounds, LedgerOffloaderStats offloaderStats) {
            this.readHandle = readHandle;
            this.uuid = uuid;
            this.extraMetadata = extraMetadata;
            this.promise = promise;
            this.storageBasePath = storageBasePath;
            this.configuration = configuration;
            this.assignmentScheduler = assignmentScheduler;
            this.managedLedgerOffloadPrefetchRounds = managedLedgerOffloadPrefetchRounds;
            this.offloaderStats = offloaderStats;
        }

        /**
         * Performs the offload and completes {@code promise} with the outcome.
         *
         * <p>An empty ledger, a ledger that is still open, or one with a negative last-add-confirmed
         * fails the promise with {@link IllegalArgumentException} without touching the file system.
         */
        @Override
        public void run() {
            if (this.readHandle.getLength() == 0L || !this.readHandle.isClosed() || this.readHandle.getLastAddConfirmed() < 0L) {
                this.promise.completeExceptionally(new IllegalArgumentException("An empty or open ledger should never be offloaded"));
                return;
            }
            long ledgerId = this.readHandle.getId();
            String managedLedgerName = this.extraMetadata.get(FileSystemManagedLedgerOffloader.MANAGED_LEDGER_NAME);
            String storagePath = FileSystemManagedLedgerOffloader.getStoragePath(this.storageBasePath, managedLedgerName);
            String dataFilePath = FileSystemManagedLedgerOffloader.getDataFilePath(storagePath, ledgerId, this.uuid);
            String topicName = TopicName.fromPersistenceNamingEncoding((String)managedLedgerName);
            LongWritable key = new LongWritable();
            BytesWritable value = new BytesWritable();
            // Declared outside the try so the failure path can still close it.
            MapFile.Writer dataWriter = null;
            try {
                dataWriter = new MapFile.Writer(this.configuration, new Path(dataFilePath), new SequenceFile.Writer.Option[]{MapFile.Writer.keyClass(LongWritable.class), MapFile.Writer.valueClass(BytesWritable.class)});

                // The first record is the ledger metadata, stored under the reserved key.
                key.set(FileSystemManagedLedgerOffloader.METADATA_KEY_INDEX);
                byte[] ledgerMetadata = OffloadUtils.buildLedgerMetadataFormat((LedgerMetadata)this.readHandle.getLedgerMetadata());
                value.set(ledgerMetadata, 0, ledgerMetadata.length);
                dataWriter.append(key, value);

                AtomicLong haveOffloadEntryNumber = new AtomicLong(0L);
                long needToOffloadFirstEntryNumber = 0L;
                long end;
                CountDownLatch countDownLatch;
                Semaphore semaphore = new Semaphore(this.managedLedgerOffloadPrefetchRounds);
                do {
                    end = Math.min(needToOffloadFirstEntryNumber + FileSystemManagedLedgerOffloader.ENTRIES_PER_READ - 1L, this.readHandle.getLastAddConfirmed());
                    log.debug("read ledger entries. start: {}, end: {}", needToOffloadFirstEntryNumber, end);
                    long startReadTime = System.nanoTime();
                    LedgerEntries ledgerEntriesOnce = this.readHandle.readAsync(needToOffloadFirstEntryNumber, end).get();
                    long cost = System.nanoTime() - startReadTime;
                    this.offloaderStats.recordReadLedgerLatency(topicName, cost, TimeUnit.NANOSECONDS);

                    // Until the batch is handed to a writer task, this method owns it and must
                    // close it if the hand-off is interrupted or rejected.
                    boolean handedOff = false;
                    try {
                        semaphore.acquire();
                        countDownLatch = new CountDownLatch(1);
                        this.assignmentScheduler.chooseThread(ledgerId).execute(FileSystemWriter.create(ledgerEntriesOnce, dataWriter, semaphore, countDownLatch, haveOffloadEntryNumber, this));
                        handedOff = true;
                    }
                    finally {
                        if (!handedOff) {
                            ledgerEntriesOnce.close();
                        }
                    }
                    needToOffloadFirstEntryNumber = end + 1L;
                } while (end != this.readHandle.getLastAddConfirmed() && this.fileSystemWriteException == null);

                // Only the last batch's latch is awaited; this presumes the executor chosen for
                // a ledger id runs its tasks in submission order.
                countDownLatch.await();
                if (this.fileSystemWriteException != null) {
                    throw this.fileSystemWriteException;
                }
                // Close directly rather than via IOUtils.closeStream: a failed flush/close must
                // fail the offload instead of being silently dropped.
                dataWriter.close();
                this.promise.complete(null);
            }
            catch (Exception e) {
                log.error("Exception when get CompletableFuture<LedgerEntries> : ManagerLedgerName: {}, LedgerId: {}, UUID: {} ", new Object[]{managedLedgerName, ledgerId, this.uuid, e});
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                this.closeAfterPendingWrites(dataWriter, ledgerId);
                this.offloaderStats.recordOffloadError(topicName);
                this.promise.completeExceptionally(e);
            }
        }

        /**
         * Best-effort close of the writer on a failure path.
         *
         * <p>Writer tasks for earlier batches may still be queued, so the close is submitted to
         * the same per-ledger executor and runs after them (assuming that executor preserves
         * submission order). If the executor rejects the task, the writer is closed inline.
         * Close errors are suppressed because the offload has already failed.
         */
        private void closeAfterPendingWrites(MapFile.Writer writer, long ledgerId) {
            if (writer == null) {
                return;
            }
            try {
                this.assignmentScheduler.chooseThread(ledgerId).execute(() -> IOUtils.closeStream(writer));
            }
            catch (RuntimeException rejected) {
                IOUtils.closeStream(writer);
            }
        }
    }

    private static class FileSystemWriter
    implements Runnable {
        private LedgerEntries ledgerEntriesOnce;
        private final LongWritable key = new LongWritable();
        private final BytesWritable value = new BytesWritable();
        private MapFile.Writer dataWriter;
        private CountDownLatch countDownLatch;
        private AtomicLong haveOffloadEntryNumber;
        private LedgerReader ledgerReader;
        private Semaphore semaphore;
        private Recycler.Handle<FileSystemWriter> recyclerHandle;
        private static final Recycler<FileSystemWriter> RECYCLER = new Recycler<FileSystemWriter>(){

            protected FileSystemWriter newObject(Recycler.Handle<FileSystemWriter> handle) {
                return new FileSystemWriter(handle);
            }
        };

        private FileSystemWriter(Recycler.Handle<FileSystemWriter> recyclerHandle) {
            this.recyclerHandle = recyclerHandle;
        }

        private void recycle() {
            this.dataWriter = null;
            this.countDownLatch = null;
            this.haveOffloadEntryNumber = null;
            this.ledgerReader = null;
            this.ledgerEntriesOnce = null;
            this.semaphore = null;
            this.recyclerHandle.recycle((Object)this);
        }

        public static FileSystemWriter create(LedgerEntries ledgerEntriesOnce, MapFile.Writer dataWriter, Semaphore semaphore, CountDownLatch countDownLatch, AtomicLong haveOffloadEntryNumber, LedgerReader ledgerReader) {
            FileSystemWriter writer = (FileSystemWriter)RECYCLER.get();
            writer.ledgerReader = ledgerReader;
            writer.dataWriter = dataWriter;
            writer.countDownLatch = countDownLatch;
            writer.haveOffloadEntryNumber = haveOffloadEntryNumber;
            writer.ledgerEntriesOnce = ledgerEntriesOnce;
            writer.semaphore = semaphore;
            return writer;
        }

        @Override
        public void run() {
            String managedLedgerName = this.ledgerReader.extraMetadata.get(FileSystemManagedLedgerOffloader.MANAGED_LEDGER_NAME);
            String topicName = TopicName.fromPersistenceNamingEncoding((String)managedLedgerName);
            if (this.ledgerReader.fileSystemWriteException == null) {
                for (LedgerEntry entry : this.ledgerEntriesOnce) {
                    int currentEntrySize;
                    long entryId = entry.getEntryId();
                    this.key.set(entryId);
                    try {
                        byte[] currentEntryBytes = entry.getEntryBytes();
                        currentEntrySize = currentEntryBytes.length;
                        this.value.set(currentEntryBytes, 0, currentEntrySize);
                        this.dataWriter.append((WritableComparable)this.key, (Writable)this.value);
                    }
                    catch (IOException e) {
                        this.ledgerReader.fileSystemWriteException = e;
                        this.ledgerReader.offloaderStats.recordWriteToStorageError(topicName);
                        break;
                    }
                    this.haveOffloadEntryNumber.incrementAndGet();
                    this.ledgerReader.offloaderStats.recordOffloadBytes(topicName, (long)currentEntrySize);
                }
            }
            this.countDownLatch.countDown();
            this.ledgerEntriesOnce.close();
            this.semaphore.release();
            this.recycle();
        }
    }
}

