/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.lib.io;

import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Partitioner;
import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.Map;
import javax.validation.constraints.NotNull;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.ftp.FTPFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceStability.Evolving
public abstract class AbstractFTPInputOperator<T>
extends AbstractFileInputOperator<T> {
    @NotNull
    private String host;
    private int port = 21;
    private String userName = "anonymous";
    private String password = "guest";
    private static final Logger LOG = LoggerFactory.getLogger(AbstractFTPInputOperator.class);

    @Override
    protected FileSystem getFSInstance() throws IOException {
        FTPFileSystem ftpFileSystem = new FTPFileSystem();
        String ftpUri = "ftp://" + this.userName + ":" + this.password + "@" + this.host + ":" + this.port;
        LOG.debug("ftp uri {}", (Object)ftpUri);
        ftpFileSystem.initialize(URI.create(ftpUri), this.configuration);
        return ftpFileSystem;
    }

    @Override
    public void partitioned(Map<Integer, Partitioner.Partition<AbstractFileInputOperator<T>>> partitions) {
        super.partitioned(partitions);
        for (Partitioner.Partition<AbstractFileInputOperator<T>> partition : partitions.values()) {
            ((AbstractFTPInputOperator)partition.getPartitionedInstance()).host = this.host;
            ((AbstractFTPInputOperator)partition.getPartitionedInstance()).port = this.port;
            ((AbstractFTPInputOperator)partition.getPartitionedInstance()).userName = this.userName;
            ((AbstractFTPInputOperator)partition.getPartitionedInstance()).password = this.password;
        }
    }

    public void setHost(String host) {
        this.host = host;
    }

    public String getHost() {
        return this.host;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public int getPort() {
        return this.port;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getUserName() {
        return this.userName;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public String getPassword() {
        return this.password;
    }

    public static class FTPStringInputOperator
    extends AbstractFTPInputOperator<String> {
        private transient BufferedReader br;
        public final transient DefaultOutputPort<String> output = new DefaultOutputPort();

        @Override
        protected InputStream openFile(Path path) throws IOException {
            InputStream is = super.openFile(path);
            this.br = new BufferedReader(new InputStreamReader(is));
            return is;
        }

        @Override
        protected void closeFile(InputStream is) throws IOException {
            super.closeFile(is);
            this.br = null;
        }

        @Override
        protected String readEntity() throws IOException {
            return this.br.readLine();
        }

        @Override
        protected void emit(String tuple) {
            this.output.emit((Object)tuple);
        }
    }
}

