/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming.util;

import java.io.EOFException;
import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.util.BatchedWriteAheadLog;
import org.apache.spark.streaming.util.FileBasedWriteAheadLog;
import org.apache.spark.streaming.util.FileBasedWriteAheadLogReader;
import org.apache.spark.streaming.util.FileBasedWriteAheadLogSegment;
import org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter;
import org.apache.spark.streaming.util.HdfsUtils$;
import org.apache.spark.streaming.util.WriteAheadLog;
import org.apache.spark.streaming.util.WriteAheadLogRecordHandle;
import org.apache.spark.util.ManualClock;
import org.apache.spark.util.Utils$;
import scala.Array$;
import scala.Function1;
import scala.Predef;
import scala.Predef$;
import scala.Serializable;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayOps;
import scala.math.Ordering;
import scala.reflect.ClassTag$;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.ScalaRunTime$;

/**
 * Helper methods for writing and reading write-ahead-log data, either directly through
 * Hadoop streams or through the Spark streaming write-ahead-log reader/writer classes.
 *
 * <p>The class is a singleton: the one instance is published through {@link #MODULE$}.
 */
public final class WriteAheadLogSuite$ {
    /** The single instance of this class; assigned by the private constructor. */
    public static final WriteAheadLogSuite$ MODULE$;
    /** Hadoop configuration passed to every HDFS helper, reader, writer and log created here. */
    private final Configuration org$apache$spark$streaming$util$WriteAheadLogSuite$$hadoopConf;

    // Creating the instance runs the private constructor, which stores it in MODULE$.
    static {
        new WriteAheadLogSuite$();
    }

    /**
     * Returns the Hadoop configuration held by this instance.
     *
     * @return the configuration created in the constructor
     */
    public Configuration org$apache$spark$streaming$util$WriteAheadLogSuite$$hadoopConf() {
        return this.org$apache$spark$streaming$util$WriteAheadLogSuite$$hadoopConf;
    }

    /**
     * Writes {@code data} to {@code file} through a raw output stream, framing each record
     * as a 4-byte length followed by the payload, and reports where each record landed.
     *
     * <p>With {@code allowBatching} the whole sequence is serialized into one payload (via
     * {@link #wrapArrayArrayByte(Object)}) and written as a single record; otherwise every
     * item is serialized and written as its own record.
     *
     * <p>The stream is closed even when a write fails, so a failing call does not leak it.
     *
     * @param data          the strings to write
     * @param file          path of the file to write to
     * @param allowBatching {@code true} to write all items as one batched record
     * @return one segment (file, offset, length) per record written, in write order
     */
    public Seq<FileBasedWriteAheadLogSegment> writeDataManually(Seq<String> data, String file, boolean allowBatching) {
        ArrayBuffer segments = new ArrayBuffer();
        FSDataOutputStream writer = HdfsUtils$.MODULE$.getOutputStream(file, this.org$apache$spark$streaming$util$WriteAheadLogSuite$$hadoopConf());
        try {
            if (allowBatching) {
                // One record holding the serialized array of individually serialized items.
                byte[] batch = this.wrapArrayArrayByte(data.toArray(ClassTag$.MODULE$.apply(String.class))).array();
                this.org$apache$spark$streaming$util$WriteAheadLogSuite$$writeToStream$1(batch, file, segments, writer);
            } else {
                // One record per item, in the order the sequence yields them.
                Iterator<String> items = data.iterator();
                while (items.hasNext()) {
                    byte[] payload = Utils$.MODULE$.serialize((Object)items.next());
                    this.org$apache$spark$streaming$util$WriteAheadLogSuite$$writeToStream$1(payload, file, segments, writer);
                }
            }
        } finally {
            writer.close();
        }
        return segments;
    }

