/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.bigtable.beam.sequencefiles;

import com.google.common.collect.Sets;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.DynamicFileDestinations;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.values.KV;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.serializer.Serialization;

class SequenceFileSink<K, V>
extends FileBasedSink<KV<K, V>, Void, KV<K, V>> {
    private static final Log LOG = LogFactory.getLog(SequenceFileSink.class);
    private final Class<K> keyClass;
    private final Class<V> valueClass;
    private final String[] serializationNames;

    SequenceFileSink(ValueProvider<ResourceId> baseOutputDirectoryProvider, FileBasedSink.FilenamePolicy filenamePolicy, Class<K> keyClass, Class<? extends Serialization<? super K>> keySerializationClass, Class<V> valueClass, Class<? extends Serialization<? super V>> valueSerializationClass) {
        super(baseOutputDirectoryProvider, DynamicFileDestinations.constant((FileBasedSink.FilenamePolicy)filenamePolicy), Compression.UNCOMPRESSED);
        this.keyClass = keyClass;
        this.valueClass = valueClass;
        HashSet serializationNameSet = Sets.newHashSet();
        serializationNameSet.add(keySerializationClass.getName());
        serializationNameSet.add(valueSerializationClass.getName());
        this.serializationNames = serializationNameSet.toArray(new String[serializationNameSet.size()]);
    }

    public FileBasedSink.WriteOperation<Void, KV<K, V>> createWriteOperation() {
        return new SeqFileWriteOperation<K, V>(this, this.keyClass, this.valueClass, this.serializationNames);
    }

    static class OutputStreamWrapper
    extends OutputStream {
        private final WritableByteChannel inner;
        private final ByteBuffer singleByteBuffer = ByteBuffer.allocate(1);

        OutputStreamWrapper(WritableByteChannel inner) {
            this.inner = inner;
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            ByteBuffer byteBuffer = ByteBuffer.wrap(b, off, len);
            for (int written = 0; written < len; written += this.inner.write(byteBuffer)) {
                ((Buffer)byteBuffer).position(written + off);
            }
        }

        @Override
        public void write(int b) throws IOException {
            ((Buffer)this.singleByteBuffer).clear();
            this.singleByteBuffer.put((byte)b);
            int written = 0;
            while (written == 0) {
                ((Buffer)this.singleByteBuffer).position(0);
                written = this.inner.write(this.singleByteBuffer);
            }
        }
    }

    private static class SeqFileWriter<K, V>
    extends FileBasedSink.Writer<Void, KV<K, V>> {
        private final SeqFileWriteOperation<K, V> writeOperation;
        private SequenceFile.Writer sequenceFile;
        private final AtomicLong counter = new AtomicLong();

        SeqFileWriter(SeqFileWriteOperation<K, V> writeOperation) {
            super(writeOperation, "application/octet-stream");
            this.writeOperation = writeOperation;
        }

        protected void prepareWrite(WritableByteChannel channel) throws Exception {
            LOG.debug((Object)"Opening new writer");
            Configuration configuration = new Configuration(false);
            configuration.setStrings("io.serializations", ((SeqFileWriteOperation)this.writeOperation).serializationNames);
            FSDataOutputStream outputStream = new FSDataOutputStream((OutputStream)new OutputStreamWrapper(channel), new FileSystem.Statistics("dataflow"));
            this.sequenceFile = SequenceFile.createWriter((Configuration)configuration, (SequenceFile.Writer.Option[])new SequenceFile.Writer.Option[]{SequenceFile.Writer.stream((FSDataOutputStream)outputStream), SequenceFile.Writer.keyClass((Class)((SeqFileWriteOperation)this.writeOperation).keyClass), SequenceFile.Writer.valueClass((Class)((SeqFileWriteOperation)this.writeOperation).valueClass), SequenceFile.Writer.compression((SequenceFile.CompressionType)SequenceFile.CompressionType.BLOCK)});
        }

        protected void finishWrite() throws Exception {
            this.sequenceFile.hflush();
            this.sequenceFile.close();
            super.finishWrite();
            LOG.debug((Object)("Closing writer with " + this.counter.get() + " items"));
        }

        public void write(KV<K, V> value) throws Exception {
            this.counter.incrementAndGet();
            this.sequenceFile.append(value.getKey(), value.getValue());
        }
    }

    private static class SeqFileWriteOperation<K, V>
    extends FileBasedSink.WriteOperation<Void, KV<K, V>> {
        private final Class<K> keyClass;
        private final Class<V> valueClass;
        private final String[] serializationNames;

        SeqFileWriteOperation(SequenceFileSink<K, V> sink, Class<K> keyClass, Class<V> valueClass, String[] serializationNames) {
            super(sink);
            this.keyClass = keyClass;
            this.valueClass = valueClass;
            this.serializationNames = serializationNames;
        }

        public FileBasedSink.Writer<Void, KV<K, V>> createWriter() throws Exception {
            return new SeqFileWriter(this);
        }
    }
}

