/*
 * Decompiled with CFR 0.152.
 */
package it.unimi.di.law.bubing.frontier;

import it.unimi.di.law.bubing.Agent;
import it.unimi.di.law.bubing.RuntimeConfiguration;
import it.unimi.di.law.bubing.frontier.DNSThread;
import it.unimi.di.law.bubing.frontier.Distributor;
import it.unimi.di.law.bubing.frontier.DoneThread;
import it.unimi.di.law.bubing.frontier.FetchingThread;
import it.unimi.di.law.bubing.frontier.ParsingThread;
import it.unimi.di.law.bubing.frontier.StatsThread;
import it.unimi.di.law.bubing.frontier.TodoThread;
import it.unimi.di.law.bubing.frontier.VisitState;
import it.unimi.di.law.bubing.frontier.Workbench;
import it.unimi.di.law.bubing.frontier.WorkbenchVirtualizer;
import it.unimi.di.law.bubing.sieve.AbstractSieve;
import it.unimi.di.law.bubing.sieve.ByteArrayListByteSerializerDeserializer;
import it.unimi.di.law.bubing.sieve.ByteSerializerDeserializer;
import it.unimi.di.law.bubing.sieve.IdentitySieve;
import it.unimi.di.law.bubing.sieve.MercatorSieve;
import it.unimi.di.law.bubing.store.Store;
import it.unimi.di.law.bubing.util.BURL;
import it.unimi.di.law.bubing.util.BubingJob;
import it.unimi.di.law.bubing.util.ByteArrayDiskQueue;
import it.unimi.di.law.bubing.util.ConcurrentCountingMap;
import it.unimi.di.law.bubing.util.FastApproximateByteArrayCache;
import it.unimi.di.law.bubing.util.FetchData;
import it.unimi.di.law.bubing.util.LockFreeQueue;
import it.unimi.di.law.bubing.util.MurmurHash3;
import it.unimi.di.law.bubing.util.Util;
import it.unimi.di.law.warc.io.ParallelBufferedWarcWriter;
import it.unimi.dsi.fastutil.bytes.ByteArrayList;
import it.unimi.dsi.fastutil.io.BinIO;
import it.unimi.dsi.fastutil.io.FastBufferedInputStream;
import it.unimi.dsi.fastutil.io.FastBufferedOutputStream;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import it.unimi.dsi.jai4j.JobListener;
import it.unimi.dsi.jai4j.NoSuchJobManagerException;
import it.unimi.dsi.stat.SummaryStats;
import it.unimi.dsi.sux4j.mph.AbstractHashFunction;
import it.unimi.dsi.util.BloomFilter;
import it.unimi.dsi.util.Properties;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.security.NoSuchAlgorithmException;
import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;
import net.htmlparser.jericho.Config;
import net.htmlparser.jericho.LoggerProvider;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xbill.DNS.Lookup;

/**
 * The frontier of a BUbiNG agent.
 *
 * <p>It owns the URL queues, the sieve, the workbench and the pools of DNS, fetching and parsing
 * threads, and it can snap its state to disk ({@link #snap()}) and restore it ({@link #restore()}).
 *
 * <p>URLs enter in two ways:
 * <ul>
 * <li>through {@link #enqueue(ByteArrayList)} / {@link #enqueueLocal(ByteArrayList)}, which feed
 * the sieve or submit the job to another agent;</li>
 * <li>through {@link #receive(BubingJob)}, which buffers URLs of jobs coming from other agents in
 * {@link #quickReceivedURLs}.</li>
 * </ul>
 *
 * <p>As the {@link AbstractSieve.NewFlowReceiver} of the sieve, this class receives URLs in
 * {@link #append(long, ByteArrayList)} and stores them in {@link #readyURLs}.
 */
