/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.examples.complete.kafkatopubsub.kafka.consumer;

import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.HashSet;
import java.util.Map;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SslConsumerFactoryFn
implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {
    private final @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized String> sslConfig;
    private static final @UnknownKeyFor @NonNull @Initialized String TRUSTSTORE_LOCAL_PATH = "/tmp/kafka.truststore.jks";
    private static final @UnknownKeyFor @NonNull @Initialized String KEYSTORE_LOCAL_PATH = "/tmp/kafka.keystore.jks";
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(SslConsumerFactoryFn.class);

    public SslConsumerFactoryFn(@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized String> sslConfig) {
        this.sslConfig = sslConfig;
    }

    public @UnknownKeyFor @NonNull @Initialized Consumer<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []> apply(@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object> config) {
        String truststoreLocation = this.sslConfig.get("ssl.truststore.location");
        String keystoreLocation = this.sslConfig.get("ssl.keystore.location");
        if (truststoreLocation == null || keystoreLocation == null) {
            LOG.warn("Not enough information to configure SSL");
            return new KafkaConsumer(config);
        }
        try {
            if (truststoreLocation.startsWith("gs://")) {
                SslConsumerFactoryFn.getGcsFileAsLocal(truststoreLocation, TRUSTSTORE_LOCAL_PATH);
                this.sslConfig.put("ssl.truststore.location", TRUSTSTORE_LOCAL_PATH);
            } else {
                this.checkFileExists(truststoreLocation);
            }
            if (keystoreLocation.startsWith("gs://")) {
                SslConsumerFactoryFn.getGcsFileAsLocal(keystoreLocation, KEYSTORE_LOCAL_PATH);
                this.sslConfig.put("ssl.keystore.location", KEYSTORE_LOCAL_PATH);
            } else {
                this.checkFileExists(keystoreLocation);
            }
        }
        catch (IOException e) {
            LOG.error("Failed to retrieve data for SSL", (Throwable)e);
            return new KafkaConsumer(config);
        }
        config.put("security.protocol", SecurityProtocol.SASL_SSL.name());
        config.put("ssl.truststore.location", this.sslConfig.get("ssl.truststore.location"));
        config.put("ssl.keystore.location", this.sslConfig.get("ssl.keystore.location"));
        config.put("ssl.truststore.password", this.sslConfig.get("ssl.truststore.password"));
        config.put("ssl.keystore.password", this.sslConfig.get("ssl.keystore.password"));
        config.put("ssl.key.password", this.sslConfig.get("ssl.key.password"));
        return new KafkaConsumer(config);
    }

    private void checkFileExists(@UnknownKeyFor @NonNull @Initialized String filePath) throws @UnknownKeyFor @NonNull @Initialized IOException {
        LOG.info("Trying to get file: {} locally. Local files don't support when in using distribute runner", (Object)filePath);
        File f = new File(filePath);
        if (!f.exists()) {
            LOG.error("{} does not exist", (Object)f.getAbsolutePath());
            throw new IOException();
        }
        LOG.debug("{} exists", (Object)f.getAbsolutePath());
    }

    public static void getGcsFileAsLocal(@UnknownKeyFor @NonNull @Initialized String gcsFilePath, @UnknownKeyFor @NonNull @Initialized String outputFilePath) throws @UnknownKeyFor @NonNull @Initialized IOException {
        LOG.info("Reading contents from GCS file: {}", (Object)gcsFilePath);
        HashSet<StandardOpenOption> options = new HashSet<StandardOpenOption>(2);
        options.add(StandardOpenOption.CREATE);
        options.add(StandardOpenOption.APPEND);
        try (ReadableByteChannel readerChannel = FileSystems.open((ResourceId)FileSystems.matchSingleFileSpec((String)gcsFilePath).resourceId());
             FileChannel writeChannel = FileChannel.open(Paths.get(outputFilePath, new String[0]), options, new FileAttribute[0]);){
            writeChannel.transferFrom(readerChannel, 0L, Long.MAX_VALUE);
        }
    }
}