    /**
     * Writes each string in {@code data} to {@code filePath} through a
     * {@link FileBasedWriteAheadLogWriter} and collects the segment returned for each write.
     *
     * <p>The writer is closed even when a write fails, so a failing call does not leak it.
     *
     * @param filePath path of the log file to write
     * @param data     the strings to write; each is converted with {@link #stringToByteBuffer(String)}
     * @return the segments returned by the writer, in the order of {@code data}
     */
    public Seq<FileBasedWriteAheadLogSegment> writeDataUsingWriter(String filePath, Seq<String> data) {
        FileBasedWriteAheadLogWriter writer = new FileBasedWriteAheadLogWriter(filePath, this.org$apache$spark$streaming$util$WriteAheadLogSuite$$hadoopConf());
        try {
            Seq segments = (Seq)data.map((Function1)new Serializable(writer){
                public static final long serialVersionUID = 0L;
                private final FileBasedWriteAheadLogWriter writer$3;

                public final FileBasedWriteAheadLogSegment apply(String item) {
                    return this.writer$3.write(WriteAheadLogSuite$.MODULE$.stringToByteBuffer(item));
                }
                {
                    this.writer$3 = writer$3;
                }
            }, Seq$.MODULE$.canBuildFrom());
            return segments;
        } finally {
            writer.close();
        }
    }

    /**
     * Creates a write-ahead log in {@code logDirectory} and writes every item of
     * {@code data} to it, advancing {@code manualClock} before each write and using the
     * clock's new reading as the record time.
     *
     * <p>Before anything is written, a clock reading below 100000 ms is reset to 10000 ms.
     *
     * @param logDirectory        directory handed to {@link #createWriteAheadLog}
     * @param data                the strings to write
     * @param closeFileAfterWrite forwarded to {@link #createWriteAheadLog}
     * @param allowBatching       forwarded to {@link #createWriteAheadLog}
     * @param manualClock         clock that is advanced and read for each record
     * @param closeLog            whether to close the log after the last write
     * @param clockAdvanceTime    amount passed to {@code manualClock.advance} before each write
     * @return the log that was written to (closed when {@code closeLog} is set)
     */
    public WriteAheadLog writeDataUsingWriteAheadLog(String logDirectory, Seq<String> data, boolean closeFileAfterWrite, boolean allowBatching, ManualClock manualClock, boolean closeLog, int clockAdvanceTime) {
        if (manualClock.getTimeMillis() < 100000L) {
            manualClock.setTime(10000L);
        }
        WriteAheadLog log = this.createWriteAheadLog(logDirectory, closeFileAfterWrite, allowBatching);
        Iterator<String> pending = data.iterator();
        while (pending.hasNext()) {
            String item = pending.next();
            // Move the clock first so each record gets a distinct, increasing timestamp.
            manualClock.advance((long)clockAdvanceTime);
            ByteBuffer payload = this.stringToByteBuffer(item);
            log.write(payload, manualClock.getTimeMillis());
        }
        if (closeLog) {
            log.close();
        }
        return log;
    }

    /**
     * Returns a freshly constructed {@link ManualClock}.
     *
     * <p>NOTE(review): the {@code $default$5} suffix suggests this is the compiler-generated
     * default for the fifth parameter ({@code manualClock}) of
     * {@code writeDataUsingWriteAheadLog} — confirm against the Scala source.
     */
    public ManualClock writeDataUsingWriteAheadLog$default$5() {
        return new ManualClock();
    }

    /**
     * Returns {@code true}.
     *
     * <p>NOTE(review): the {@code $default$6} suffix suggests this is the compiler-generated
     * default for the sixth parameter ({@code closeLog}) of
     * {@code writeDataUsingWriteAheadLog} — confirm against the Scala source.
     */
    public boolean writeDataUsingWriteAheadLog$default$6() {
        return true;
    }

    /**
     * Returns {@code 500}.
     *
     * <p>NOTE(review): the {@code $default$7} suffix suggests this is the compiler-generated
     * default for the seventh parameter ({@code clockAdvanceTime}) of
     * {@code writeDataUsingWriteAheadLog} — confirm against the Scala source.
     */
    public int writeDataUsingWriteAheadLog$default$7() {
        return 500;
    }

