/*
 * Decompiled with CFR 0.152.
 */
package io.edurt.datacap.server.service.impl;

import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.TypeLiteral;
import io.edurt.datacap.server.body.PipelineBody;
import io.edurt.datacap.server.common.BeanToPropertiesCommon;
import io.edurt.datacap.server.common.PluginCommon;
import io.edurt.datacap.server.common.Response;
import io.edurt.datacap.server.common.ServiceState;
import io.edurt.datacap.server.entity.PipelineEntity;
import io.edurt.datacap.server.entity.SourceEntity;
import io.edurt.datacap.server.plugin.configure.IConfigure;
import io.edurt.datacap.server.plugin.configure.IConfigureExecutor;
import io.edurt.datacap.server.plugin.configure.IConfigureExecutorField;
import io.edurt.datacap.server.plugin.configure.IConfigurePipelineType;
import io.edurt.datacap.server.repository.PipelineRepository;
import io.edurt.datacap.server.repository.SourceRepository;
import io.edurt.datacap.server.security.UserDetailsService;
import io.edurt.datacap.server.service.PipelineService;
import io.edurt.datacap.spi.executor.Executor;
import io.edurt.datacap.spi.executor.Pipeline;
import io.edurt.datacap.spi.executor.PipelineField;
import io.edurt.datacap.spi.executor.PipelineResponse;
import io.edurt.datacap.spi.executor.PipelineState;
import java.io.File;
import java.sql.Timestamp;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;

/**
 * {@link PipelineService} implementation that validates a pipeline request, persists a
 * {@link PipelineEntity} describing it and then runs the pipeline on a background thread.
 */
@Service
public class PipelineServiceImpl implements PipelineService {
    private static final Logger log = LoggerFactory.getLogger(PipelineServiceImpl.class);

    /** Value passed to {@code Pipeline.builder().timeout(...)}; the unit is defined by the SPI. */
    private static final long PIPELINE_TIMEOUT = 600L;

    /** Pool on which submitted pipelines are executed, so {@link #submit} does not block on them. */
    private final ExecutorService executorService = Executors.newCachedThreadPool();
    private final SourceRepository repository;
    private final Injector injector;
    private final Environment environment;
    private final PipelineRepository pipelineRepository;

    public PipelineServiceImpl(SourceRepository repository, Injector injector, Environment environment, PipelineRepository pipelineRepository) {
        this.repository = repository;
        this.injector = injector;
        this.environment = environment;
        this.pipelineRepository = pipelineRepository;
    }

