/*
 * Decompiled with CFR 0.152.
 */
package org.modeshape.graph.connector.federation;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.transaction.xa.XAResource;
import org.modeshape.common.statistic.Stopwatch;
import org.modeshape.common.util.Logger;
import org.modeshape.graph.ExecutionContext;
import org.modeshape.graph.cache.CachePolicy;
import org.modeshape.graph.connector.RepositoryConnection;
import org.modeshape.graph.connector.RepositorySourceException;
import org.modeshape.graph.connector.federation.FederatedRepository;
import org.modeshape.graph.connector.federation.FederatedRequest;
import org.modeshape.graph.connector.federation.ForkRequestProcessor;
import org.modeshape.graph.connector.federation.JoinRequestProcessor;
import org.modeshape.graph.connector.federation.NoMoreFederatedRequests;
import org.modeshape.graph.observe.Observer;
import org.modeshape.graph.property.DateTime;
import org.modeshape.graph.request.CompositeRequest;
import org.modeshape.graph.request.Request;
import org.modeshape.graph.request.processor.RequestProcessor;

class FederatedRepositoryConnection
implements RepositoryConnection {
    private final FederatedRepository repository;
    private final Stopwatch stopwatch;
    private final Observer observer;
    private static final Logger LOGGER = Logger.getLogger(FederatedRepositoryConnection.class);

    FederatedRepositoryConnection(FederatedRepository repository, Observer observer) {
        this.repository = repository;
        this.stopwatch = LOGGER.isTraceEnabled() ? new Stopwatch() : null;
        this.observer = observer;
    }

    FederatedRepository getRepository() {
        return this.repository;
    }

    @Override
    public CachePolicy getDefaultCachePolicy() {
        return this.repository.getDefaultCachePolicy();
    }

    @Override
    public String getSourceName() {
        return this.repository.getSourceName();
    }

    @Override
    public XAResource getXAResource() {
        return null;
    }

    @Override
    public boolean ping(long time, TimeUnit unit) {
        return true;
    }

    protected boolean shouldProcessSynchronously(Request request) {
        if (request instanceof CompositeRequest) {
            CompositeRequest composite = (CompositeRequest)request;
            return composite.size() == 1;
        }
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void execute(ExecutionContext context, final Request request) throws RepositorySourceException {
        DateTime nowInUtc = context.getValueFactories().getDateFactory().createUtc();
        boolean synchronousStep1 = this.shouldProcessSynchronously(request);
        boolean awaitAllSubtasks = false;
        if (this.stopwatch != null) {
            this.stopwatch.start();
        }
        boolean abort = false;
        RequestProcessor processorWithEvents = null;
        try {
            final LinkedBlockingQueue<FederatedRequest> requests = new LinkedBlockingQueue<FederatedRequest>();
            final ForkRequestProcessor fork = new ForkRequestProcessor(this.repository, context, nowInUtc, requests);
            if (synchronousStep1) {
                try {
                    fork.process(request);
                }
                finally {
                    fork.close();
                }
                requests.add(new NoMoreFederatedRequests());
            } else {
                this.repository.getExecutor().submit(new Runnable(){

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    @Override
                    public void run() {
                        try {
                            fork.process(request);
                        }
                        finally {
                            fork.close();
                        }
                        requests.add(new NoMoreFederatedRequests());
                    }
                });
            }
            JoinRequestProcessor join = new JoinRequestProcessor(this.repository, context, this.observer, nowInUtc);
            try {
                join.process((BlockingQueue<FederatedRequest>)requests);
            }
            catch (RuntimeException e) {
                abort = true;
                throw e;
            }
            finally {
                fork.await();
                join.close();
                processorWithEvents = join;
            }
            if (request instanceof CompositeRequest) {
                ((CompositeRequest)request).checkForErrors();
            }
            if (request.hasError() && !request.isReadOnly()) {
                abort = true;
            }
        }
        catch (InterruptedException e) {
            abort = true;
            request.setError(e);
        }
        catch (ExecutionException e) {
            abort = true;
            request.setError(e);
        }
        catch (CancellationException e) {
            abort = true;
            request.cancel();
        }
        catch (RuntimeException e) {
            abort = true;
            throw e;
        }
        finally {
            if (this.stopwatch != null) {
                this.stopwatch.stop();
            }
            if (!abort) {
                assert (processorWithEvents != null);
                processorWithEvents.notifyObserverOfChanges();
            }
        }
    }

    @Override
    public void close() {
        if (this.stopwatch != null) {
            LOGGER.trace("Processing federated requests:\n" + this.stopwatch.getDetailedStatistics(), new Object[0]);
        }
    }
}