    public Seq<String> readDataManually(Seq<FileBasedWriteAheadLogSegment> segments) {
        return (Seq)segments.map((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            /*
             * WARNING - void declaration
             */
            public final String apply(FileBasedWriteAheadLogSegment segment) {
                String string;
                FSDataInputStream reader = HdfsUtils$.MODULE$.getInputStream(segment.path(), WriteAheadLogSuite$.MODULE$.org$apache$spark$streaming$util$WriteAheadLogSuite$$hadoopConf());
                try {
                    reader.seek(segment.offset());
                    byte[] bytes = new byte[segment.length()];
                    reader.readInt();
                    reader.readFully(bytes);
                    String data = (String)Utils$.MODULE$.deserialize(bytes);
                    reader.close();
                    string = data;
                }
                catch (Throwable throwable) {
                    void var2_2;
                    var2_2.close();
                    throw throwable;
                }
                reader.close();
                return string;
            }
        }, Seq$.MODULE$.canBuildFrom());
    }

    /*
     * WARNING - void declaration
     */
    public <T> Seq<T> readDataManually(String file) {
        void var3_3;
        FSDataInputStream reader = HdfsUtils$.MODULE$.getInputStream(file, this.org$apache$spark$streaming$util$WriteAheadLogSuite$$hadoopConf());
        ArrayBuffer buffer = new ArrayBuffer();
        try {
            try {
                while (true) {
                    int length = reader.readInt();
                    byte[] bytes = new byte[length];
                    reader.read(bytes);
                    buffer.$plus$eq(Utils$.MODULE$.deserialize(bytes));
                }
            }
            catch (EOFException eOFException) {
            }
        }
        finally {
            reader.close();
        }
        return var3_3;
    }

    /*
     * WARNING - void declaration
     */
    public Seq<String> readDataUsingReader(String file) {
        void var3_3;
        FileBasedWriteAheadLogReader reader = new FileBasedWriteAheadLogReader(file, this.org$apache$spark$streaming$util$WriteAheadLogSuite$$hadoopConf());
        List readData = (List)reader.toList().map((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final String apply(ByteBuffer byteBuffer) {
                return WriteAheadLogSuite$.MODULE$.byteBufferToString(byteBuffer);
            }
        }, List$.MODULE$.canBuildFrom());
        reader.close();
        return var3_3;
    }