public class Frontier
implements JobListener<BubingJob>,
AbstractSieve.NewFlowReceiver<ByteArrayList> {
    private static final Logger LOGGER = LoggerFactory.getLogger(Frontier.class);
    /** Size argument passed to {@link ByteArrayDiskQueue} when creating/restoring {@link #readyURLs}. */
    public static final int READY_URLS_BUFFER_SIZE = 0x4000000;
    /** A one-element array containing the IPv4 loopback address 127.0.0.1. */
    public static final InetAddress[] LOOPBACK;
    private static final String ROBOTS_STORE = "robots.warc.gz";
    /** Milliseconds added to the current time to compute {@link #nextFlush} when the sieve enqueue returns {@code true}. */
    public static final long MIN_FLUSH_INTERVAL = 10000L;
    /** Step by which {@link #updateRequestedFrontSize()} increases {@link #requiredFrontSize}. */
    public static final long FRONT_INCREASE = 1000L;
    /** A MurmurHash3-based hash function on byte arrays. */
    public static final AbstractHashFunction<byte[]> BYTE_ARRAY_HASHING_STRATEGY;
    /** A MurmurHash3-based hash function on byte-array lists; used by the sieve. */
    public static final AbstractHashFunction<ByteArrayList> BYTE_ARRAY_LIST_HASHING_STRATEGY;
    protected final Store store;
    protected final Agent agent;
    public final RuntimeConfiguration rc;
    public AbstractSieve<ByteArrayList, Void> sieve;
    /** Disk queue (under {@code frontierDir/ready}) filled by {@link #append(long, ByteArrayList)}. */
    public ByteArrayDiskQueue readyURLs;
    /** Bounded buffer (capacity 1024) of URLs received from other agents via {@link #receive(BubingJob)}. */
    public ArrayBlockingQueue<ByteArrayList> quickReceivedURLs;
    /** Disk queue stored under {@code frontierDir/received}. */
    public ByteArrayDiskQueue receivedURLs;
    public final ObjectArrayList<ParsingThread> parsingThreads;
    /** WARC writer for the robots store; appends to the existing file when the crawl is resumed. */
    public final ParallelBufferedWarcWriter robotsWarcParallelOutputStream;
    public final AtomicLong pathQueriesInQueues;
    public final AtomicLong weightOfpathQueriesInQueues;
    public final AtomicLong brokenVisitStates;
    public final AtomicLong numberOfReceivedURLs;
    protected volatile long nextFlush;
    public final Workbench workbench;
    public final DelayQueue<VisitState> unknownHosts;
    public final LinkedBlockingQueue<VisitState> newVisitStates;
    public BloomFilter<Void> digests;
    protected final ObjectArrayList<DNSThread> dnsThreads;
    private final ObjectArrayList<FetchingThread> fetchingThreads;
    protected final Distributor distributor;
    public final FastApproximateByteArrayCache urlCache;
    protected final WorkbenchVirtualizer virtualizer;
    public final LockFreeQueue<VisitState> todo;
    public final LockFreeQueue<VisitState> done;
    protected final LockFreeQueue<VisitState> refill;
    public final AtomicLong requiredFrontSize;
    public final AtomicLong fetchingThreadWaits;
    public final AtomicLong fetchingThreadWaitingTimeSum;
    public final LockFreeQueue<FetchData> results;
    private TodoThread todoThread;
    private DoneThread doneThread;
    /** Six counters; {@link #snap()} stores index 0 as "others" and indices 1..5 as 1xx..5xx. */
    public final AtomicLong[] archetypesStatus;
    public final SummaryStats outdegree;
    public final SummaryStats externalOutdegree;
    public final SummaryStats contentLength;
    public final AtomicLong contentTypeText;
    public final AtomicLong contentTypeImage;
    public final AtomicLong contentTypeApplication;
    public final AtomicLong contentTypeOthers;
    public final AtomicLong duplicates;
    public final AtomicLong fetchedResources;
    public final AtomicLong fetchedRobots;
    public final AtomicLong transferredBytes;
    public ConcurrentCountingMap schemeAuthority2Count;
    public final AtomicLongArray speedDist;
    protected double averageSpeed;
    public volatile long workbenchSizeInPathQueries;
    public final RequestConfig defaultRequestConfig;
    public final RequestConfig robotsRequestConfig;
    /** Locked view of {@link #schemeAuthority2Count}, held between {@link #prepareToAppend()} and {@link #finishedAppending()}. */
    private ConcurrentCountingMap.LockedMap lockedMap;

    /**
     * Creates a frontier.
     *
     * <p>If {@code rc.crawlIsNew} is true, fresh digests and disk queues are created; otherwise the
     * state is restored from the snap directory via {@link #restore()}. In both cases the
     * distributor, todo and done threads are started and the default dnsjava cache and resolver are
     * configured.
     *
     * @param rc the runtime configuration.
     * @param store the store where fetched data is written.
     * @param agent the agent this frontier belongs to.
     */
    public Frontier(final RuntimeConfiguration rc, final Store store, final Agent agent) throws IOException, IllegalArgumentException, ConfigurationException, ClassNotFoundException, InterruptedException {
        this.rc = rc;
        this.schemeAuthority2Count = new ConcurrentCountingMap();
        this.workbenchSizeInPathQueries = rc.workbenchMaxByteSize / 100L;
        this.averageSpeed = 1.0 / (double)rc.schemeAuthorityDelay;
        final File robotsFile = new File(rc.storeDir, ROBOTS_STORE);
        LOGGER.info("Opening file " + robotsFile + " to write robots.txt");
        // When resuming a crawl (!crawlIsNew) we append to the existing robots store.
        this.robotsWarcParallelOutputStream = new ParallelBufferedWarcWriter(new FastBufferedOutputStream(new FileOutputStream(robotsFile, !rc.crawlIsNew)), true);
        this.urlCache = new FastApproximateByteArrayCache(rc.urlCacheMaxByteSize);
        this.store = store;
        // A sieve size of zero selects IdentitySieve instead of MercatorSieve.
        this.sieve = rc.sieveSize == 0
                ? new IdentitySieve<ByteArrayList, Void>(this, new ByteArrayListByteSerializerDeserializer(), ByteSerializerDeserializer.VOID, BYTE_ARRAY_LIST_HASHING_STRATEGY, null)
                : new MercatorSieve<ByteArrayList, Void>(rc.crawlIsNew, rc.sieveDir, rc.sieveSize, rc.sieveStoreIOBufferByteSize, rc.sieveAuxFileIOBufferByteSize, this, new ByteArrayListByteSerializerDeserializer(), ByteSerializerDeserializer.VOID, BYTE_ARRAY_LIST_HASHING_STRATEGY, null);
        this.agent = agent;
        this.workbench = new Workbench();
        this.unknownHosts = new DelayQueue<>();
        this.virtualizer = new WorkbenchVirtualizer(this);
        this.pathQueriesInQueues = new AtomicLong();
        this.weightOfpathQueriesInQueues = new AtomicLong();
        this.brokenVisitStates = new AtomicLong();
        this.fetchedResources = new AtomicLong();
        this.fetchedRobots = new AtomicLong();
        this.transferredBytes = new AtomicLong();
        this.speedDist = new AtomicLongArray(40);
        this.archetypesStatus = new AtomicLong[6];
        for (int i = 0; i < this.archetypesStatus.length; ++i) {
            this.archetypesStatus[i] = new AtomicLong();
        }
        this.outdegree = new SummaryStats();
        this.externalOutdegree = new SummaryStats();
        this.contentLength = new SummaryStats();
        this.contentTypeText = new AtomicLong();
        this.contentTypeImage = new AtomicLong();
        this.contentTypeApplication = new AtomicLong();
        this.contentTypeOthers = new AtomicLong();
        this.duplicates = new AtomicLong();
        this.numberOfReceivedURLs = new AtomicLong();
        this.requiredFrontSize = new AtomicLong(1000L);
        this.fetchingThreadWaits = new AtomicLong();
        this.fetchingThreadWaitingTimeSum = new AtomicLong();
        // An empty proxy host means "no proxy".
        final HttpHost proxy = rc.proxyHost.length() > 0 ? new HttpHost(rc.proxyHost, rc.proxyPort) : null;
        this.defaultRequestConfig = RequestConfig.custom()
                .setSocketTimeout(rc.socketTimeout)
                .setConnectTimeout(rc.connectionTimeout)
                .setConnectionRequestTimeout(rc.connectionTimeout)
                .setCookieSpec(rc.cookiePolicy)
                .setRedirectsEnabled(false)
                .setProxy(proxy)
                .build();
        // robots.txt requests follow up to five redirects; ordinary requests follow none.
        this.robotsRequestConfig = RequestConfig.custom()
                .setSocketTimeout(rc.socketTimeout)
                .setConnectTimeout(rc.connectionTimeout)
                .setConnectionRequestTimeout(rc.connectionTimeout)
                .setCookieSpec(rc.cookiePolicy)
                .setRedirectsEnabled(true)
                .setMaxRedirects(5)
                .setProxy(proxy)
                .build();
        this.dnsThreads = new ObjectArrayList<>();
        this.fetchingThreads = new ObjectArrayList<>();
        this.parsingThreads = new ObjectArrayList<>();
        this.newVisitStates = new LinkedBlockingQueue<>();
        this.todo = new LockFreeQueue<>();
        this.done = new LockFreeQueue<>();
        this.refill = new LockFreeQueue<>();
        this.results = new LockFreeQueue<>();
        this.distributor = new Distributor(this);
        Config.LoggerProvider = LoggerProvider.SLF4J;
        this.quickReceivedURLs = new ArrayBlockingQueue<>(1024);
        if (rc.crawlIsNew) {
            this.digests = BloomFilter.create(Math.max(1L, rc.maxUrls), (double)rc.bloomFilterPrecision);
            this.readyURLs = ByteArrayDiskQueue.createNew(new File(rc.frontierDir, "ready"), READY_URLS_BUFFER_SIZE, true);
            this.receivedURLs = ByteArrayDiskQueue.createNew(new File(rc.frontierDir, "received"), 16384, true);
            this.distributor.statsThread.start(0L);
        } else {
            this.restore();
        }
        this.distributor.start();
        this.todoThread = new TodoThread(this);
        this.todoThread.start();
        this.doneThread = new DoneThread(this);
        this.doneThread.start();
        // In dnsjava, DNS class 1 is DClass.IN (Internet).
        Lookup.getDefaultCache(1).setMaxEntries(rc.dnsCacheMaxSize);
        Lookup.getDefaultCache(1).setMaxCache((int)Math.min(rc.dnsPositiveTtl, Integer.MAX_VALUE));
        Lookup.getDefaultCache(1).setMaxNCache((int)Math.min(rc.dnsNegativeTtl, Integer.MAX_VALUE));
        Lookup.getDefaultResolver().setTimeout(60);
    }

    /**
     * Sets the number of DNS threads.
     *
     * <p>Surplus threads get their {@code stop} flag set and are removed from the pool; missing
     * threads are created and started.
     *
     * @param newDnsThreads the new number of DNS threads.
     * @throws IllegalArgumentException if {@code newDnsThreads} is not positive.
     */
    public void dnsThreads(final int newDnsThreads) throws IllegalArgumentException {
        if (newDnsThreads <= 0) {
            throw new IllegalArgumentException("The number of DNS threads must be positive: " + newDnsThreads);
        }
        synchronized (this.dnsThreads) {
            final int current = this.dnsThreads.size();
            if (newDnsThreads < current) {
                for (int i = newDnsThreads; i < current; ++i) {
                    this.dnsThreads.get(i).stop = true;
                }
                this.dnsThreads.size(newDnsThreads);
            } else {
                for (int i = current; i < newDnsThreads; ++i) {
                    final DNSThread thread = new DNSThread(this, i);
                    thread.start();
                    this.dnsThreads.add(thread);
                }
            }
        }
        LOGGER.info("Number of DNS Threads set to " + newDnsThreads);
    }

    /**
     * Sets the number of fetching threads.
     *
     * <p>Surplus threads get their {@code stop} flag set and are removed from the pool; missing
     * threads are created and started.
     *
     * @param numFetchingThreads the new number of fetching threads.
     * @throws IllegalArgumentException if {@code numFetchingThreads} is not positive.
     */
    public void fetchingThreads(final int numFetchingThreads) throws IllegalArgumentException, NoSuchAlgorithmException, IOException {
        if (numFetchingThreads <= 0) {
            throw new IllegalArgumentException("The number of fetching threads must be positive: " + numFetchingThreads);
        }
        synchronized (this.fetchingThreads) {
            final int current = this.fetchingThreads.size();
            if (numFetchingThreads < current) {
                for (int i = numFetchingThreads; i < current; ++i) {
                    this.fetchingThreads.get(i).stop = true;
                }
                this.fetchingThreads.size(numFetchingThreads);
            } else {
                for (int i = current; i < numFetchingThreads; ++i) {
                    final FetchingThread thread = new FetchingThread(this, i);
                    thread.start();
                    this.fetchingThreads.add(thread);
                }
            }
        }
        LOGGER.info("Number of Fetching Threads set to " + numFetchingThreads);
    }

    /**
     * Sets the number of parsing threads.
     *
     * <p>Surplus threads get their {@code stop} flag set and are removed from the pool; missing
     * threads are created and started.
     *
     * @param newParsingThreads the new number of parsing threads.
     * @throws IllegalArgumentException if {@code newParsingThreads} is not positive.
     */
    public void parsingThreads(final int newParsingThreads) throws IllegalArgumentException {
        if (newParsingThreads <= 0) {
            throw new IllegalArgumentException("The number of parsing threads must be positive: " + newParsingThreads);
        }
        synchronized (this.parsingThreads) {
            final int current = this.parsingThreads.size();
            if (newParsingThreads < current) {
                for (int i = newParsingThreads; i < current; ++i) {
                    this.parsingThreads.get(i).stop = true;
                }
                this.parsingThreads.size(newParsingThreads);
            } else {
                for (int i = current; i < newParsingThreads; ++i) {
                    final ParsingThread thread = new ParsingThread(this, this.store, i);
                    thread.start();
                    this.parsingThreads.add(thread);
                }
            }
        }
        LOGGER.info("Number of Parsing Threads set to " + newParsingThreads);
    }

    /**
     * Enqueues a URL, either in the local sieve or by submitting it to the responsible agent.
     *
     * <p>The URL is dropped if it does not pass {@link #admit(ByteArrayList)}. Submission failures
     * are logged, not propagated.
     *
     * @param url a normalized URL.
     */
    public void enqueue(final ByteArrayList url) throws IOException, InterruptedException {
        if (!this.admit(url)) {
            return;
        }
        final BubingJob job = new BubingJob(url);
        if (this.agent.local(job)) {
            this.enqueueInSieve(url);
            return;
        }
        try {
            if (LOGGER.isTraceEnabled()) {
                final byte[] urlBuffer = url.elements();
                LOGGER.trace("Sending out scheme+authority {} with path+query {}", Util.toString(BURL.schemeAndAuthorityAsByteArray(urlBuffer)), Util.toString(BURL.pathAndQueryAsByteArray(url)));
            }
            this.agent.submit(job);
        }
        catch (final IllegalStateException e) {
            LOGGER.warn("Impossible to submit URL " + BURL.fromNormalizedByteArray(url.toByteArray()), e);
        }
        catch (final NoSuchJobManagerException e) {
            LOGGER.warn("Impossible to submit URL " + BURL.fromNormalizedByteArray(url.toByteArray()), e);
        }
    }

    /** Returns whether the accumulated weight of path+queries in queues has reached {@code rc.workbenchMaxByteSize}. */
    public boolean workbenchIsFull() {
        return this.weightOfpathQueriesInQueues.get() >= this.rc.workbenchMaxByteSize;
    }

    /**
     * Enqueues a URL in the local sieve, without checking which agent is responsible for it.
     *
     * <p>The URL is dropped if it does not pass {@link #admit(ByteArrayList)}.
     *
     * @param url a normalized URL.
     */
    public void enqueueLocal(final ByteArrayList url) throws IOException, InterruptedException {
        if (this.admit(url)) {
            this.enqueueInSieve(url);
        }
    }

    /**
     * Applies the admission checks shared by {@link #enqueue(ByteArrayList)} and
     * {@link #enqueueLocal(ByteArrayList)}.
     *
     * @param url a normalized URL.
     * @return {@code false} if the count of the URL's scheme+authority in
     * {@link #schemeAuthority2Count} is at least {@code rc.maxUrlsPerSchemeAuthority}, or if
     * {@code urlCache.add(url)} returns {@code false}; {@code true} otherwise. Note that the URL is
     * added to {@link #urlCache} as a side effect whenever the count check passes.
     */
    private boolean admit(final ByteArrayList url) {
        final byte[] urlBuffer = url.elements();
        final int inStore = this.schemeAuthority2Count.get(urlBuffer, 0, BURL.startOfpathAndQuery(urlBuffer));
        if (inStore >= this.rc.maxUrlsPerSchemeAuthority) {
            return false;
        }
        return this.urlCache.add(url);
    }

    /** Enqueues {@code url} in the sieve and, if the sieve returns {@code true}, postpones {@link #nextFlush}. */
    private void enqueueInSieve(final ByteArrayList url) throws IOException, InterruptedException {
        if (this.sieve.enqueue(url, null)) {
            this.nextFlush = System.currentTimeMillis() + MIN_FLUSH_INTERVAL;
        }
    }

    /**
     * Buffers the URL of a job received from another agent in {@link #quickReceivedURLs}.
     *
     * <p>Blocks while the buffer is full. Errors are logged, not propagated. If interrupted while
     * waiting, the thread's interrupt status is restored.
     *
     * @param job the received job.
     */
    public void receive(final BubingJob job) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Receiving job {}", job.url);
        }
        try {
            this.quickReceivedURLs.put(job.url);
        }
        catch (final InterruptedException e) {
            // Preserve the interrupt so that the caller's thread can notice it.
            Thread.currentThread().interrupt();
            LOGGER.error("Error while enqueueing " + job.url, e);
        }
        catch (final RuntimeException e) {
            LOGGER.error("Error while enqueueing " + job.url, e);
        }
    }

    /**
     * Shuts the frontier down.
     *
     * <p>In order, this method:
     * <ol>
     * <li>stops the todo thread and joins the distributor, the todo thread and the DNS threads;</li>
     * <li>stops the fetching threads, escalating to {@code abort()} and then {@code interrupt()} if
     * they are still alive after twice the socket timeout;</li>
     * <li>stops the done thread and waits for results to be parsed while some parsing thread is
     * alive, then stops the parsing threads;</li>
     * <li>closes the robots writer, the store and the fetching threads;</li>
     * <li>releases the visit states left in the todo/done queues, drains the refill queue, and closes
     * the sieve.</li>
     * </ol>
     */
    public void close() throws IOException, InterruptedException {
        this.todoThread.interrupt();
        this.distributor.join();
        LOGGER.info("Joined distributor");
        this.todoThread.join();
        LOGGER.info("Joined todo thread");
        for (final DNSThread t : this.dnsThreads) {
            t.stop = true;
        }
        for (final DNSThread t : this.dnsThreads) {
            t.join();
        }
        LOGGER.info("Joined DNS threads");
        for (final FetchingThread t : this.fetchingThreads) {
            t.stop = true;
        }
        // Computed in long arithmetic to avoid int overflow on large timeouts.
        final long fetchingTimeout = 2L * this.rc.socketTimeout;
        if (this.waitForFetchingThreads(fetchingTimeout)) {
            for (final FetchingThread t : this.fetchingThreads) {
                t.abort();
            }
        }
        if (this.waitForFetchingThreads(fetchingTimeout)) {
            LOGGER.error("Some fetching threads are still alive");
            for (final FetchingThread t : this.fetchingThreads) {
                t.interrupt();
            }
        }
        for (final FetchingThread t : this.fetchingThreads) {
            t.join();
        }
        LOGGER.info("Joined fetching threads");
        Thread.sleep(2000L);
        this.doneThread.stop = true;
        this.doneThread.join();
        LOGGER.info("Joined done thread");
        while (this.results.size() != 0L) {
            boolean someParsingAlive = false;
            for (final ParsingThread t : this.parsingThreads) {
                someParsingAlive |= t.isAlive();
            }
            if (!someParsingAlive) {
                LOGGER.error("No parsing thread alive: some results might not have been parsed");
                break;
            }
            Thread.sleep(1000L);
        }
        if (this.results.size() == 0L) {
            LOGGER.info("All results have been parsed");
        }
        for (final ParsingThread t : this.parsingThreads) {
            t.stop = true;
        }
        for (final ParsingThread t : this.parsingThreads) {
            t.join();
        }
        this.robotsWarcParallelOutputStream.close();
        this.store.close();
        LOGGER.info("Joined parsing threads and closed stores");
        for (final FetchingThread t : this.fetchingThreads) {
            t.close();
        }
        LOGGER.info("Closed fetching threads");
        VisitState visitState;
        while ((visitState = this.todo.poll()) != null) {
            this.workbench.release(visitState);
        }
        while ((visitState = this.done.poll()) != null) {
            // Visit states with URLs only on disk are scheduled for a last refill.
            if (visitState.nextFetch != Long.MAX_VALUE && this.virtualizer.count(visitState) > 0L && visitState.isEmpty()) {
                this.refill.add(visitState);
            }
            this.workbench.release(visitState);
        }
        while ((visitState = this.refill.poll()) != null) {
            final int dequeuedURLs = this.virtualizer.dequeuePathQueries(visitState, visitState.pathQueryLimit());
            if (dequeuedURLs == 0) {
                LOGGER.info("No URLs on disk during last refill: " + visitState);
            }
            if (visitState.acquired) {
                LOGGER.warn("Visit state in the poll queue is acquired: " + visitState);
            }
        }
        this.sieve.close();
        this.distributor.statsThread.done();
    }

    /**
     * Sleeps in one-second steps until no fetching thread is alive or at least {@code timeout}
     * milliseconds have elapsed (checked after each sleep).
     *
     * @param timeout the maximum waiting time in milliseconds.
     * @return whether some fetching thread was still alive at the last check.
     */
    private boolean waitForFetchingThreads(final long timeout) throws InterruptedException {
        final long start = System.currentTimeMillis();
        boolean someAlive;
        do {
            Thread.sleep(1000L);
            someAlive = false;
            for (final FetchingThread t : this.fetchingThreads) {
                someAlive |= t.isAlive();
            }
        } while (someAlive && System.currentTimeMillis() - start < timeout);
        return someAlive;
    }

    /** Locks {@link #schemeAuthority2Count} for the duration of an append session. */
    @Override
    public void prepareToAppend() throws IOException {
        this.lockedMap = this.schemeAuthority2Count.lock();
    }

    /**
     * Receives a URL from the sieve and enqueues it in {@link #readyURLs}, unless its
     * scheme+authority count in the locked map has reached {@code rc.maxUrlsPerSchemeAuthority}.
     */
    @Override
    public void append(final long hash, final ByteArrayList list) throws IOException {
        final byte[] urlBuffer = list.elements();
        final int length = list.size();
        if (this.lockedMap.get(urlBuffer, 0, BURL.startOfpathAndQuery(urlBuffer)) < this.rc.maxUrlsPerSchemeAuthority) {
            this.readyURLs.enqueue(urlBuffer, 0, length);
        }
    }

    /** Unlocks the map locked by {@link #prepareToAppend()}. */
    @Override
    public synchronized void finishedAppending() throws IOException {
        this.lockedMap.unlock();
    }

    /** Does nothing. */
    @Override
    public void noMoreAppend() throws IOException {
    }

    /**
     * Increases {@link #requiredFrontSize} by {@link #FRONT_INCREASE} (capped at half of
     * {@link #workbenchSizeInPathQueries}) when the current front (workbench size plus todo size,
     * minus broken entries) has reached it. A failed compare-and-set leaves the value unchanged.
     */
    public void updateRequestedFrontSize() {
        final long currentRequiredFrontSize = this.requiredFrontSize.get();
        final long front = this.workbench.approximatedSize() + this.todo.size() - this.workbench.broken.get();
        if (front >= currentRequiredFrontSize && this.requiredFrontSize.compareAndSet(currentRequiredFrontSize, Math.min(currentRequiredFrontSize + FRONT_INCREASE, this.workbenchSizeInPathQueries / 2L))) {
            LOGGER.info("Required front size: " + this.requiredFrontSize.get());
        }
    }

    /** Records one fetching-thread wait of {@code waitTime}. */
    public void updateFetchingThreadsWaitingStats(final long waitTime) {
        this.fetchingThreadWaits.incrementAndGet();
        this.fetchingThreadWaitingTimeSum.addAndGet(waitTime);
    }

    /** Resets the fetching-thread waiting statistics. */
    public void resetFetchingThreadsWaitingStats() {
        this.fetchingThreadWaits.set(0L);
        this.fetchingThreadWaitingTimeSum.set(0L);
    }

    /**
     * Snaps the frontier state to {@code frontierDir/snap}.
     *
     * <p>This method:
     * <ul>
     * <li>removes robots data from every visit state and emits final statistics;</li>
     * <li>writes scalar data to {@code frontier.data};</li>
     * <li>closes the virtualizer and freezes the disk queues;</li>
     * <li>serializes the digests, the per-scheme+authority counts and the visit states.</li>
     * </ul>
     * If the snap directory cannot be created, nothing is snapped.
     */
    public void snap() throws ConfigurationException, IllegalArgumentException, IOException {
        for (final VisitState visitState : this.distributor.schemeAuthority2VisitState.visitStates()) {
            if (visitState != null) {
                visitState.removeRobots();
            }
        }
        LOGGER.info("Final statistics");
        this.distributor.statsThread.emit();
        this.distributor.statsThread.run();
        final File snapDir = new File(this.rc.frontierDir, "snap");
        LOGGER.info("Started snapping to " + snapDir);
        if (snapDir.exists()) {
            LOGGER.warn("Already existing snap directory " + snapDir + ": data will be overwritten (this shouldn't happen)");
        } else if (!snapDir.mkdir()) {
            LOGGER.error("Could not create snap directory " + snapDir + ": will not produce snap");
            return;
        }
        LOGGER.info("Snapping scalar data");
        final Properties scalarData = new Properties();
        final long epoch = System.currentTimeMillis();
        scalarData.addProperty(PropertyKeys.EPOCH, epoch);
        scalarData.setHeader("Snap started at " + new Date());
        scalarData.addProperty(PropertyKeys.PATHQUERIESINQUEUES, this.pathQueriesInQueues.get());
        scalarData.addProperty(PropertyKeys.WEIGHTOFPATHQUERIESINQUEUES, this.weightOfpathQueriesInQueues.get());
        scalarData.addProperty(PropertyKeys.BROKENVISITSTATES, this.brokenVisitStates.get());
        scalarData.addProperty(PropertyKeys.NUMBEROFRECEIVEDURLS, this.numberOfReceivedURLs.get());
        scalarData.addProperty(PropertyKeys.REQUIREDFRONTSIZE, this.requiredFrontSize.get());
        scalarData.addProperty(PropertyKeys.FETCHINGTHREADWAITS, this.fetchingThreadWaits.get());
        scalarData.addProperty(PropertyKeys.FETCHINGTHREADWAITINGTIMESUM, this.fetchingThreadWaitingTimeSum.get());
        scalarData.addProperty(PropertyKeys.ARCHETYPESOTHERS, this.archetypesStatus[0].get());
        scalarData.addProperty(PropertyKeys.ARCHETYPES1XX, this.archetypesStatus[1].get());
        scalarData.addProperty(PropertyKeys.ARCHETYPES2XX, this.archetypesStatus[2].get());
        scalarData.addProperty(PropertyKeys.ARCHETYPES3XX, this.archetypesStatus[3].get());
        scalarData.addProperty(PropertyKeys.ARCHETYPES4XX, this.archetypesStatus[4].get());
        scalarData.addProperty(PropertyKeys.ARCHETYPES5XX, this.archetypesStatus[5].get());
        scalarData.addProperty(PropertyKeys.DUPLICATES, this.duplicates.get());
        scalarData.addProperty(PropertyKeys.FETCHEDRESOURCES, this.fetchedResources.get());
        scalarData.addProperty(PropertyKeys.FETCHEDROBOTS, this.fetchedRobots.get());
        scalarData.addProperty(PropertyKeys.TRANSFERREDBYTES, this.transferredBytes.get());
        scalarData.addProperty(PropertyKeys.AVERAGESPEED, this.averageSpeed);
        scalarData.addProperty(PropertyKeys.CRAWLDURATION, this.distributor.statsThread.requestLogger.millis());
        scalarData.addProperty(PropertyKeys.VISITSTATESETSIZE, this.distributor.schemeAuthority2VisitState.size());
        scalarData.addProperty(PropertyKeys.WORKBENCHENTRYSETSIZE, this.workbench.numberOfWorkbenchEntries());
        LOGGER.info("Storing virtualizer states");
        this.virtualizer.close();
        LOGGER.info("Freezing byte disk queues");
        scalarData.addProperty(PropertyKeys.READYURLSSIZE, this.readyURLs.size64());
        this.readyURLs.freeze();
        scalarData.addProperty(PropertyKeys.RECEIVEDURLSSIZE, this.receivedURLs.size64());
        this.receivedURLs.freeze();
        scalarData.save(new File(snapDir, "frontier.data"));
        LOGGER.info("Storing digests");
        BinIO.storeObject(this.digests, new File(snapDir, "digests"));
        LOGGER.info("Storing counts");
        BinIO.storeObject(this.schemeAuthority2Count, new File(snapDir, "schemeAuthority2Count"));
        LOGGER.info("Storing visit states");
        // Format: the number of visit states, then for each one the serialized VisitState, a boolean
        // telling whether it has a workbench entry and, if so, the entry's IP address.
        try (ObjectOutputStream workbenchStream = new ObjectOutputStream(new FastBufferedOutputStream(new FileOutputStream(new File(snapDir, "workbench"))))) {
            long c = 0L;
            for (final VisitState visitState : this.distributor.schemeAuthority2VisitState.visitStates()) {
                if (visitState == null) {
                    continue;
                }
                if (visitState.acquired) {
                    LOGGER.error("Acquired visit state: " + visitState);
                }
                ++c;
            }
            workbenchStream.writeLong(c);
            for (final VisitState visitState : this.distributor.schemeAuthority2VisitState.visitStates()) {
                if (visitState == null) {
                    continue;
                }
                workbenchStream.writeObject(visitState);
                workbenchStream.writeBoolean(visitState.workbenchEntry != null);
                if (visitState.workbenchEntry != null) {
                    Util.writeByteArray(visitState.workbenchEntry.ipAddress, workbenchStream);
                }
            }
        }
    }

    /**
     * Restores the frontier state written by {@link #snap()} from {@code frontierDir/snap}.
     *
     * <p>After a successful restore the snap directory is renamed to {@code snap-<epoch>}, and the
     * statistics thread is started from the stored crawl duration.
     *
     * @throws IOException if the snap directory does not exist or is not a directory (previously this
     * case was only logged, leaving the disk queues and digests uninitialized), or on I/O errors.
     */
    public void restore() throws ConfigurationException, IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
        final File snapDir = new File(this.rc.frontierDir, "snap");
        if (!snapDir.exists() || !snapDir.isDirectory()) {
            final String message = "Trying to restore state from snap directory " + snapDir + ", but it does not exist or is not a directory";
            LOGGER.error(message);
            throw new IOException(message);
        }
        LOGGER.info("Restoring data from " + snapDir);
        LOGGER.info("Restoring scalar data");
        final Properties scalarData = new Properties(new File(snapDir, "frontier.data"));
        final long epoch = scalarData.getLong(PropertyKeys.EPOCH);
        this.pathQueriesInQueues.set(scalarData.getLong(PropertyKeys.PATHQUERIESINQUEUES));
        this.weightOfpathQueriesInQueues.set(scalarData.getLong(PropertyKeys.WEIGHTOFPATHQUERIESINQUEUES));
        this.brokenVisitStates.set(scalarData.getLong(PropertyKeys.BROKENVISITSTATES));
        this.numberOfReceivedURLs.set(scalarData.getLong(PropertyKeys.NUMBEROFRECEIVEDURLS));
        this.requiredFrontSize.set(scalarData.getLong(PropertyKeys.REQUIREDFRONTSIZE));
        this.fetchingThreadWaits.set(scalarData.getLong(PropertyKeys.FETCHINGTHREADWAITS));
        this.fetchingThreadWaitingTimeSum.set(scalarData.getLong(PropertyKeys.FETCHINGTHREADWAITINGTIMESUM));
        this.archetypesStatus[0].set(scalarData.getLong(PropertyKeys.ARCHETYPESOTHERS));
        this.archetypesStatus[1].set(scalarData.getLong(PropertyKeys.ARCHETYPES1XX));
        this.archetypesStatus[2].set(scalarData.getLong(PropertyKeys.ARCHETYPES2XX));
        this.archetypesStatus[3].set(scalarData.getLong(PropertyKeys.ARCHETYPES3XX));
        this.archetypesStatus[4].set(scalarData.getLong(PropertyKeys.ARCHETYPES4XX));
        this.archetypesStatus[5].set(scalarData.getLong(PropertyKeys.ARCHETYPES5XX));
        this.duplicates.set(scalarData.getLong(PropertyKeys.DUPLICATES));
        this.fetchedResources.set(scalarData.getLong(PropertyKeys.FETCHEDRESOURCES));
        this.fetchedRobots.set(scalarData.getLong(PropertyKeys.FETCHEDROBOTS));
        this.transferredBytes.set(scalarData.getLong(PropertyKeys.TRANSFERREDBYTES));
        this.averageSpeed = scalarData.getDouble(PropertyKeys.AVERAGESPEED);
        this.distributor.schemeAuthority2VisitState.ensureCapacity(scalarData.getInt(PropertyKeys.VISITSTATESETSIZE));
        this.workbench.address2WorkbenchEntry.ensureCapacity(scalarData.getInt(PropertyKeys.WORKBENCHENTRYSETSIZE));
        LOGGER.info("Restoring digests");
        @SuppressWarnings("unchecked") // Written by snap() from a BloomFilter<Void>.
        final BloomFilter<Void> restoredDigests = (BloomFilter<Void>)BinIO.loadObject(new File(snapDir, "digests"));
        this.digests = restoredDigests;
        LOGGER.info("Restoring counts");
        this.schemeAuthority2Count = (ConcurrentCountingMap)BinIO.loadObject(new File(snapDir, "schemeAuthority2Count"));
        LOGGER.info("Restoring workbench");
        try (ObjectInputStream workbenchStream = new ObjectInputStream(new FastBufferedInputStream(new FileInputStream(new File(snapDir, "workbench"))))) {
            final long workbenchSize = workbenchStream.readLong();
            long restored = 0L;
            try {
                for (; restored < workbenchSize; ++restored) {
                    final VisitState visitState = (VisitState)workbenchStream.readObject();
                    visitState.frontier = this;
                    this.distributor.schemeAuthority2VisitState.add(visitState);
                    final boolean nonNullWorkbenchEntry = workbenchStream.readBoolean();
                    if (visitState.lastRobotsFetch == Long.MAX_VALUE) {
                        visitState.forciblyEnqueueRobotsFirst();
                    }
                    if (nonNullWorkbenchEntry) {
                        visitState.setWorkbenchEntry(this.workbench.getWorkbenchEntry(Util.readByteArray(workbenchStream)));
                    } else {
                        this.newVisitStates.add(visitState);
                    }
                    // Empty visit states with URLs still on disk must be refilled.
                    if (visitState.isEmpty() && this.virtualizer.count(visitState) > 0L) {
                        LOGGER.error("Empty visit state, URLs on disk: " + visitState);
                        this.refill.add(visitState);
                    }
                }
            }
            catch (final EOFException e) {
                // The visit state being read when EOF hit counts as missing.
                LOGGER.error("Workbench stream too short: " + (workbenchSize - restored) + " visit states missing out of " + workbenchSize);
            }
        }
        this.virtualizer.readMetadata();
        LOGGER.info("Defreezing byte disk queues");
        final long readyURLsSize = scalarData.getLong(PropertyKeys.READYURLSSIZE);
        this.readyURLs = ByteArrayDiskQueue.createFromFile(readyURLsSize, new File(this.rc.frontierDir, "ready"), READY_URLS_BUFFER_SIZE, true);
        final long receivedURLsSize = scalarData.getLong(PropertyKeys.RECEIVEDURLSSIZE);
        this.receivedURLs = ByteArrayDiskQueue.createFromFile(receivedURLsSize, new File(this.rc.frontierDir, "received"), 16384, true);
        final File renameDir = new File(snapDir + "-" + epoch);
        LOGGER.info("Renaming snap directory " + snapDir + " to " + renameDir);
        if (!snapDir.renameTo(renameDir)) {
            LOGGER.error("Could not rename snap directory");
        }
        this.distributor.statsThread.start(scalarData.getLong(PropertyKeys.CRAWLDURATION));
        LOGGER.info("Starting statistics");
        this.distributor.statsThread.emit();
        this.distributor.statsThread.run();
    }

    /** Returns the statistics thread of the distributor. */
    public StatsThread getStatsThread() {
        return this.distributor.statsThread;
    }

    /** Returns the sum of all {@link #archetypesStatus} counters. */
    public long archetypes() {
        long total = 0L;
        for (final AtomicLong counter : this.archetypesStatus) {
            total += counter.get();
        }
        return total;
    }

    static {
        try {
            LOOPBACK = new InetAddress[]{InetAddress.getByAddress(new byte[]{127, 0, 0, 1})};
        }
        catch (final UnknownHostException e) {
            // Cannot happen for a well-formed 4-byte address; rethrow preserving the cause.
            throw new RuntimeException(e.getMessage(), e);
        }
        BYTE_ARRAY_HASHING_STRATEGY = new AbstractHashFunction<byte[]>(){
            private static final long serialVersionUID = 1L;

            @Override
            public long getLong(final Object key) {
                return MurmurHash3.hash((byte[])key);
            }
        };
        BYTE_ARRAY_LIST_HASHING_STRATEGY = new AbstractHashFunction<ByteArrayList>(){
            private static final long serialVersionUID = 1L;

            @Override
            public long getLong(final Object key) {
                return MurmurHash3.hash((ByteArrayList)key);
            }
        };
    }

    /** Keys of the scalar data stored by {@link #snap()} in {@code frontier.data} and read back by {@link #restore()}. */
    public static enum PropertyKeys {
        PATHQUERIESINQUEUES,
        WEIGHTOFPATHQUERIESINQUEUES,
        BROKENVISITSTATES,
        NUMBEROFRECEIVEDURLS,
        REQUIREDFRONTSIZE,
        FETCHINGTHREADWAITS,
        FETCHINGTHREADWAITINGTIMESUM,
        ARCHETYPESOTHERS,
        ARCHETYPES1XX,
        ARCHETYPES2XX,
        ARCHETYPES3XX,
        ARCHETYPES4XX,
        ARCHETYPES5XX,
        DUPLICATES,
        FETCHEDRESOURCES,
        FETCHEDROBOTS,
        TRANSFERREDBYTES,
        AVERAGESPEED,
        CURRENTQUEUE,
        VIRTUALQUEUESIZES,
        VIRTUALQUEUESBIRTHTIME,
        READYURLSSIZE,
        RECEIVEDURLSSIZE,
        DISTRIBUTORWARMUP,
        DISTRIBUTORVISITSTATESONDISK,
        EPOCH,
        CRAWLDURATION,
        VISITSTATESETSIZE,
        WORKBENCHENTRYSETSIZE;
    }
}

