/*
 * Decompiled with CFR 0.152.
 */
package org.cojen.tupl.core;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;
import java.util.zip.CRC32C;
import java.util.zip.CheckedOutputStream;
import org.cojen.tupl.Database;
import org.cojen.tupl.Snapshot;
import org.cojen.tupl.core.CheckedInputStream;
import org.cojen.tupl.core.RestoreInputStream;
import org.cojen.tupl.diag.EventListener;
import org.cojen.tupl.diag.EventType;
import org.cojen.tupl.io.Utils;
import org.cojen.tupl.repl.SnapshotReceiver;
import org.cojen.tupl.repl.SnapshotSender;
import org.cojen.tupl.repl.StreamReplicator;

final class ReplUtils
extends Utils {
    private static final long RESTORE_EVENT_RATE_MILLIS = 5000L;

    ReplUtils() {
    }

    public static InputStream restoreRequest(StreamReplicator repl, EventListener listener) throws IOException {
        InputStream in;
        Constructor<?> lz4Input;
        Map<String, String> options = new HashMap<String, String>();
        options.put("checksum", "CRC32C");
        try {
            Class<?> clazz = Class.forName("net.jpountz.lz4.LZ4FrameInputStream");
            lz4Input = clazz.getConstructor(InputStream.class);
            options.put("compress", "LZ4Frame");
        }
        catch (Throwable e) {
            lz4Input = null;
        }
        SnapshotReceiver receiver = repl.restore(options);
        if (receiver == null) {
            return null;
        }
        try {
            String checksumOption;
            in = receiver.inputStream();
            options = receiver.options();
            long length = receiver.length();
            String compressOption = options.get("compress");
            if (compressOption != null) {
                if (compressOption.equals("LZ4Frame")) {
                    try {
                        in = (InputStream)lz4Input.newInstance(in);
                    }
                    catch (Throwable e) {
                        throw new IOException("Unable to decompress", e);
                    }
                } else {
                    throw new IOException("Unknown compress option: " + compressOption);
                }
            }
            if ((checksumOption = options.get("checksum")) != null) {
                if (checksumOption.equals("CRC32C")) {
                    in = new CheckedInputStream(in, new CRC32C(), length);
                } else {
                    throw new IOException("Unknown checksum option: " + checksumOption);
                }
            }
            if (listener != null && length >= 0L) {
                RestoreInputStream rin = new RestoreInputStream(in);
                in = rin;
                listener.notify(EventType.REPLICATION_RESTORE, "Receiving snapshot: %1$,d bytes from %2$s", length, receiver.senderAddress());
                new Progress(listener, rin, length).start();
            }
        }
        catch (Throwable e) {
            ReplUtils.closeQuietly(receiver);
            throw e;
        }
        return in;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void sendSnapshot(Database db, SnapshotSender sender) throws IOException {
        Map<String, String> requestedOptions = sender.options();
        HashMap<String, String> options = new HashMap<String, String>();
        CRC32C checksum = null;
        if ("CRC32C".equals(requestedOptions.get("checksum"))) {
            options.put("checksum", "CRC32C");
            checksum = new CRC32C();
        }
        Snapshot snapshot = db.beginSnapshot();
        Constructor<?> lz4Output = null;
        if (snapshot.isCompressible() && "LZ4Frame".equals(requestedOptions.get("compress"))) {
            try {
                Class<?> clazz = Class.forName("net.jpountz.lz4.LZ4FrameOutputStream");
                lz4Output = clazz.getConstructor(OutputStream.class);
                options.put("compress", "LZ4Frame");
            }
            catch (Throwable clazz) {
                // empty catch block
            }
        }
        try (OutputStream out = sender.begin(snapshot.length(), snapshot.position(), options);){
            if (lz4Output != null) {
                try {
                    out = (OutputStream)lz4Output.newInstance(out);
                }
                catch (Throwable e) {
                    throw new IOException("Unable to compress", e);
                }
            }
            CheckedOutputStream cout = null;
            if (checksum != null) {
                cout = new CheckedOutputStream(out, checksum);
                out = cout;
            }
            snapshot.writeTo(out);
            if (cout != null) {
                byte[] buf = new byte[4];
                ReplUtils.encodeIntLE(buf, 0, (int)checksum.getValue());
                out.write(buf);
            }
        }
    }

    private static final class Progress
    extends Thread {
        private final EventListener mListener;
        private final RestoreInputStream mRestore;
        private final long mLength;
        private long mLastTimeMillis = Long.MIN_VALUE;
        private long mLastReceived;

        Progress(EventListener listener, RestoreInputStream in, long length) {
            this.mListener = listener;
            this.mRestore = in;
            this.mLength = length;
            this.setDaemon(true);
        }

        @Override
        public void run() {
            while (!this.mRestore.isFinished()) {
                long now = System.currentTimeMillis();
                long received = this.mRestore.received();
                double percent = 100.0 * ((double)received / (double)this.mLength);
                long progress = received - this.mLastReceived;
                if (this.mLastTimeMillis != Long.MIN_VALUE) {
                    double rate = 1000.0 * ((double)progress / (double)(now - this.mLastTimeMillis));
                    Object format = "Receiving snapshot: %1$1.3f%%";
                    if (rate == 0.0) {
                        this.mListener.notify(EventType.REPLICATION_RESTORE, (String)format, percent);
                    } else {
                        format = (String)format + "  rate: %2$,d bytes/s  remaining: ~%3$s";
                        long remainingSeconds = (long)((double)(this.mLength - received) / rate);
                        this.mListener.notify(EventType.REPLICATION_RESTORE, (String)format, percent, (long)rate, Progress.remainingDuration(remainingSeconds));
                    }
                }
                this.mLastTimeMillis = now;
                this.mLastReceived = received;
                try {
                    Thread.sleep(5000L);
                }
                catch (InterruptedException e) {
                    break;
                }
            }
        }

        private static String remainingDuration(long seconds) {
            if (seconds < 120L) {
                return seconds + "s";
            }
            if (seconds < 7200L) {
                return seconds / 60L + "m";
            }
            if (seconds < 172800L) {
                return seconds / 3600L + "h";
            }
            return seconds / 86400L + "d";
        }
    }
}

