/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.io.sstable;

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.streaming.DefaultConnectionFactory;
import org.apache.cassandra.streaming.StreamConnectionFactory;
import org.apache.cassandra.streaming.StreamEvent;
import org.apache.cassandra.streaming.StreamEventHandler;
import org.apache.cassandra.streaming.StreamPlan;
import org.apache.cassandra.streaming.StreamResultFuture;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.StreamState;
import org.apache.cassandra.utils.OutputHandler;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.Ref;
import org.cassandraunit.shaded.com.google.common.collect.HashMultimap;
import org.cassandraunit.shaded.com.google.common.collect.Multimap;

/**
 * Opens the sstables found in a directory and streams the sections relevant to each
 * endpoint, as reported by a {@link Client}, using a "Bulk Load" {@link StreamPlan}.
 *
 * <p>The keyspace name is taken from the name of the parent of the given directory.
 * The loader registers itself as a {@link StreamEventHandler} on the plan so that it can
 * record failed peers and release the sstable self-references once streaming finishes.
 */
public class SSTableLoader implements StreamEventHandler {
    private final File directory;
    private final String keyspace;
    private final Client client;
    private final int connectionsPerHost;
    private final OutputHandler outputHandler;
    /** Peers whose session completed unsuccessfully; filled in by {@link #handleStreamEvent}. */
    private final Set<InetAddress> failedHosts = new HashSet<>();
    /** Every sstable successfully opened by {@link #openSSTables}. */
    private final List<SSTableReader> sstables = new ArrayList<>();
    /** Per-endpoint streaming sections computed while opening the sstables. */
    private final Multimap<InetAddress, StreamSession.SSTableStreamingSections> streamingDetails = HashMultimap.create();

    /**
     * Creates a loader that uses a single connection per host.
     *
     * @param directory     directory containing the sstables; its parent's name is used as the keyspace
     * @param client        source of table metadata, endpoint ranges and the connection factory
     * @param outputHandler sink for progress messages
     */
    public SSTableLoader(File directory, Client client, OutputHandler outputHandler) {
        this(directory, client, outputHandler, 1);
    }

    /**
     * Creates a loader.
     *
     * @param directory          directory containing the sstables; its parent's name is used as the keyspace
     * @param client             source of table metadata, endpoint ranges and the connection factory
     * @param outputHandler      sink for progress messages
     * @param connectionsPerHost value handed to the {@link StreamPlan} constructor
     */
    public SSTableLoader(File directory, Client client, OutputHandler outputHandler, int connectionsPerHost) {
        this.directory = directory;
        this.keyspace = directory.getParentFile().getName();
        this.client = client;
        this.outputHandler = outputHandler;
        this.connectionsPerHost = connectionsPerHost;
    }

    /**
     * Walks the loader's directory, opens every usable sstable and records, for each endpoint
     * in {@code ranges}, the sections of that sstable to stream.
     *
     * @param ranges token ranges keyed by the endpoint that should receive them
     * @return the loader's list of opened sstables (the internal list, not a copy)
     */
    protected Collection<SSTableReader> openSSTables(Map<InetAddress, Collection<Range<Token>>> ranges) {
        outputHandler.output("Opening sstables and calculating sections to stream");
        // The callback is used for its side effects only and always answers false.
        LifecycleTransaction.getFiles(directory.toPath(),
                                      (file, type) -> inspectFile(file, type, ranges),
                                      Directories.OnTxnErr.IGNORE);
        return sstables;
    }

