/*
 * Decompiled with CFR 0.152.
 */
package com.exasol.bucketfs;

import com.exasol.bucketfs.Bucket;
import com.exasol.bucketfs.BucketAccessException;
import com.exasol.bucketfs.UploadResult;
import com.exasol.bucketfs.WriteEnabledBucket;
import com.exasol.bucketfs.monitor.BucketFsMonitor;
import com.exasol.errorreporting.ExaError;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.nio.file.Path;
import java.time.Instant;
import java.time.temporal.ChronoField;
import java.util.Objects;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import java.util.logging.Logger;

public class SyncAwareBucket
extends WriteEnabledBucket
implements Bucket {
    private static final Logger LOGGER = Logger.getLogger(SyncAwareBucket.class.getName());
    private static final long BUCKET_SYNC_TIMEOUT_IN_MILLISECONDS = 120000L;
    private static final long FILE_SYNC_POLLING_DELAY_IN_MILLISECONDS = 200L;
    private final BucketFsMonitor monitor;
    private final BucketFsMonitor.StateRetriever stateRetriever;

    protected SyncAwareBucket(Builder<? extends Builder<?>> builder) {
        super((WriteEnabledBucket.Builder<? extends WriteEnabledBucket.Builder<?>>)builder);
        Objects.requireNonNull(builder.monitor);
        Objects.requireNonNull(builder.stateRetriever);
        this.monitor = builder.monitor;
        this.stateRetriever = builder.stateRetriever;
    }

    @Override
    public boolean isObjectSynchronized(String pathInBucket, BucketFsMonitor.State state) throws BucketAccessException {
        return this.monitor.isObjectSynchronized(this, pathInBucket, state);
    }

    @Override
    public void uploadFile(Path localPath, String pathInBucket) throws TimeoutException, BucketAccessException, FileNotFoundException {
        this.delayRepeatedUploadToSamePath(pathInBucket);
        BucketFsMonitor.State state = this.stateRetriever.getState();
        UploadResult uploadResult = this.uploadFileNonBlocking(localPath, pathInBucket);
        if (uploadResult.wasUploadNecessary()) {
            this.waitForFileToBeSynchronized(pathInBucket, state);
            this.recordUploadInHistory(pathInBucket);
        }
    }

    private void delayRepeatedUploadToSamePath(String extendedPathInBucket) throws BucketAccessException {
        if (this.uploadHistory.containsKey(extendedPathInBucket)) {
            Instant lastUploadAt = ((Instant)this.uploadHistory.get(extendedPathInBucket)).with(ChronoField.NANO_OF_SECOND, 0L);
            Instant now = Instant.now();
            if (now.isAfter(lastUploadAt.plusSeconds(1L))) {
                LOGGER.finest(() -> "Last upload to '" + extendedPathInBucket + "' was at " + String.valueOf(lastUploadAt) + ". No need to add extra delay.");
            } else {
                long delayInMillis = 1000L - (long)now.getNano() / 1000000L;
                LOGGER.fine(() -> "Delaying upload to '" + extendedPathInBucket + "' for " + delayInMillis + " ms");
                try {
                    Thread.sleep(delayInMillis);
                }
                catch (InterruptedException exception) {
                    Thread.currentThread().interrupt();
                    throw new BucketAccessException(ExaError.messageBuilder((String)"E-BFSJ-8").message("Interrupted while delaying repeated upload to {{path}}", new Object[]{extendedPathInBucket}).toString());
                }
            }
        } else {
            LOGGER.finest(() -> "No previous uploads to '" + extendedPathInBucket + "' recorded in upload history. No upload delay required.");
        }
    }

    @Override
    public void uploadStringContent(String content, String pathInBucket) throws InterruptedException, BucketAccessException, TimeoutException {
        this.delayRepeatedUploadToSamePath(pathInBucket);
        BucketFsMonitor.State state = this.stateRetriever.getState();
        this.uploadStringContentNonBlocking(content, pathInBucket);
        this.waitForFileToBeSynchronized(pathInBucket, state);
        this.recordUploadInHistory(pathInBucket);
    }

    @Override
    public void uploadInputStream(Supplier<InputStream> inputStreamSupplier, String pathInBucket) throws BucketAccessException, TimeoutException {
        this.delayRepeatedUploadToSamePath(pathInBucket);
        BucketFsMonitor.State state = this.stateRetriever.getState();
        this.uploadInputStreamNonBlocking(inputStreamSupplier, pathInBucket);
        this.waitForFileToBeSynchronized(pathInBucket, state);
        this.recordUploadInHistory(pathInBucket);
    }

    private void waitForFileToBeSynchronized(String pathInBucket, BucketFsMonitor.State state) throws TimeoutException, BucketAccessException {
        long expiry = System.currentTimeMillis() + 120000L;
        while (System.currentTimeMillis() < expiry) {
            if (this.monitor.isObjectSynchronized(this, pathInBucket, state)) {
                return;
            }
            try {
                Thread.sleep(200L);
            }
            catch (InterruptedException exception) {
                Thread.currentThread().interrupt();
                throw new BucketAccessException(ExaError.messageBuilder((String)"E-BFSJ-10").message("Interrupted while waiting for {{path}} to be synchronized on BucketFS.", new Object[]{pathInBucket}).toString());
            }
        }
        String message = String.format("Timeout waiting for object '%s' to be synchronized in bucket '%s' after %s.", pathInBucket, this.getFullyQualifiedBucketName(), state.toString());
        LOGGER.severe(() -> message);
        throw new TimeoutException(message);
    }

    public static Builder<? extends Builder<?>> builder() {
        return new Builder();
    }

    public static class Builder<T extends Builder<T>>
    extends WriteEnabledBucket.Builder<Builder<T>> {
        private BucketFsMonitor monitor;
        private BucketFsMonitor.StateRetriever stateRetriever;

        @Override
        protected T self() {
            return (T)this;
        }

        public T monitor(BucketFsMonitor value) {
            this.monitor = value;
            return (T)this.self();
        }

        public T stateRetriever(BucketFsMonitor.StateRetriever value) {
            this.stateRetriever = value;
            return (T)this.self();
        }

        @Override
        public SyncAwareBucket build() {
            return new SyncAwareBucket(this);
        }
    }
}

