/*
 * Decompiled with CFR 0.152.
 */
package org.apache.samza.system.hdfs.writer;

import java.io.Closeable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.DirectDecompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.hdfs.HdfsConfig;
import org.apache.samza.system.hdfs.writer.Bucketer;
import org.apache.samza.system.hdfs.writer.Bucketer$;
import org.apache.samza.system.hdfs.writer.HdfsWriter;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Serializable;
import scala.Some;
import scala.reflect.ScalaSignature;

@ScalaSignature(bytes="\u0006\u0001\u0005%d!B\u0001\u0003\u0003\u0003y!AF*fcV,gnY3GS2,\u0007\n\u001a4t/JLG/\u001a:\u000b\u0005\r!\u0011AB<sSR,'O\u0003\u0002\u0006\r\u0005!\u0001\u000e\u001a4t\u0015\t9\u0001\"\u0001\u0004tsN$X-\u001c\u0006\u0003\u0013)\tQa]1nu\u0006T!a\u0003\u0007\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005i\u0011aA8sO\u000e\u00011C\u0001\u0001\u0011!\r\t\"\u0003F\u0007\u0002\u0005%\u00111C\u0001\u0002\u000b\u0011\u001247o\u0016:ji\u0016\u0014\bCA\u000b\u001e\u001d\t12$D\u0001\u0018\u0015\tA\u0012$\u0001\u0002j_*\u0011!DC\u0001\u0007Q\u0006$wn\u001c9\n\u0005q9\u0012\u0001D*fcV,gnY3GS2,\u0017B\u0001\u0010 \u0005\u00199&/\u001b;fe*\u0011Ad\u0006\u0005\tC\u0001\u0011\t\u0011)A\u0005E\u0005\u0019AMZ:\u0011\u0005\r2S\"\u0001\u0013\u000b\u0005\u0015J\u0012A\u00014t\u0013\t9CE\u0001\u0006GS2,7+_:uK6D\u0001\"\u000b\u0001\u0003\u0002\u0003\u0006IAK\u0001\u000bgf\u001cH/Z7OC6,\u0007CA\u00162\u001d\tas&D\u0001.\u0015\u0005q\u0013!B:dC2\f\u0017B\u0001\u0019.\u0003\u0019\u0001&/\u001a3fM&\u0011!g\r\u0002\u0007'R\u0014\u0018N\\4\u000b\u0005Aj\u0003\u0002C\u001b\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u001c\u0002\r\r|gNZ5h!\t9\u0004(D\u0001\u0005\u0013\tIDA\u0001\u0006II\u001a\u001c8i\u001c8gS\u001eDQa\u000f\u0001\u0005\u0002q\na\u0001P5oSRtD\u0003B\u001f?\u007f\u0001\u0003\"!\u0005\u0001\t\u000b\u0005R\u0004\u0019\u0001\u0012\t\u000b%R\u0004\u0019\u0001\u0016\t\u000bUR\u0004\u0019\u0001\u001c\t\u000f\t\u0003!\u0019!C\u0001\u0007\u0006I!-\u0019;dQNK'0Z\u000b\u0002\tB\u0011A&R\u0005\u0003\r6\u0012A\u0001T8oO\"1\u0001\n\u0001Q\u0001\n\u0011\u000b!BY1uG\"\u001c\u0016N_3!\u0011\u001dQ\u0005A1A\u0005\u0002-\u000b\u0001BY;dW\u0016$XM]\u000b\u0002\u0019B\u0019A&T(\n\u00059k#\u0001B*p[\u0016\u0004\"!\u0005)\n\u0005E\u0013!\u0001\u0003\"vG.,G/\u001a:\t\rM\u0003\u0001\u0015!\u0003M\u0003%\u0011WoY6fi\u0016\u0014\b\u0005C\u0004V\u0001\u0001\u0007I\u0011A\"\u0002\u0019\tLH/Z:Xe&$H/\u001a8\t\u000f]\u0003\u0001\u0019!C\u00011\u0006\u0001\"-\u001f;fg^\u0013\u0018\u000e\u001e;f]~#S-\u001d\u000b\u00033r\u0003\"\u0001\f.\n\u0005mk#\u0001B+oSRDq!\u0018,\u0002\u0002\u0003\u0007A)A\u0002yIEBaa\u0018\u0001!B\u0013!\u0015!\u00042zi\u0016\u001cxK]5ui\u0016t\u0007\u0005C\u0003b\u0001\u0019\u0005!-\u0001\u0004hKR\\U-_\u000b\u0002GB\u0011a\u0003Z\u0005\u0003K^\u0011\u0001b\u0016:ji\u0006\u0014G.\u001a\u0005\u0006O\u00021\t\u0001[\u0001\tO\u0016$h+\u00197vKR\u00111-\u001b\u0005\u0006U\u001a\u0004\ra[\u0001\t_V$xm\\5oOB\u0011A.\\\u0007\u0002\r%\u0011aN\u0002\u0002\u0018\u001fV$xm\\5oO6+7o]1hK\u0016sg/\u001a7pa\u0016DQ\u0001\u001d\u0001\u0007\u0002E\fAcZ3u\u001fV$\b/\u001e;TSj,\u0017J\u001c\"zi\u0016\u001cHC\u0001#s\u0011\u0015\u0019x\u000e1\u0001d\u0003!9(/\u001b;bE2,\u0007\"B;\u0001\r\u00031\u0018\u0001C6fs\u000ec\u0017m]:\u0016\u0003]\u0004$\u0001_?\u0011\u0007-J80\u0003\u0002{g\t)1\t\\1tgB\u0011A0 \u0007\u0001\t%qH/!A\u0001\u0002\u000b\u0005qPA\u0002`IE\n2!!\u0001d!\ra\u00131A\u0005\u0004\u0003\u000bi#a\u0002(pi\"Lgn\u001a\u0005\b\u0003\u0013\u0001a\u0011AA\u0006\u0003)1\u0018\r\\;f\u00072\f7o]\u000b\u0003\u0003\u001b\u0001D!a\u0004\u0002\u0014A!1&_A\t!\ra\u00181\u0003\u0003\f\u0003+\t9!!A\u0001\u0002\u000b\u0005qPA\u0002`IIBq!!\u0007\u0001\t\u0003\tY\"A\nhKR\u001cu.\u001c9sKN\u001c\u0018n\u001c8D_\u0012,7\r\u0006\u0003\u0002\u001e\u0005m\"CBA\u0010\u0003O\tyC\u0002\u0004\u0002\"\u0001\u0001\u0011Q\u0004\u0002\ryI,g-\u001b8f[\u0016tGO\u0010\u0006\u0004\u0003K9\u0012\u0001C2p[B\u0014Xm]:\u0011\t\u0005%\u00121F\u0007\u0003\u0003GIA!!\f\u0002$\tAB)\u001b:fGR$UmY8naJ,7o]5p]\u000e{G-Z2\u0011\t\u0005E\u0012qG\u0007\u0003\u0003gQ1!!\u000e\u001a\u0003\u0011\u0019wN\u001c4\n\t\u0005e\u00121\u0007\u0002\r\u0007>tg-[4ve\u0006\u0014G.\u001a\u0005\b\u0003{\t9\u00021\u0001+\u0003=\u0019w.\u001c9sKN\u001c\u0018n\u001c8UsB,\u0007bBA!\u0001\u0011\u0005\u00131I\u0001\u0006M2,8\u000f[\u000b\u00023\"9\u0011q\t\u0001\u0005B\u0005%\u0013!B<sSR,GcA-\u0002L!1!.!\u0012A\u0002-Dq!a\u0014\u0001\t\u0003\n\u0019%A\u0003dY>\u001cX\rC\u0004\u0002T\u0001!\t\"!\u0016\u00021MDw.\u001e7e'R\f'\u000f\u001e(fo>+H\u000f];u\r&dW-\u0006\u0002\u0002XA\u0019A&!\u0017\n\u0007\u0005mSFA\u0004C_>dW-\u00198\t\u000f\u0005}\u0003\u0001\"\u0005\u0002b\u0005iq-\u001a;OKb$xK]5uKJ,\"!a\u0019\u0011\t1\n)\u0007F\u0005\u0004\u0003Oj#AB(qi&|g\u000e")
public abstract class SequenceFileHdfsWriter
extends HdfsWriter<SequenceFile.Writer> {
    private final FileSystem dfs;
    private final String systemName;
    private final HdfsConfig config;
    private final long batchSize;
    private final Some<Bucketer> bucketer;
    private long bytesWritten;

    public long batchSize() {
        return this.batchSize;
    }

    public Some<Bucketer> bucketer() {
        return this.bucketer;
    }

    public long bytesWritten() {
        return this.bytesWritten;
    }

    public void bytesWritten_$eq(long x$1) {
        this.bytesWritten = x$1;
    }

    public abstract Writable getKey();

    public abstract Writable getValue(OutgoingMessageEnvelope var1);

    public abstract long getOutputSizeInBytes(Writable var1);

    public abstract Class<? extends Writable> keyClass();

    public abstract Class<? extends Writable> valueClass();

    public DirectDecompressionCodec getCompressionCodec(String compressionType) {
        String string = compressionType;
        Object object = "snappy".equals(string) ? new SnappyCodec() : ("gzip".equals(string) ? new GzipCodec() : new DefaultCodec());
        return object;
    }

    @Override
    public void flush() {
        this.writer().map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final void apply(SequenceFile.Writer x$1) {
                x$1.hflush();
            }
        });
    }

    @Override
    public void write(OutgoingMessageEnvelope outgoing) {
        if (this.shouldStartNewOutputFile()) {
            this.close();
            this.writer_$eq(this.getNextWriter());
        }
        this.writer().map((Function1)new Serializable(this, outgoing){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ SequenceFileHdfsWriter $outer;
            private final OutgoingMessageEnvelope outgoing$1;

            public final void apply(SequenceFile.Writer seq) {
                Writable writable = this.$outer.getValue(this.outgoing$1);
                this.$outer.bytesWritten_$eq(this.$outer.bytesWritten() + this.$outer.getOutputSizeInBytes(writable));
                seq.append(this.$outer.getKey(), writable);
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                this.outgoing$1 = outgoing$1;
            }
        });
    }

    @Override
    public void close() {
        this.writer().map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final void apply(SequenceFile.Writer w) {
                w.hflush();
                IOUtils.closeStream((Closeable)w);
            }
        });
        this.writer_$eq(None$.MODULE$);
        this.bytesWritten_$eq(0L);
    }

    public boolean shouldStartNewOutputFile() {
        return this.bytesWritten() >= this.batchSize() || ((Bucketer)this.bucketer().get()).shouldChangeBucket();
    }

    public Option<SequenceFile.Writer> getNextWriter() {
        Path path = ((Bucketer)this.bucketer().get()).getNextWritePath(this.dfs);
        return new Some((Object)SequenceFile.createWriter((Configuration)this.dfs.getConf(), (SequenceFile.Writer.Option[])new SequenceFile.Writer.Option[]{SequenceFile.Writer.file((Path)path), SequenceFile.Writer.keyClass(this.keyClass()), SequenceFile.Writer.valueClass(this.valueClass()), SequenceFile.Writer.compression((SequenceFile.CompressionType)SequenceFile.CompressionType.BLOCK, (CompressionCodec)this.getCompressionCodec(this.config.getCompressionType(this.systemName)))}));
    }

    public SequenceFileHdfsWriter(FileSystem dfs, String systemName, HdfsConfig config) {
        this.dfs = dfs;
        this.systemName = systemName;
        this.config = config;
        super(dfs, systemName, config);
        this.batchSize = config.getWriteBatchSizeBytes(systemName);
        this.bucketer = new Some((Object)Bucketer$.MODULE$.getInstance(systemName, config));
        this.bytesWritten = 0L;
    }
}