    /**
     * Examines one file reported by the directory walk; if it is the data component of a
     * loadable sstable, opens it and registers its streaming sections.
     *
     * @return always {@code false}
     */
    private boolean inspectFile(File file, Directories.FileType type, Map<InetAddress, Collection<Range<Token>>> ranges) {
        String name = file.getName();
        if (type != Directories.FileType.FINAL) {
            outputHandler.output(String.format("Skipping temporary file %s", name));
            return false;
        }

        // Only the data component triggers loading; other components (or unparseable names) are ignored.
        Pair<Descriptor, Component> parsed = SSTable.tryComponentFromFilename(file.getParentFile(), name);
        if (parsed == null || !parsed.right.equals(Component.DATA)) {
            return false;
        }
        Descriptor desc = parsed.left;

        if (!new File(desc.filenameFor(Component.PRIMARY_INDEX)).exists()) {
            outputHandler.output(String.format("Skipping file %s because index is missing", name));
            return false;
        }

        CFMetaData metadata = client.getTableMetadata(desc.cfname);
        if (metadata == null) {
            outputHandler.output(String.format("Skipping file %s: table %s.%s doesn't exist", name, keyspace, desc.cfname));
            return false;
        }

        try {
            SSTableReader sstable = SSTableReader.openForBatch(desc, componentsToOpen(desc), metadata);
            sstables.add(sstable);
            registerStreamingSections(sstable, ranges);
            // The summary is not used again by this class once the sections have been computed.
            sstable.releaseSummary();
        }
        catch (IOException e) {
            // Best effort: a file that cannot be opened is reported and skipped.
            outputHandler.output(String.format("Skipping file %s, error opening it: %s", name, e.getMessage()));
        }
        return false;
    }

    /**
     * Builds the component set used to open an sstable: data and primary index always, plus
     * summary, compression info and stats when their files exist on disk.
     */
    private static Set<Component> componentsToOpen(Descriptor desc) {
        Set<Component> components = new HashSet<>();
        components.add(Component.DATA);
        components.add(Component.PRIMARY_INDEX);
        Component[] optionalComponents = {Component.SUMMARY, Component.COMPRESSION_INFO, Component.STATS};
        for (Component optional : optionalComponents) {
            if (new File(desc.filenameFor(optional)).exists()) {
                components.add(optional);
            }
        }
        return components;
    }