    /**
     * Validates the request, stores a new pipeline record in state {@code CREATED} and hands the
     * pipeline to the matching {@link Executor} on a background thread.
     *
     * <p>A failure response is returned, and nothing is persisted, when either source cannot be
     * found, when a source declares no pipelines, when a source has no pipeline entry for the
     * requested executor in the required role (SOURCE for "from", SINK for "to"), or when no
     * registered {@link Executor} carries the requested name.
     *
     * @param configure the pipeline request: executor name, content, and the from/to endpoints
     * @return a success response carrying the id of the persisted pipeline, or a failure response
     */
    @Override
    public Response<Object> submit(PipelineBody configure) {
        Optional<SourceEntity> fromSourceOptional = this.repository.findById(configure.getFrom().getId());
        Optional<SourceEntity> toSourceOptional = this.repository.findById(configure.getTo().getId());
        if (!fromSourceOptional.isPresent() || !toSourceOptional.isPresent()) {
            return Response.failure(ServiceState.SOURCE_NOT_FOUND);
        }
        String executorName = configure.getExecutor();

        // Validate the reading side of the pipeline.
        SourceEntity fromSource = fromSourceOptional.get();
        IConfigure fromConfigure = PluginCommon.loadYamlConfigure(fromSource.getProtocol(), fromSource.getType(), fromSource.getType(), this.environment);
        if (ObjectUtils.isEmpty(fromConfigure.getPipelines())) {
            String message = String.format("Source %s is not supported pipeline, type %s", fromSource.getId(), fromSource.getType());
            return Response.failure(ServiceState.SOURCE_NOT_SUPPORTED_PIPELINE, message);
        }
        Optional<IConfigureExecutor> fromConfigureExecutor = fromConfigure.getPipelines().stream()
                .filter(v -> v.getExecutor().equals(executorName) && v.getType().equals(IConfigurePipelineType.SOURCE))
                .findFirst();
        if (!fromConfigureExecutor.isPresent()) {
            String message = String.format("Source %s type %s is not supported pipeline type %s", fromSource.getId(), fromSource.getType(), IConfigurePipelineType.SOURCE);
            return Response.failure(ServiceState.SOURCE_NOT_SUPPORTED_PIPELINE_TYPE, message);
        }

        // Validate the writing side of the pipeline.
        SourceEntity toSource = toSourceOptional.get();
        IConfigure toConfigure = PluginCommon.loadYamlConfigure(toSource.getProtocol(), toSource.getType(), toSource.getType(), this.environment);
        if (ObjectUtils.isEmpty(toConfigure.getPipelines())) {
            String message = String.format("Source %s is not supported pipeline, type %s", toSource.getId(), toSource.getType());
            return Response.failure(ServiceState.SOURCE_NOT_SUPPORTED_PIPELINE, message);
        }
        Optional<IConfigureExecutor> toConfigureExecutor = toConfigure.getPipelines().stream()
                .filter(v -> v.getExecutor().equals(executorName) && v.getType().equals(IConfigurePipelineType.SINK))
                .findFirst();
        if (!toConfigureExecutor.isPresent()) {
            String message = String.format("Source %s type %s is not supported pipeline type %s", toSource.getId(), toSource.getType(), IConfigurePipelineType.SINK);
            return Response.failure(ServiceState.SOURCE_NOT_SUPPORTED_PIPELINE_TYPE, message);
        }

        // Resolve the executor up front: discovering that it is missing only inside the
        // background task would leave a persisted pipeline that can never complete.
        Set<Executor> executors = this.injector.getInstance(Key.get(new TypeLiteral<Set<Executor>>() {}));
        Optional<Executor> executorOptional = executors.stream()
                .filter(candidate -> candidate.name().equals(executorName))
                .findFirst();
        if (!executorOptional.isPresent()) {
            // NOTE(review): reuses the closest state visible here; a dedicated "executor not found"
            // state would be more precise if ServiceState offers one.
            String message = String.format("Executor %s is not registered", executorName);
            return Response.failure(ServiceState.SOURCE_NOT_SUPPORTED_PIPELINE, message);
        }
        Executor executor = executorOptional.get();

        List<IConfigureExecutorField> fromFields = fromConfigureExecutor.get().getFields();
        Properties fromOriginProperties = this.orEmpty(configure.getFrom().getConfigures());
        if (configure.getContent() != null) {
            // Properties rejects null values, so the content is only forwarded when present.
            fromOriginProperties.setProperty("context", configure.getContent());
        }
        Properties fromProperties = this.merge(fromSource, fromFields, fromOriginProperties);
        PipelineField fromField = PipelineField.builder().type(fromSource.getType()).configure(fromProperties).supportOptions(this.requiredFields(fromFields)).build();

        List<IConfigureExecutorField> toFields = toConfigureExecutor.get().getFields();
        Properties toOriginProperties = this.orEmpty(configure.getTo().getConfigures());
        Properties toProperties = this.merge(toSource, toFields, toOriginProperties);
        PipelineField toField = PipelineField.builder().type(toSource.getType()).configure(toProperties).supportOptions(this.requiredFields(toFields)).build();

        String username = UserDetailsService.getUser().getUsername();
        String pipelineHome = DateFormatUtils.format(System.currentTimeMillis(), "yyyyMMddHHmmssSSS");
        String work = String.join(File.separator, this.resolveExecutorHome(), username, pipelineHome);
        // Locale.ROOT keeps the generated name independent of the JVM default locale and
        // consistent with the "datacap.executor.<name>.home" lookup below.
        String executorKey = executorName.toLowerCase(Locale.ROOT);
        String pipelineName = String.join("_", username, executorKey, "from", String.valueOf(fromSource.getId()), "to", String.valueOf(toSource.getId()), pipelineHome);
        try {
            FileUtils.forceMkdir(new File(work));
        }
        catch (Exception e) {
            // Best effort: the failure is only logged and the pipeline is still submitted.
            log.warn("Failed to create temporary directory", e);
        }

        Pipeline pipeline = Pipeline.builder()
                .work(work)
                .home(this.environment.getProperty(String.format("datacap.executor.%s.home", executorKey)))
                .pipelineName(pipelineName)
                .username(username)
                .from(fromField)
                .to(toField)
                .timeout(PIPELINE_TIMEOUT)
                .build();

        PipelineEntity pipelineEntity = new PipelineEntity();
        pipelineEntity.setName(pipelineName);
        pipelineEntity.setContent(configure.getContent());
        pipelineEntity.setState(PipelineState.CREATED);
        pipelineEntity.setWork(work);
        pipelineEntity.setStartTime(new Timestamp(System.currentTimeMillis()));
        pipelineEntity.setUser(UserDetailsService.getUser());
        pipelineEntity.setFrom(fromSource);
        pipelineEntity.setFromConfigures(fromProperties);
        pipelineEntity.setTo(toSource);
        pipelineEntity.setToConfigures(toProperties);
        this.pipelineRepository.save(pipelineEntity);

        this.executorService.submit(() -> this.execute(executor, pipeline, pipelineEntity, pipelineName));
        return Response.success(pipelineEntity.getId());
    }