    /**
     * Opens a write-ahead log on {@code logDirectory}, reads everything it contains and
     * converts each record to a string with {@link #byteBufferToString(ByteBuffer)}.
     *
     * <p>The log is closed whether or not reading succeeds.
     *
     * @param logDirectory        directory handed to {@link #createWriteAheadLog}
     * @param closeFileAfterWrite forwarded to {@link #createWriteAheadLog}
     * @param allowBatching       forwarded to {@link #createWriteAheadLog}
     * @return the records as strings, in the order {@code readAll()} yields them
     */
    public Seq<String> readDataUsingWriteAheadLog(String logDirectory, boolean closeFileAfterWrite, boolean allowBatching) {
        WriteAheadLog wal = this.createWriteAheadLog(logDirectory, closeFileAfterWrite, allowBatching);
        try {
            // toArray consumes the iterator completely before the log is closed.
            String[] data = (String[])((Iterator)JavaConverters$.MODULE$.asScalaIteratorConverter(wal.readAll()).asScala()).map((Function1)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final String apply(ByteBuffer byteBuffer) {
                    return WriteAheadLogSuite$.MODULE$.byteBufferToString(byteBuffer);
                }
            }).toArray(ClassTag$.MODULE$.apply(String.class));
            return Predef$.MODULE$.wrapRefArray((Object[])data);
        } finally {
            wal.close();
        }
    }

    /**
     * Lists the files in {@code directory}, ordered by the number embedded in each file name.
     *
     * <p>If the path exists and is a directory, its entries are listed, sorted by the
     * {@code long} parsed from the second {@code "-"}-separated token of the file name,
     * and returned as path strings with any leading {@code "file:"} removed. Otherwise an
     * empty sequence is returned.
     *
     * <p>NOTE(review): a file name without a {@code "-"} has no second token, so the sort
     * key lookup would presumably fail for it — confirm that only log files live here.
     *
     * @param directory the directory to list
     * @return the sorted file paths, or an empty sequence
     */
    public Seq<String> getLogFilesInDirectory(String directory) {
        Path logDirectoryPath = new Path(directory);
        FileSystem fileSystem = HdfsUtils$.MODULE$.getFileSystemForPath(logDirectoryPath, this.org$apache$spark$streaming$util$WriteAheadLogSuite$$hadoopConf());
        return fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDirectory() ? (Seq)Predef$.MODULE$.refArrayOps((Object[])Predef$.MODULE$.refArrayOps((Object[])Predef$.MODULE$.refArrayOps((Object[])fileSystem.listStatus(logDirectoryPath)).map((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            // Step 1: file status -> path.
            public final Path apply(FileStatus x$4) {
                return x$4.getPath();
            }
        }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Path.class)))).sortBy((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            // Step 2: sort key is the number in the second "-"-separated part of the name.
            public final long apply(Path x$5) {
                return new StringOps(Predef$.MODULE$.augmentString(x$5.getName().split("-")[1])).toLong();
            }
        }, (Ordering)Ordering.Long$.MODULE$)).map((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            // Step 3: path -> string without a leading "file:".
            public final String apply(Path x$6) {
                return new StringOps(Predef$.MODULE$.augmentString(x$6.toString())).stripPrefix("file:");
            }
        }, Array$.MODULE$.fallbackCanBuildFrom(Predef.DummyImplicit$.MODULE$.dummyImplicit())) : (Seq)Seq$.MODULE$.empty();
    }

    /**
     * Builds a write-ahead log rooted at {@code logDirectory}.
     *
     * <p>A {@link FileBasedWriteAheadLog} is always created with a fresh {@link SparkConf}
     * and this instance's Hadoop configuration. When {@code allowBatching} is set it is
     * wrapped in a {@link BatchedWriteAheadLog} that shares the same {@code SparkConf}.
     *
     * @param logDirectory        directory the file-based log works in
     * @param closeFileAfterWrite forwarded to the {@code FileBasedWriteAheadLog} constructor
     * @param allowBatching       whether to wrap the log in a {@code BatchedWriteAheadLog}
     * @return the file-based log, or its batching wrapper
     */
    public WriteAheadLog createWriteAheadLog(String logDirectory, boolean closeFileAfterWrite, boolean allowBatching) {
        SparkConf conf = new SparkConf();
        Configuration hadoopConf = this.org$apache$spark$streaming$util$WriteAheadLogSuite$$hadoopConf();
        // NOTE(review): the two literal 1s are positional constructor arguments whose
        // meaning is not visible here — confirm against FileBasedWriteAheadLog.
        FileBasedWriteAheadLog fileLog = new FileBasedWriteAheadLog(conf, logDirectory, hadoopConf, 1, 1, closeFileAfterWrite);
        if (!allowBatching) {
            return fileLog;
        }
        return new BatchedWriteAheadLog((WriteAheadLog)fileLog, conf);
    }

    /**
     * Returns the decimal string form of each integer from 1 to 100 inclusive, in
     * ascending order.
     *
     * <p>Despite the name, the result is the same on every call: no random source is used.
     *
     * @return the strings {@code "1"} through {@code "100"}
     */
    public Seq<String> generateRandomData() {
        return (Seq)RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 100).map((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final String apply(int x$7) {
                return ((Object)BoxesRunTime.boxToInteger((int)x$7)).toString();
            }
        }, IndexedSeq$.MODULE$.canBuildFrom());
    }

    /**
     * Reads every file in {@code logFiles} with {@code readDataManually(String)} and
     * flattens the results into one sequence of strings.
     *
     * <p>With {@code allowBatching}, each record read from a file is treated as a
     * {@code byte[][]} whose elements are deserialized individually into strings. Without
     * it, the records read from each file are returned as they are.
     *
     * @param logFiles      paths of the files to read, processed in order
     * @param allowBatching whether each stored record is a batch of serialized strings
     * @return all strings from all files, in file order and then record order
     */
    public Seq<String> readAndDeserializeDataManually(Seq<String> logFiles, boolean allowBatching) {
        return allowBatching ? (Seq)logFiles.flatMap((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final Seq<String> apply(String file) {
                Seq<T> data = WriteAheadLogSuite$.MODULE$.readDataManually(file);
                // Each record is a batch: unpack it and deserialize every element.
                return (Seq)data.flatMap((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;

                    public final ArrayOps<String> apply(byte[][] byteArray) {
                        return Predef$.MODULE$.refArrayOps((Object[])Predef$.MODULE$.refArrayOps((Object[])byteArray).map((Function1)new Serializable(this){
                            public static final long serialVersionUID = 0L;

                            public final String apply(byte[] bytes) {
                                return (String)Utils$.MODULE$.deserialize(bytes);
                            }
                        }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class))));
                    }
                }, Seq$.MODULE$.canBuildFrom());
            }
        }, Seq$.MODULE$.canBuildFrom()) : (Seq)logFiles.flatMap((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            // Unbatched: the records read from the file are already the result.
            public final Seq<String> apply(String file) {
                return WriteAheadLogSuite$.MODULE$.readDataManually(file);
            }
        }, Seq$.MODULE$.canBuildFrom());
    }

    /**
     * Serializes {@code str} with {@code Utils.serialize} and wraps the resulting bytes in
     * a {@link ByteBuffer}.
     *
     * @param str the string to serialize
     * @return a buffer backed by the serialized bytes
     */
    public ByteBuffer stringToByteBuffer(String str) {
        byte[] serialized = Utils$.MODULE$.serialize((Object)str);
        return ByteBuffer.wrap(serialized);
    }

    /**
     * Deserializes the backing array of {@code byteBuffer} into a string with
     * {@code Utils.deserialize}.
     *
     * <p>The whole backing array is used; the buffer's position, limit and array offset
     * are not consulted.
     *
     * @param byteBuffer a buffer with an accessible backing array
     * @return the deserialized string
     */
    public String byteBufferToString(ByteBuffer byteBuffer) {
        byte[] backing = byteBuffer.array();
        Object decoded = Utils$.MODULE$.deserialize(backing);
        return (String)decoded;
    }

    /**
     * Serializes every element of {@code records} on its own, then serializes the
     * resulting array of byte arrays as a whole and wraps it in a {@link ByteBuffer}.
     *
     * @param records an array (passed as {@code Object}) whose elements are serialized one by one
     * @param <T>     element type of {@code records}
     * @return a buffer backed by the serialized {@code byte[][]}
     */
    public <T> ByteBuffer wrapArrayArrayByte(Object records) {
        return ByteBuffer.wrap(Utils$.MODULE$.serialize(Predef$.MODULE$.genericArrayOps(records).map((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            // Inner step: one element -> its serialized bytes.
            public final byte[] apply(T o) {
                return Utils$.MODULE$.serialize(o);
            }
        }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(ScalaRunTime$.MODULE$.arrayClass(Byte.TYPE))))));
    }

    /**
     * Appends one record to the stream as a 4-byte length followed by the payload, and
     * records where it was written.
     *
     * @param bytes      payload to write
     * @param file$1     file name stored in the recorded segment
     * @param segments$1 buffer that receives a segment describing this record
     * @param writer$2   stream to write to; its position before the write becomes the segment offset
     */
    public final void org$apache$spark$streaming$util$WriteAheadLogSuite$$writeToStream$1(byte[] bytes, String file$1, ArrayBuffer segments$1, FSDataOutputStream writer$2) {
        // Capture the position before writing so the segment points at the length prefix.
        long startOffset = writer$2.getPos();
        int payloadLength = bytes.length;
        writer$2.writeInt(payloadLength);
        writer$2.write(bytes);
        FileBasedWriteAheadLogSegment segment = new FileBasedWriteAheadLogSegment(file$1, startOffset, payloadLength);
        segments$1.$plus$eq((Object)segment);
    }

    /**
     * Creates the singleton: publishes this instance through {@code MODULE$} and sets up a
     * default Hadoop {@link Configuration}.
     */
    private WriteAheadLogSuite$() {
        MODULE$ = this;
        this.org$apache$spark$streaming$util$WriteAheadLogSuite$$hadoopConf = new Configuration();
    }
}