    /**
     * Computes, for every endpoint, the file positions and estimated key count of
     * {@code sstable} covered by that endpoint's ranges and stores them in
     * {@link #streamingDetails}. A new reference to the sstable is taken per endpoint.
     */
    private void registerStreamingSections(SSTableReader sstable, Map<InetAddress, Collection<Range<Token>>> ranges) {
        for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : ranges.entrySet()) {
            Collection<Range<Token>> tokenRanges = entry.getValue();
            List<Pair<Long, Long>> sstableSections = sstable.getPositionsForRanges(tokenRanges);
            long estimatedKeys = sstable.estimatedKeysForRanges(tokenRanges);
            Ref<SSTableReader> ref = sstable.ref();
            streamingDetails.put(entry.getKey(),
                                 new StreamSession.SSTableStreamingSections(ref, sstableSections, estimatedKeys, 0L));
        }
    }

    /**
     * Streams to every endpoint known to the client, with no extra listeners.
     *
     * @return the future of the executed stream plan
     */
    public StreamResultFuture stream() {
        return this.stream(Collections.emptySet(), new StreamEventHandler[0]);
    }

    /**
     * Initialises the client, opens the sstables and streams the computed sections to every
     * endpoint that is not in {@code toIgnore}.
     *
     * @param toIgnore  endpoints that must not receive any files
     * @param listeners additional handlers registered on the plan alongside this loader
     * @return the future of the executed stream plan; when no sstable could be opened the
     *         plan is executed without any transfers or listeners
     */
    public StreamResultFuture stream(Set<InetAddress> toIgnore, StreamEventHandler ... listeners) {
        client.init(keyspace);
        outputHandler.output("Established connection to initial hosts");
        StreamPlan plan = new StreamPlan("Bulk Load", 0L, connectionsPerHost, false, false, false)
                .connectionFactory(client.getConnectionFactory());

        Map<InetAddress, Collection<Range<Token>>> endpointToRanges = client.getEndpointToRangesMap();
        openSSTables(endpointToRanges);
        if (sstables.isEmpty()) {
            return plan.execute();
        }

        outputHandler.output(String.format("Streaming relevant part of %sto %s", names(sstables), endpointToRanges.keySet()));
        for (InetAddress remote : endpointToRanges.keySet()) {
            // NOTE(review): sections (and their references) computed for ignored endpoints are
            // not handed to the plan here - confirm whether those references need releasing.
            if (toIgnore.contains(remote)) {
                continue;
            }
            // Hand the plan its own copy of the sections recorded for this endpoint.
            List<StreamSession.SSTableStreamingSections> endpointDetails = new ArrayList<>(streamingDetails.get(remote));
            plan.transferFiles(remote, endpointDetails);
        }
        plan.listeners(this, listeners);
        return plan.execute();
    }

    /** Releases the sstable self-references once streaming has succeeded. */
    @Override
    public void onSuccess(StreamState finalState) {
        this.releaseReferences();
    }

    /** Releases the sstable self-references once streaming has failed. */
    @Override
    public void onFailure(Throwable t) {
        this.releaseReferences();
    }

    /**
     * Releases the self-reference of every opened sstable and asserts that no reference to
     * it remains afterwards.
     */
    private void releaseReferences() {
        for (SSTableReader sstable : sstables) {
            sstable.selfRef().release();
            assert (sstable.selfRef().globalCount() == 0);
        }
    }

    /** Records the peer of every session-complete event that reports failure. */
    @Override
    public void handleStreamEvent(StreamEvent event) {
        if (event.eventType != StreamEvent.Type.STREAM_COMPLETE) {
            return;
        }
        StreamEvent.SessionCompleteEvent se = (StreamEvent.SessionCompleteEvent)event;
        if (!se.success) {
            failedHosts.add(se.peer);
        }
    }

    /**
     * Joins the data-file names of the given sstables, each followed by a single space
     * (the trailing space is relied upon by the message format in {@link #stream}).
     */
    private String names(Collection<SSTableReader> sstables) {
        StringBuilder builder = new StringBuilder();
        for (SSTableReader sstable : sstables) {
            builder.append(sstable.descriptor.filenameFor(Component.DATA)).append(" ");
        }
        return builder.toString();
    }

    /**
     * @return the live set of peers whose stream session completed unsuccessfully
     */
    public Set<InetAddress> getFailedHosts() {
        return this.failedHosts;
    }

    /**
     * Supplies the loader with what it needs to know about the target: table metadata,
     * the ranges owned by each endpoint, and the connection factory to stream with.
     */
    public static abstract class Client {
        private final Map<InetAddress, Collection<Range<Token>>> endpointToRanges = new HashMap<>();

        /**
         * Prepares the client; called by the loader before anything else is requested.
         *
         * @param keyspace the keyspace being loaded
         */
        public abstract void init(String keyspace);

        /** Hook for releasing client resources; does nothing by default. */
        public void stop() {
        }

        /**
         * @return the connection factory for the stream plan; a {@link DefaultConnectionFactory} by default
         */
        public StreamConnectionFactory getConnectionFactory() {
            return new DefaultConnectionFactory();
        }

        /**
         * @param cfname table name taken from the sstable descriptor
         * @return the table's metadata, or {@code null} if the table is unknown (such files are skipped)
         */
        public abstract CFMetaData getTableMetadata(String cfname);

        /**
         * Optional operation; the default implementation always throws.
         *
         * @throws UnsupportedOperationException always, unless overridden
         */
        public void setTableMetadata(CFMetaData cfm) {
            throw new UnsupportedOperationException("setTableMetadata is not supported by " + this.getClass().getName());
        }

        /**
         * @return the live map of endpoint to the ranges registered through {@link #addRangeForEndpoint}
         */
        public Map<InetAddress, Collection<Range<Token>>> getEndpointToRangesMap() {
            return this.endpointToRanges;
        }

        /**
         * Registers {@code range} for {@code endpoint}, creating the endpoint's range set on first use.
         */
        protected void addRangeForEndpoint(Range<Token> range, InetAddress endpoint) {
            this.endpointToRanges.computeIfAbsent(endpoint, key -> new HashSet<>()).add(range);
        }
    }
}

