/*
 * Decompiled with CFR 0.152.
 */
package com.github.ltsopensource.jobclient;

import com.github.ltsopensource.core.AppContext;
import com.github.ltsopensource.core.cluster.AbstractClientNode;
import com.github.ltsopensource.core.commons.utils.Assert;
import com.github.ltsopensource.core.commons.utils.BatchUtils;
import com.github.ltsopensource.core.commons.utils.CollectionUtils;
import com.github.ltsopensource.core.commons.utils.StringUtils;
import com.github.ltsopensource.core.domain.Job;
import com.github.ltsopensource.core.exception.JobSubmitException;
import com.github.ltsopensource.core.exception.JobTrackerNotFoundException;
import com.github.ltsopensource.core.logger.Logger;
import com.github.ltsopensource.core.logger.LoggerFactory;
import com.github.ltsopensource.core.monitor.MStatReporter;
import com.github.ltsopensource.core.protocol.JobProtos;
import com.github.ltsopensource.core.protocol.command.AbstractRemotingCommandBody;
import com.github.ltsopensource.core.protocol.command.CommandBodyWrapper;
import com.github.ltsopensource.core.protocol.command.JobCancelRequest;
import com.github.ltsopensource.core.protocol.command.JobSubmitRequest;
import com.github.ltsopensource.core.protocol.command.JobSubmitResponse;
import com.github.ltsopensource.jobclient.domain.JobClientAppContext;
import com.github.ltsopensource.jobclient.domain.JobClientNode;
import com.github.ltsopensource.jobclient.domain.Response;
import com.github.ltsopensource.jobclient.processor.RemotingDispatcher;
import com.github.ltsopensource.jobclient.support.JobClientMStatReporter;
import com.github.ltsopensource.jobclient.support.JobCompletedHandler;
import com.github.ltsopensource.jobclient.support.JobSubmitExecutor;
import com.github.ltsopensource.jobclient.support.JobSubmitProtector;
import com.github.ltsopensource.jobclient.support.SubmitCallback;
import com.github.ltsopensource.remoting.AsyncCallback;
import com.github.ltsopensource.remoting.RemotingCommandBody;
import com.github.ltsopensource.remoting.RemotingProcessor;
import com.github.ltsopensource.remoting.ResponseFuture;
import com.github.ltsopensource.remoting.protocol.RemotingCommand;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Client node used to submit jobs to, and cancel jobs on, a JobTracker through the
 * remoting client supplied by {@link AbstractClientNode}.
 *
 * <p>Every public operation first verifies that the client has been started and throws a
 * {@link JobSubmitException} otherwise. Submissions are routed through the
 * {@link JobSubmitProtector} created in {@link #beforeStart()}.
 */
public class JobClient<T extends JobClientNode, Context extends AppContext>
extends AbstractClientNode<JobClientNode, JobClientAppContext> {
    protected static final Logger LOGGER = LoggerFactory.getLogger(JobClient.class);

    /** Maximum number of jobs sent in one submit request by {@link #submitJob(List)}. */
    private static final int BATCH_SIZE = 10;

    /** How long an asynchronous submit waits for the JobTracker's answer. */
    private static final long ASYNC_SUBMIT_TIMEOUT_MILLIS = 60000L;

    private JobSubmitProtector protector;
    protected JobClientMStatReporter stat;

    /** Creates the client and registers its statistics reporter on the application context. */
    public JobClient() {
        this.stat = new JobClientMStatReporter(this.appContext);
        ((JobClientAppContext) this.appContext).setMStatReporter(this.stat);
    }

    /** Publishes the remoting client on the application context and creates the submit protector. */
    protected void beforeStart() {
        ((JobClientAppContext) this.appContext).setRemotingClient(this.remotingClient);
        this.protector = new JobSubmitProtector((JobClientAppContext) this.appContext);
    }

    /** Starts the statistics reporter once the node is up. */
    protected void afterStart() {
        ((JobClientAppContext) this.appContext).getMStatReporter().start();
    }

    /** Stops the statistics reporter once the node is down. */
    protected void afterStop() {
        ((JobClientAppContext) this.appContext).getMStatReporter().stop();
    }

    /** Nothing to do before stopping. */
    protected void beforeStop() {
    }

    /**
     * Submits a single job.
     *
     * @param job the job to submit
     * @return the outcome of the submission
     * @throws JobSubmitException if the client is not started or the job fails validation
     */
    public Response submitJob(Job job) throws JobSubmitException {
        this.checkStart();
        return this.protectSubmit(Collections.singletonList(job));
    }

    /**
     * Runs one asynchronous submit request for {@code jobs} through the submit protector.
     */
    private Response protectSubmit(List<Job> jobs) throws JobSubmitException {
        return this.protector.execute(jobs, new JobSubmitExecutor<Response>() {

            @Override
            public Response execute(List<Job> batch) throws JobSubmitException {
                return JobClient.this.submitJob(batch, SubmitType.ASYNC);
            }
        });
    }

    /**
     * Asks the JobTracker to cancel a job.
     *
     * @param taskId               id of the task to cancel; must contain text
     * @param taskTrackerNodeGroup node group of the task tracker; must contain text
     * @return a response whose success flag tells whether the JobTracker confirmed the
     *         cancellation; on failure the code and message describe the reason
     * @throws JobSubmitException if the client is not started
     */
    public Response cancelJob(String taskId, String taskTrackerNodeGroup) {
        this.checkStart();
        Assert.hasText(taskId, "taskId can not be empty");
        Assert.hasText(taskTrackerNodeGroup, "taskTrackerNodeGroup can not be empty");

        Response response = new Response();
        JobCancelRequest request = (JobCancelRequest) CommandBodyWrapper.wrapper(this.appContext, new JobCancelRequest());
        request.setTaskId(taskId);
        request.setTaskTrackerNodeGroup(taskTrackerNodeGroup);
        RemotingCommand requestCommand = RemotingCommand.createRequestCommand(JobProtos.RequestCode.CANCEL_JOB.code(), request);
        try {
            RemotingCommand remotingResponse = this.remotingClient.invokeSync(requestCommand);
            if (remotingResponse == null) {
                // The submit path treats a missing response as a broken JobTracker; do the
                // same here instead of failing with a NullPointerException.
                response.setSuccess(false);
                response.setMsg("Cancel job failed: JobTracker is broken");
                LOGGER.warn("Cancel job failed: taskId={}, taskTrackerNodeGroup={}, msg={}", taskId, taskTrackerNodeGroup, "JobTracker is broken");
                return response;
            }
            if (JobProtos.ResponseCode.JOB_CANCEL_SUCCESS.code() == remotingResponse.getCode()) {
                LOGGER.info("Cancel job success taskId={}, taskTrackerNodeGroup={} ", taskId, taskTrackerNodeGroup);
                response.setSuccess(true);
                return response;
            }
            response.setSuccess(false);
            response.setCode(JobProtos.ResponseCode.valueOf(remotingResponse.getCode()).name());
            response.setMsg(remotingResponse.getRemark());
            LOGGER.warn("Cancel job failed: taskId={}, taskTrackerNodeGroup={}, msg={}", taskId, taskTrackerNodeGroup, remotingResponse.getRemark());
            return response;
        }
        catch (JobTrackerNotFoundException e) {
            response.setSuccess(false);
            // "11" is the code this class reports when no JobTracker node is available.
            response.setCode("11");
            response.setMsg("Can not found JobTracker node!");
            return response;
        }
    }

    /**
     * Validates a batch before it is sent.
     *
     * @throws JobSubmitException if the list is empty or contains a {@code null} job
     */
    private void checkFields(List<Job> jobs) {
        if (CollectionUtils.isEmpty(jobs)) {
            throw new JobSubmitException("Job can not be null!");
        }
        for (Job job : jobs) {
            if (job == null) {
                throw new JobSubmitException("Job can not be null!");
            }
            job.checkField();
        }
    }

    /**
     * Sends one submit request containing all of {@code jobs} and translates the outcome
     * into a {@link Response}.
     *
     * <p>Transport problems are not propagated: they are reported through the response
     * (code {@code "11"} when no JobTracker is found, {@code "15"} for any other
     * exception), and the submitted jobs are recorded as failed so that the caller and the
     * failure statistic both see them.
     *
     * @param jobs the jobs to send in a single request; must be non-empty and valid
     * @param type whether to wait through the async callback or invoke synchronously
     * @return the outcome of the request
     * @throws JobSubmitException if {@code jobs} fails validation
     */
    protected Response submitJob(final List<Job> jobs, SubmitType type) throws JobSubmitException {
        this.checkFields(jobs);
        final Response response = new Response();
        try {
            JobSubmitRequest jobSubmitRequest = (JobSubmitRequest) CommandBodyWrapper.wrapper(this.appContext, new JobSubmitRequest());
            jobSubmitRequest.setJobs(jobs);
            RemotingCommand requestCommand = RemotingCommand.createRequestCommand(JobProtos.RequestCode.SUBMIT_JOB.code(), jobSubmitRequest);
            SubmitCallback submitCallback = new SubmitCallback() {

                @Override
                public void call(RemotingCommand responseCommand) {
                    if (responseCommand == null) {
                        response.setFailedJobs(jobs);
                        response.setSuccess(false);
                        response.setMsg("Submit Job failed: JobTracker is broken");
                        LOGGER.warn("Submit Job failed: {}, {}", jobs, "JobTracker is broken");
                        return;
                    }
                    if (JobProtos.ResponseCode.JOB_RECEIVE_SUCCESS.code() == responseCommand.getCode()) {
                        if (LOGGER.isDebugEnabled()) {
                            LOGGER.debug("Submit Job success: {}", new Object[]{jobs});
                        }
                        response.setSuccess(true);
                        return;
                    }
                    response.setSuccess(false);
                    response.setCode(JobProtos.ResponseCode.valueOf(responseCommand.getCode()).name());
                    JobSubmitResponse jobSubmitResponse = (JobSubmitResponse) responseCommand.getBody();
                    if (jobSubmitResponse == null) {
                        // No failure detail came back: treat the whole batch as failed
                        // rather than dying with a NullPointerException on the callback thread.
                        response.setFailedJobs(jobs);
                        response.setMsg("Submit Job failed: " + responseCommand.getRemark());
                        LOGGER.warn("Submit Job failed: {}, {}", jobs, responseCommand.getRemark());
                        return;
                    }
                    response.setFailedJobs(jobSubmitResponse.getFailedJobs());
                    response.setMsg("Submit Job failed: " + responseCommand.getRemark() + " " + jobSubmitResponse.getMsg());
                    LOGGER.warn("Submit Job failed: {}, {}, {}", jobs, responseCommand.getRemark(), jobSubmitResponse.getMsg());
                }
            };
            if (SubmitType.ASYNC == type) {
                this.asyncSubmit(requestCommand, submitCallback);
            } else {
                this.syncSubmit(requestCommand, submitCallback);
            }
        }
        catch (JobTrackerNotFoundException e) {
            this.markSubmitFailed(response, jobs, "11", "Can not found JobTracker node!");
        }
        catch (Exception e) {
            this.markSubmitFailed(response, jobs, "15", StringUtils.toString(e));
        }
        finally {
            if (response.isSuccess()) {
                this.stat.incSubmitSuccessNum(jobs.size());
            } else {
                this.stat.incSubmitFailedNum(CollectionUtils.sizeOf(response.getFailedJobs()));
            }
        }
        return response;
    }

    /**
     * Marks {@code response} as failed with the given code and message. The submitted jobs
     * are recorded as the failed ones unless the response already lists failed jobs.
     */
    private void markSubmitFailed(Response response, List<Job> jobs, String code, String msg) {
        response.setSuccess(false);
        response.setCode(code);
        response.setMsg(msg);
        if (CollectionUtils.isEmpty(response.getFailedJobs())) {
            response.setFailedJobs(jobs);
        }
    }

    /**
     * Sends the request asynchronously and blocks until the callback has run.
     *
     * @throws JobSubmitException if no answer arrives within
     *         {@link #ASYNC_SUBMIT_TIMEOUT_MILLIS} or the waiting thread is interrupted
     */
    private void asyncSubmit(RemotingCommand requestCommand, final SubmitCallback submitCallback) throws JobTrackerNotFoundException {
        final CountDownLatch latch = new CountDownLatch(1);
        this.remotingClient.invokeAsync(requestCommand, new AsyncCallback() {

            public void operationComplete(ResponseFuture responseFuture) {
                try {
                    submitCallback.call(responseFuture.getResponseCommand());
                }
                finally {
                    // Always release the waiting submitter, even if the callback throws.
                    latch.countDown();
                }
            }
        });
        boolean completed;
        try {
            completed = latch.await(ASYNC_SUBMIT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new JobSubmitException("Submit job failed, interrupted while waiting for async response!", e);
        }
        if (!completed) {
            // await() returns false on timeout; report it instead of returning a silent,
            // message-less failure. NOTE(review): the callback may still fire later and
            // touch the response from the remoting thread.
            throw new JobSubmitException("Submit job failed, async request timeout!");
        }
    }

    /** Sends the request synchronously and hands the answer to the callback. */
    private void syncSubmit(RemotingCommand requestCommand, SubmitCallback submitCallback) throws JobTrackerNotFoundException {
        submitCallback.call(this.remotingClient.invokeSync(requestCommand));
    }

    /**
     * Submits a list of jobs in batches of at most {@link #BATCH_SIZE}.
     *
     * <p>An empty list results in a successful response without any request being sent.
     * When a batch fails, the overall response is marked unsuccessful, every job of that
     * batch is added to the failed jobs, and the message of the most recent failing batch
     * is kept.
     *
     * @param jobs the jobs to submit; must not be {@code null}
     * @return the combined outcome of all batches
     * @throws JobSubmitException if the client is not started, {@code jobs} is
     *         {@code null}, or a batch fails validation
     */
    public Response submitJob(List<Job> jobs) throws JobSubmitException {
        this.checkStart();
        if (jobs == null) {
            throw new JobSubmitException("Job can not be null!");
        }
        Response response = new Response();
        response.setSuccess(true);
        int lastBatchIndex = jobs.size() / BATCH_SIZE;
        for (int batchIndex = 0; batchIndex <= lastBatchIndex; ++batchIndex) {
            List<Job> subJobs = BatchUtils.getBatchList(batchIndex, BATCH_SIZE, jobs);
            // The final slice is empty when the size is an exact multiple of BATCH_SIZE.
            if (CollectionUtils.isEmpty(subJobs)) {
                continue;
            }
            Response subResponse = this.protectSubmit(subJobs);
            if (subResponse.isSuccess()) {
                continue;
            }
            response.setSuccess(false);
            response.addFailedJobs(subJobs);
            response.setMsg(subResponse.getMsg());
        }
        return response;
    }

    /** Returns the processor that handles requests arriving at this client. */
    protected RemotingProcessor getDefaultProcessor() {
        return new RemotingDispatcher((JobClientAppContext) this.appContext);
    }

    /** Registers the handler stored on the application context for completed jobs. */
    public void setJobCompletedHandler(JobCompletedHandler jobCompletedHandler) {
        ((JobClientAppContext) this.appContext).setJobCompletedHandler(jobCompletedHandler);
    }

    /**
     * Ensures the client has been started.
     *
     * @throws JobSubmitException if it has not
     */
    private void checkStart() {
        if (!this.started.get()) {
            throw new JobSubmitException("JobClient did not started");
        }
    }

    /** How a submit request is sent to the JobTracker. */
    enum SubmitType {
        SYNC,
        ASYNC;
    }
}