    /**
     * Runs the pipeline and records its progress on the entity: {@code RUNNING} before the
     * executor starts, then the end time, state and message reported by the executor.
     */
    private void execute(Executor executor, Pipeline pipeline, PipelineEntity pipelineEntity, String pipelineName) {
        try {
            pipelineEntity.setState(PipelineState.RUNNING);
            this.pipelineRepository.save(pipelineEntity);
            PipelineResponse response = executor.start(pipeline);
            pipelineEntity.setEndTime(new Timestamp(System.currentTimeMillis()));
            pipelineEntity.setState(response.getState());
            pipelineEntity.setMessage(response.getMessage());
            // NOTE(review): writes back the entity's own elapsed value; presumably the getter
            // derives it from the start/end times - confirm against PipelineEntity.
            pipelineEntity.setElapsed(pipelineEntity.getElapsed());
            this.pipelineRepository.save(pipelineEntity);
        }
        catch (RuntimeException e) {
            // The Future returned by submit() is never read, so without this the failure
            // would disappear without a trace.
            // NOTE(review): the entity is left in its last saved state; mark it as failed here
            // once the appropriate PipelineState constant is confirmed.
            log.error("Pipeline {} terminated abnormally", pipelineName, e);
        }
    }

    /** Returns the configured {@code datacap.executor.data} directory, or {@code <user.dir>/data} when unset. */
    private String resolveExecutorHome() {
        String executorHome = this.environment.getProperty("datacap.executor.data");
        if (StringUtils.isEmpty(executorHome)) {
            return String.join(File.separator, System.getProperty("user.dir"), "data");
        }
        return executorHome;
    }

    /** Returns the given properties, or a fresh empty instance when the request supplied none. */
    private Properties orEmpty(Properties properties) {
        return properties == null ? new Properties() : properties;
    }

    /** Collects the names of all fields flagged as required. */
    private Set<String> requiredFields(List<IConfigureExecutorField> fields) {
        Set<String> required = new HashSet<>();
        for (IConfigureExecutorField field : fields) {
            if (field.isRequired()) {
                required.add(field.getField());
            }
        }
        return required;
    }

    /**
     * Builds the effective properties for one side of the pipeline. Fields flagged as override
     * take their value from the request-supplied {@code configure}; every other field takes its
     * value from the properties derived from the stored source entity.
     *
     * @param entity the stored source whose bean properties provide the non-override values
     * @param fields the executor fields declared for this side of the pipeline
     * @param configure the properties supplied with the request
     * @return a new {@link Properties} holding one entry per declared field
     */
    private Properties merge(SourceEntity entity, List<IConfigureExecutorField> fields, Properties configure) {
        Properties properties = new Properties();
        Properties convertBeanProperties = BeanToPropertiesCommon.convertBeanToProperties(entity);
        for (IConfigureExecutorField field : fields) {
            Properties origin = field.isOverride() ? configure : convertBeanProperties;
            this.setProperty(field, properties, origin);
        }
        return properties;
    }

    /**
     * Resolves the value of a single field from {@code configure} and stores it in
     * {@code properties} under the field's name; an empty string is stored when nothing is found.
     *
     * <p>When the field declares an origin, the value is looked up under that origin key. An
     * origin of the form {@code a|b} is resolved as the values of {@code a} and {@code b}
     * joined with {@code ":"}. Without an origin the field's own name is used as the key.
     */
    private void setProperty(IConfigureExecutorField field, Properties properties, Properties configure) {
        Object value = "";
        if (ObjectUtils.isNotEmpty(field.getOrigin())) {
            String[] split = String.valueOf(field.getOrigin()).split("\\|");
            if (split.length > 1) {
                // NOTE(review): a missing key is rendered as the text "null" by String.valueOf.
                value = String.join(":", String.valueOf(configure.get(split[0])), String.valueOf(configure.get(split[1])));
            } else if (ObjectUtils.isNotEmpty(configure.get(field.getOrigin()))) {
                value = configure.get(field.getOrigin());
            }
        } else if (ObjectUtils.isNotEmpty(configure.get(field.getField()))) {
            value = configure.get(field.getField());
        }
        properties.put(field.getField(), value);
    }

    /**
     * Not implemented.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Response<Long> delete(Long id) {
        throw new UnsupportedOperationException("Not supported yet.");
    }
}

