/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.avro;

import com.hazelcast.function.BiFunctionEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.impl.connector.ReadFilesP;
import com.hazelcast.jet.impl.connector.WriteBufferedP;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.security.permission.ConnectorPermission;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.Permission;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.Nonnull;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;

/**
 * Static factory methods that build {@link ProcessorMetaSupplier}s for
 * reading and writing Avro data files.
 * <p>
 * This class is a non-instantiable holder of static methods.
 */
public final class AvroProcessors {
    private AvroProcessors() {
    }

    /**
     * Returns a meta-supplier of processors that read Avro files from a
     * directory. The work is delegated to {@code ReadFilesP.metaSupplier},
     * which receives a per-file function that opens the file with a
     * {@link DataFileReader} and maps each datum through {@code mapOutputFn}.
     *
     * @param directory           directory to read from; also used for the
     *                            "read" file permission
     * @param glob                glob pattern, passed through to {@code ReadFilesP}
     * @param sharedFileSystem    passed through to {@code ReadFilesP}
     * @param datumReaderSupplier supplies a fresh {@link DatumReader} for each file
     * @param mapOutputFn         receives the file name (last path element) and
     *                            the datum, and returns the item to emit
     * @param <D>                 type of the datum read from the file
     * @param <T>                 type of the emitted item
     */
    @Nonnull
    public static <D, T> ProcessorMetaSupplier readFilesP(@Nonnull String directory, @Nonnull String glob, boolean sharedFileSystem, @Nonnull SupplierEx<? extends DatumReader<D>> datumReaderSupplier, @Nonnull BiFunctionEx<String, ? super D, T> mapOutputFn) {
        FunctionEx<? super Path, ? extends Stream<T>> readFileFn =
                dataFileReadFn(directory, datumReaderSupplier, mapOutputFn);
        // NOTE(review): the fourth argument is a hard-coded `true`; its meaning is
        // defined by ReadFilesP.metaSupplier (presumably "ignore file not found") - confirm.
        return ReadFilesP.metaSupplier(directory, glob, sharedFileSystem, true, readFileFn);
    }

    /**
     * Returns a meta-supplier of processors that write received items to Avro
     * data files inside {@code directoryName}. Each processor writes to its
     * own file, named after its global processor index.
     *
     * @param directoryName       target directory; also used for the "write"
     *                            file permission
     * @param schema              Avro schema of the written records
     * @param datumWriterSupplier supplies the {@link DatumWriter} for each file
     * @param <D>                 type of the datum written to the file
     */
    @Nonnull
    public static <D> ProcessorMetaSupplier writeFilesP(@Nonnull String directoryName, @Nonnull Schema schema, @Nonnull SupplierEx<DatumWriter<D>> datumWriterSupplier) {
        // The schema is captured as its JSON string and re-parsed when the writer
        // is created (presumably so the captured state stays serializable - confirm).
        String jsonSchema = schema.toString();
        return ProcessorMetaSupplier.preferLocalParallelismOne(
                ConnectorPermission.file(directoryName, "write"),
                WriteBufferedP.<DataFileWriter<D>, D>supplier(
                        dataFileWriterFn(directoryName, jsonSchema, datumWriterSupplier),
                        DataFileWriter::append,
                        DataFileWriter::flush,
                        DataFileWriter::close));
    }

    /**
     * Builds the function that creates the {@link DataFileWriter} for one
     * processor: it parses the schema, makes sure the directory exists and
     * creates a file named after {@code context.globalProcessorIndex()}.
     */
    @SuppressFBWarnings(value={"RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"}, justification="mkdirs() returns false if the directory already existed, which is good. We don't care even if it didn't exist and we failed to create it, because we'll fail later when trying to create the file.")
    private static <D> FunctionEx<Processor.Context, DataFileWriter<D>> dataFileWriterFn(String directoryName, String jsonSchema, SupplierEx<DatumWriter<D>> datumWriterSupplier) {
        return new FunctionEx<Processor.Context, DataFileWriter<D>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public DataFileWriter<D> applyEx(Processor.Context context) throws Exception {
                Schema schema = new Schema.Parser().parse(jsonSchema);
                Path directory = Paths.get(directoryName);
                // Result deliberately ignored: a failure here surfaces when the
                // file is created below.
                directory.toFile().mkdirs();
                Path file = directory.resolve(String.valueOf(context.globalProcessorIndex()));
                DataFileWriter<D> writer = new DataFileWriter<>(datumWriterSupplier.get());
                writer.create(schema, file.toFile());
                return writer;
            }

            @Override
            public List<Permission> permissions() {
                return Collections.singletonList(ConnectorPermission.file(directoryName, "write"));
            }
        };
    }

    /**
     * Builds the function that turns one file path into a stream of mapped
     * items. The returned stream owns the underlying {@link DataFileReader}:
     * closing the stream closes the reader.
     */
    private static <D, T> FunctionEx<? super Path, ? extends Stream<T>> dataFileReadFn(String directoryName, SupplierEx<? extends DatumReader<D>> datumReaderSupplier, BiFunctionEx<String, ? super D, T> mapOutputFn) {
        return new FunctionEx<Path, Stream<T>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Stream<T> applyEx(Path path) throws Exception {
                DataFileReader<D> reader = new DataFileReader<>(path.toFile(), datumReaderSupplier.get());
                String fileName = path.getFileName().toString();
                return StreamSupport.stream(reader.spliterator(), false)
                        .<T>map(item -> mapOutputFn.apply(fileName, item))
                        // The reader is released only when the caller closes the stream.
                        .onClose(() -> Util.uncheckRun(reader::close));
            }

            @Override
            public List<Permission> permissions() {
                return Collections.singletonList(ConnectorPermission.file(directoryName, "read"));
            }
        };
    }
}

