/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.core.util.parser.helper;

import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.event.stream.MetaStreamEvent;
import io.siddhi.core.event.stream.StreamEventCloner;
import io.siddhi.core.event.stream.StreamEventFactory;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.function.Script;
import io.siddhi.core.stream.StreamJunction;
import io.siddhi.core.stream.input.source.AttributeMapping;
import io.siddhi.core.stream.input.source.Source;
import io.siddhi.core.stream.input.source.SourceHandler;
import io.siddhi.core.stream.input.source.SourceHandlerManager;
import io.siddhi.core.stream.input.source.SourceMapper;
import io.siddhi.core.stream.output.sink.DynamicOptionGroupDeterminer;
import io.siddhi.core.stream.output.sink.OutputGroupDeterminer;
import io.siddhi.core.stream.output.sink.PartitionedGroupDeterminer;
import io.siddhi.core.stream.output.sink.Sink;
import io.siddhi.core.stream.output.sink.SinkHandler;
import io.siddhi.core.stream.output.sink.SinkHandlerManager;
import io.siddhi.core.stream.output.sink.SinkMapper;
import io.siddhi.core.stream.output.sink.distributed.DistributedTransport;
import io.siddhi.core.stream.output.sink.distributed.DistributionStrategy;
import io.siddhi.core.table.InMemoryTable;
import io.siddhi.core.table.Table;
import io.siddhi.core.table.record.RecordTableHandler;
import io.siddhi.core.table.record.RecordTableHandlerManager;
import io.siddhi.core.trigger.AbstractTrigger;
import io.siddhi.core.trigger.CronTrigger;
import io.siddhi.core.trigger.PeriodicTrigger;
import io.siddhi.core.trigger.StartTrigger;
import io.siddhi.core.trigger.Trigger;
import io.siddhi.core.util.ExceptionUtil;
import io.siddhi.core.util.SiddhiClassLoader;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.extension.holder.DistributionStrategyExtensionHolder;
import io.siddhi.core.util.extension.holder.ScriptExtensionHolder;
import io.siddhi.core.util.extension.holder.SinkExecutorExtensionHolder;
import io.siddhi.core.util.extension.holder.SinkMapperExecutorExtensionHolder;
import io.siddhi.core.util.extension.holder.SourceExecutorExtensionHolder;
import io.siddhi.core.util.extension.holder.SourceMapperExecutorExtensionHolder;
import io.siddhi.core.util.extension.holder.TableExtensionHolder;
import io.siddhi.core.util.transport.MultiClientDistributedSink;
import io.siddhi.core.util.transport.Option;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.core.util.transport.SingleClientDistributedSink;
import io.siddhi.core.window.Window;
import io.siddhi.query.api.SiddhiElement;
import io.siddhi.query.api.annotation.Annotation;
import io.siddhi.query.api.annotation.Element;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.definition.AggregationDefinition;
import io.siddhi.query.api.definition.Attribute;
import io.siddhi.query.api.definition.FunctionDefinition;
import io.siddhi.query.api.definition.StreamDefinition;
import io.siddhi.query.api.definition.TableDefinition;
import io.siddhi.query.api.definition.TriggerDefinition;
import io.siddhi.query.api.definition.WindowDefinition;
import io.siddhi.query.api.exception.DuplicateDefinitionException;
import io.siddhi.query.api.exception.SiddhiAppValidationException;
import io.siddhi.query.api.extension.Extension;
import io.siddhi.query.api.util.AnnotationHelper;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.quartz.CronExpression;

public class DefinitionParserHelper {
    public static void validateDefinition(AbstractDefinition definition, ConcurrentMap<String, AbstractDefinition> streamDefinitionMap, ConcurrentMap<String, AbstractDefinition> tableDefinitionMap, ConcurrentMap<String, AbstractDefinition> windowDefinitionMap, ConcurrentMap<String, AbstractDefinition> aggregationDefinitionMap) {
        AbstractDefinition existingTableDefinition = (AbstractDefinition)tableDefinitionMap.get(definition.getId());
        if (existingTableDefinition != null && (!existingTableDefinition.equals((Object)definition) || definition instanceof StreamDefinition)) {
            throw new DuplicateDefinitionException("Table Definition with same Stream Id '" + definition.getId() + "' already exist : " + existingTableDefinition + ", hence cannot add " + definition, definition.getQueryContextStartIndex(), definition.getQueryContextEndIndex());
        }
        AbstractDefinition existingStreamDefinition = (AbstractDefinition)streamDefinitionMap.get(definition.getId());
        if (existingStreamDefinition != null && (!existingStreamDefinition.equals((Object)definition) || definition instanceof TableDefinition)) {
            throw new DuplicateDefinitionException("Stream Definition with same Stream Id '" + definition.getId() + "' already exist : " + existingStreamDefinition + ", hence cannot add " + definition, definition.getQueryContextStartIndex(), definition.getQueryContextEndIndex());
        }
        AbstractDefinition existingWindowDefinition = (AbstractDefinition)windowDefinitionMap.get(definition.getId());
        if (existingWindowDefinition != null && (!existingWindowDefinition.equals((Object)definition) || definition instanceof WindowDefinition)) {
            throw new DuplicateDefinitionException("Window Definition with same Window Id '" + definition.getId() + "' already exist : " + existingWindowDefinition + ", hence cannot add " + definition, definition.getQueryContextStartIndex(), definition.getQueryContextEndIndex());
        }
        AbstractDefinition existingAggregationDefinition = (AbstractDefinition)aggregationDefinitionMap.get(definition.getId());
        if (existingAggregationDefinition != null && (!existingAggregationDefinition.equals((Object)definition) || definition instanceof AggregationDefinition)) {
            throw new DuplicateDefinitionException("Aggregation Definition with same Aggregation Id '" + definition.getId() + "' already exist : " + existingWindowDefinition + ", hence cannot add " + definition, definition.getQueryContextStartIndex(), definition.getQueryContextEndIndex());
        }
    }

    public static void addStreamJunction(StreamDefinition streamDefinition, ConcurrentMap<String, StreamJunction> streamJunctionMap, SiddhiAppContext siddhiAppContext) {
        if (!streamJunctionMap.containsKey(streamDefinition.getId())) {
            StreamJunction faultStreamJunction = (StreamJunction)streamJunctionMap.get("!".concat(streamDefinition.getId()));
            StreamJunction streamJunction = new StreamJunction(streamDefinition, siddhiAppContext.getExecutorService(), siddhiAppContext.getBufferSize(), faultStreamJunction, siddhiAppContext);
            streamJunctionMap.putIfAbsent(streamDefinition.getId(), streamJunction);
        }
    }

    public static void validateOutputStream(StreamDefinition outputStreamDefinition, AbstractDefinition existingStream) {
        if (!existingStream.equalsIgnoreAnnotations((Object)outputStreamDefinition)) {
            throw new DuplicateDefinitionException("Different definition same as output '" + outputStreamDefinition + "' already exist as '" + existingStream + "'", outputStreamDefinition.getQueryContextStartIndex(), outputStreamDefinition.getQueryContextEndIndex());
        }
    }

    public static void addTable(TableDefinition tableDefinition, ConcurrentMap<String, Table> tableMap, SiddhiAppContext siddhiAppContext) {
        if (!tableMap.containsKey(tableDefinition.getId())) {
            Table table;
            MetaStreamEvent tableMetaStreamEvent = new MetaStreamEvent();
            tableMetaStreamEvent.addInputDefinition((AbstractDefinition)tableDefinition);
            for (Attribute attribute : tableDefinition.getAttributeList()) {
                tableMetaStreamEvent.addOutputData(attribute);
            }
            StreamEventFactory tableStreamEventFactory = new StreamEventFactory(tableMetaStreamEvent);
            StreamEventCloner tableStreamEventCloner = new StreamEventCloner(tableMetaStreamEvent, tableStreamEventFactory);
            Annotation annotation = AnnotationHelper.getAnnotation((String)"Store", (List)tableDefinition.getAnnotations());
            ConfigReader configReader = null;
            RecordTableHandlerManager recordTableHandlerManager = null;
            RecordTableHandler recordTableHandler = null;
            if (annotation != null) {
                final String tableType = (annotation = DefinitionParserHelper.updateAnnotationRef(annotation, "store", siddhiAppContext)).getElement("type");
                if (tableType == null) {
                    throw new SiddhiAppCreationException("Attribute 'type' does not exist for annotation '" + annotation + "'", (SiddhiElement)annotation, siddhiAppContext);
                }
                Extension extension = new Extension(){

                    public String getNamespace() {
                        return "store";
                    }

                    public String getName() {
                        return tableType;
                    }
                };
                recordTableHandlerManager = siddhiAppContext.getSiddhiContext().getRecordTableHandlerManager();
                if (recordTableHandlerManager != null) {
                    recordTableHandler = recordTableHandlerManager.generateRecordTableHandler();
                }
                table = (Table)SiddhiClassLoader.loadExtensionImplementation(extension, TableExtensionHolder.getInstance(siddhiAppContext));
                configReader = siddhiAppContext.getSiddhiContext().getConfigManager().generateConfigReader(extension.getNamespace(), extension.getName());
            } else {
                table = new InMemoryTable();
            }
            table.initTable(tableDefinition, tableStreamEventFactory, tableStreamEventCloner, configReader, siddhiAppContext, recordTableHandler);
            if (recordTableHandler != null) {
                recordTableHandlerManager.registerRecordTableHandler(recordTableHandler.getId(), recordTableHandler);
            }
            tableMap.putIfAbsent(tableDefinition.getId(), table);
        }
    }

    public static void addWindow(WindowDefinition windowDefinition, ConcurrentMap<String, Window> eventWindowMap, SiddhiAppContext siddhiAppContext) {
        if (!eventWindowMap.containsKey(windowDefinition.getId())) {
            Window window = new Window(windowDefinition, siddhiAppContext);
            eventWindowMap.putIfAbsent(windowDefinition.getId(), window);
        }
    }

    public static void addFunction(SiddhiAppContext siddhiAppContext, final FunctionDefinition functionDefinition) {
        Extension extension = new Extension(){

            public String getNamespace() {
                return "script";
            }

            public String getName() {
                return functionDefinition.getLanguage().toLowerCase();
            }
        };
        try {
            Script script = (Script)SiddhiClassLoader.loadExtensionImplementation(extension, ScriptExtensionHolder.getInstance(siddhiAppContext));
            ConfigReader configReader = siddhiAppContext.getSiddhiContext().getConfigManager().generateConfigReader(extension.getNamespace(), extension.getName());
            script.setReturnType(functionDefinition.getReturnType());
            script.init(functionDefinition.getId(), functionDefinition.getBody(), configReader);
            siddhiAppContext.getScriptFunctionMap().put(functionDefinition.getId(), script);
        }
        catch (Throwable t) {
            ExceptionUtil.populateQueryContext(t, (SiddhiElement)functionDefinition, siddhiAppContext);
            throw t;
        }
    }

    public static void validateDefinition(TriggerDefinition triggerDefinition) {
        if (triggerDefinition.getId() != null) {
            if (triggerDefinition.getAtEvery() == null) {
                String expression = triggerDefinition.getAt();
                if (expression == null) {
                    throw new SiddhiAppValidationException("Trigger Definition '" + triggerDefinition.getId() + "' must have trigger time defined");
                }
                if (!expression.trim().equalsIgnoreCase("start")) {
                    try {
                        CronExpression.isValidExpression((String)expression);
                    }
                    catch (Throwable t) {
                        throw new SiddhiAppValidationException("Trigger Definition '" + triggerDefinition.getId() + "' have invalid trigger time defined, expected 'start' or valid cron but found '" + expression + "'");
                    }
                }
            } else if (triggerDefinition.getAt() != null) {
                throw new SiddhiAppValidationException("Trigger Definition '" + triggerDefinition.getId() + "' must either have trigger time in cron or 'start' or time interval defined, and it cannot have more than one defined as '" + triggerDefinition + "'");
            }
        } else {
            throw new SiddhiAppValidationException("Trigger Definition id cannot be null");
        }
    }

    public static void addEventTrigger(TriggerDefinition triggerDefinition, ConcurrentMap<String, Trigger> eventTriggerMap, ConcurrentMap<String, StreamJunction> streamJunctionMap, SiddhiAppContext siddhiAppContext) {
        if (!eventTriggerMap.containsKey(triggerDefinition.getId())) {
            AbstractTrigger trigger = triggerDefinition.getAtEvery() != null ? new PeriodicTrigger() : (triggerDefinition.getAt().trim().equalsIgnoreCase("start") ? new StartTrigger() : new CronTrigger());
            StreamJunction streamJunction = (StreamJunction)streamJunctionMap.get(triggerDefinition.getId());
            trigger.init(triggerDefinition, siddhiAppContext, streamJunction);
            siddhiAppContext.addEternalReferencedHolder(trigger);
            eventTriggerMap.putIfAbsent(trigger.getId(), trigger);
        }
    }

    public static void addEventSource(StreamDefinition streamDefinition, ConcurrentMap<String, List<Source>> eventSourceMap, SiddhiAppContext siddhiAppContext) {
        for (Annotation sourceAnnotation : streamDefinition.getAnnotations()) {
            if (!"Source".equalsIgnoreCase(sourceAnnotation.getName())) continue;
            try {
                ArrayList<Source> eventSources;
                String sourceType;
                sourceAnnotation = DefinitionParserHelper.updateAnnotationRef(sourceAnnotation, "source", siddhiAppContext);
                Annotation mapAnnotation = AnnotationHelper.getAnnotation((String)"Map", (List)sourceAnnotation.getAnnotations());
                if (mapAnnotation == null) {
                    mapAnnotation = Annotation.annotation((String)"Map").element("type", "passThrough");
                }
                if ((sourceType = sourceAnnotation.getElement("type")) == null) {
                    throw new SiddhiAppCreationException("Attribute 'type' does not exist for annotation '" + sourceAnnotation + "'", (SiddhiElement)sourceAnnotation, siddhiAppContext);
                }
                String mapType = mapAnnotation.getElement("type");
                if (mapType == null) {
                    throw new SiddhiAppCreationException("Attribute 'type' does not exist for annotation '" + mapAnnotation + "'", (SiddhiElement)mapAnnotation, siddhiAppContext);
                }
                SourceHandlerManager sourceHandlerManager = siddhiAppContext.getSiddhiContext().getSourceHandlerManager();
                SourceHandler sourceHandler = null;
                if (sourceHandlerManager != null) {
                    sourceHandler = sourceHandlerManager.generateSourceHandler(sourceType);
                }
                Extension sourceExtension = DefinitionParserHelper.constructExtension(streamDefinition, "Source", sourceType, sourceAnnotation, "source");
                Source source = (Source)SiddhiClassLoader.loadExtensionImplementation(sourceExtension, SourceExecutorExtensionHolder.getInstance(siddhiAppContext));
                ConfigReader configReader = siddhiAppContext.getSiddhiContext().getConfigManager().generateConfigReader(sourceExtension.getNamespace(), sourceExtension.getName());
                Extension mapperExtension = DefinitionParserHelper.constructExtension(streamDefinition, "Map", mapType, sourceAnnotation, "sourceMapper");
                SourceMapper sourceMapper = (SourceMapper)SiddhiClassLoader.loadExtensionImplementation(mapperExtension, SourceMapperExecutorExtensionHolder.getInstance(siddhiAppContext));
                ConfigReader mapperConfigReader = siddhiAppContext.getSiddhiContext().getConfigManager().generateConfigReader(mapperExtension.getNamespace(), mapperExtension.getName());
                DefinitionParserHelper.validateSourceMapperCompatibility(streamDefinition, sourceType, mapType, source, sourceMapper, sourceAnnotation);
                io.siddhi.annotation.Extension sourceExt = source.getClass().getAnnotation(io.siddhi.annotation.Extension.class);
                OptionHolder sourceOptionHolder = DefinitionParserHelper.constructOptionHolder(streamDefinition, sourceAnnotation, sourceExt, null, true);
                Map<String, String> deploymentProperties = DefinitionParserHelper.createDeploymentProperties(sourceAnnotation, sourceExt);
                OptionHolder mapOptionHolder = DefinitionParserHelper.constructOptionHolder(streamDefinition, mapAnnotation, sourceMapper.getClass().getAnnotation(io.siddhi.annotation.Extension.class), null, false);
                AttributesHolder attributesHolder = DefinitionParserHelper.getAttributeMappings(mapAnnotation, mapType, streamDefinition);
                String[] transportPropertyNames = DefinitionParserHelper.getTransportPropertyNames(attributesHolder);
                source.init(sourceType, sourceOptionHolder, sourceMapper, transportPropertyNames, configReader, mapType, mapOptionHolder, attributesHolder.payloadMappings, attributesHolder.transportMappings, mapperConfigReader, sourceHandler, streamDefinition, deploymentProperties, siddhiAppContext);
                if (sourceHandlerManager != null) {
                    sourceHandlerManager.registerSourceHandler(sourceHandler.getId(), sourceHandler);
                }
                if ((eventSources = (ArrayList<Source>)eventSourceMap.get(streamDefinition.getId())) == null) {
                    eventSources = new ArrayList<Source>();
                    eventSources.add(source);
                    eventSourceMap.put(streamDefinition.getId(), eventSources);
                    continue;
                }
                eventSources.add(source);
            }
            catch (Throwable t) {
                ExceptionUtil.populateQueryContext(t, (SiddhiElement)sourceAnnotation, siddhiAppContext);
                throw t;
            }
        }
    }

    private static void validateSourceMapperCompatibility(StreamDefinition streamDefinition, String sourceType, String mapType, Source source, SourceMapper sourceMapper, Annotation sourceAnnotation) {
        Object[] inputEventClasses = sourceMapper.getSupportedInputEventClasses();
        Object[] outputEventClasses = source.getOutputEventClasses();
        if (outputEventClasses == null || outputEventClasses.length == 0) {
            return;
        }
        boolean matchingSinkAndMapperClasses = false;
        for (Class clazz : inputEventClasses) {
            for (Class clazz2 : outputEventClasses) {
                if (!clazz.isAssignableFrom(clazz2)) continue;
                matchingSinkAndMapperClasses = true;
                break;
            }
            if (matchingSinkAndMapperClasses) break;
        }
        if (!matchingSinkAndMapperClasses) {
            throw new SiddhiAppCreationException("At stream '" + streamDefinition.getId() + "', source '" + sourceType + "' produces incompatible '" + Arrays.deepToString(outputEventClasses) + "' classes, while it's source mapper '" + mapType + "' can only consume '" + Arrays.deepToString(inputEventClasses) + "' classes.", sourceAnnotation.getQueryContextStartIndex(), sourceAnnotation.getQueryContextEndIndex());
        }
    }

    private static String[] getTransportPropertyNames(AttributesHolder attributesHolder) {
        ArrayList<String> attributeNames = new ArrayList<String>();
        for (AttributeMapping attributeMapping : attributesHolder.transportMappings) {
            attributeNames.add(attributeMapping.getMapping());
        }
        return attributeNames.toArray(new String[0]);
    }

    public static void addEventSink(StreamDefinition streamDefinition, ConcurrentMap<String, List<Sink>> eventSinkMap, SiddhiAppContext siddhiAppContext) {
        for (Annotation sinkAnnotation : streamDefinition.getAnnotations()) {
            if (!"Sink".equalsIgnoreCase(sinkAnnotation.getName())) continue;
            try {
                sinkAnnotation = DefinitionParserHelper.updateAnnotationRef(sinkAnnotation, "sink", siddhiAppContext);
                Annotation mapAnnotation = AnnotationHelper.getAnnotation((String)"Map", (List)sinkAnnotation.getAnnotations());
                String sinkType = sinkAnnotation.getElement("type");
                if (sinkType == null) {
                    throw new SiddhiAppCreationException("Attribute 'type' does not exist for annotation '" + sinkAnnotation + "'", (SiddhiElement)sinkAnnotation, siddhiAppContext);
                }
                if (mapAnnotation == null) {
                    mapAnnotation = Annotation.annotation((String)"Map").element("type", "passThrough");
                }
                Annotation distributionAnnotation = AnnotationHelper.getAnnotation((String)"Distribution", (List)sinkAnnotation.getAnnotations());
                if (mapAnnotation != null) {
                    String mapType;
                    String[] supportedDynamicOptions = null;
                    List<Object> destinationOptHolders = new ArrayList();
                    ArrayList<Map<String, String>> destinationDeploymentProperties = new ArrayList();
                    Extension sinkExtension = DefinitionParserHelper.constructExtension(streamDefinition, "Sink", sinkType, sinkAnnotation, "sink");
                    ConfigReader sinkConfigReader = siddhiAppContext.getSiddhiContext().getConfigManager().generateConfigReader(sinkExtension.getNamespace(), sinkExtension.getName());
                    boolean isDistributedTransport = distributionAnnotation != null;
                    boolean isMultiClient = false;
                    if (isDistributedTransport) {
                        Sink sink = DefinitionParserHelper.createSink(sinkExtension, siddhiAppContext);
                        isMultiClient = DefinitionParserHelper.isMultiClientDistributedTransport(sink, streamDefinition, distributionAnnotation, siddhiAppContext);
                        supportedDynamicOptions = sink.getSupportedDynamicOptions();
                        destinationOptHolders = DefinitionParserHelper.createDestinationOptionHolders(distributionAnnotation, streamDefinition, sink, siddhiAppContext);
                        destinationDeploymentProperties = DefinitionParserHelper.createDestinationDeploymentProperties(distributionAnnotation, sink);
                    }
                    if ((mapType = mapAnnotation.getElement("type")) != null) {
                        ArrayList<Sink> eventSinks;
                        Sink sink = isDistributedTransport ? (isMultiClient ? new MultiClientDistributedSink() : new SingleClientDistributedSink()) : DefinitionParserHelper.createSink(sinkExtension, siddhiAppContext);
                        if (supportedDynamicOptions == null) {
                            supportedDynamicOptions = sink.getSupportedDynamicOptions();
                        }
                        Extension mapperExtension = DefinitionParserHelper.constructExtension(streamDefinition, "Map", mapType, sinkAnnotation, "sinkMapper");
                        ConfigReader mapperConfigReader = siddhiAppContext.getSiddhiContext().getConfigManager().generateConfigReader(sinkExtension.getNamespace(), sinkExtension.getName());
                        SinkMapper sinkMapper = (SinkMapper)SiddhiClassLoader.loadExtensionImplementation(mapperExtension, SinkMapperExecutorExtensionHolder.getInstance(siddhiAppContext));
                        io.siddhi.annotation.Extension sinkExt = sink.getClass().getAnnotation(io.siddhi.annotation.Extension.class);
                        OptionHolder transportOptionHolder = DefinitionParserHelper.constructOptionHolder(streamDefinition, sinkAnnotation, sinkExt, supportedDynamicOptions, true);
                        Map<String, String> deploymentProperties = DefinitionParserHelper.createDeploymentProperties(sinkAnnotation, sinkExt);
                        OptionHolder mapOptionHolder = DefinitionParserHelper.constructOptionHolder(streamDefinition, mapAnnotation, sinkMapper.getClass().getAnnotation(io.siddhi.annotation.Extension.class), sinkMapper.getSupportedDynamicOptions(), false);
                        List<Element> payloadElementList = DefinitionParserHelper.getPayload(mapAnnotation);
                        OptionHolder distributionOptHolder = null;
                        SinkHandlerManager sinkHandlerManager = siddhiAppContext.getSiddhiContext().getSinkHandlerManager();
                        SinkHandler sinkHandler = null;
                        if (sinkHandlerManager != null) {
                            sinkHandler = sinkHandlerManager.generateSinkHandler();
                        }
                        if (isDistributedTransport) {
                            distributionOptHolder = DefinitionParserHelper.constructOptionHolder(streamDefinition, distributionAnnotation, sinkExt, supportedDynamicOptions, true);
                            String strategyType = distributionOptHolder.validateAndGetStaticValue("strategy");
                            Extension strategyExtension = DefinitionParserHelper.constructExtension(streamDefinition, "Sink", strategyType, sinkAnnotation, "distributionStrategy");
                            ConfigReader configReader = siddhiAppContext.getSiddhiContext().getConfigManager().generateConfigReader(strategyExtension.getNamespace(), strategyExtension.getName());
                            DistributionStrategy distributionStrategy = (DistributionStrategy)SiddhiClassLoader.loadExtensionImplementation(strategyExtension, DistributionStrategyExtensionHolder.getInstance(siddhiAppContext));
                            distributionStrategy.init(streamDefinition, transportOptionHolder, distributionOptHolder, destinationOptHolders, configReader);
                            ((DistributedTransport)sink).init(streamDefinition, sinkType, transportOptionHolder, sinkConfigReader, sinkMapper, mapType, mapOptionHolder, sinkHandler, payloadElementList, mapperConfigReader, siddhiAppContext, destinationOptHolders, sinkAnnotation, distributionStrategy, supportedDynamicOptions, deploymentProperties, destinationDeploymentProperties);
                        } else {
                            sink.init(streamDefinition, sinkType, transportOptionHolder, sinkConfigReader, sinkMapper, mapType, mapOptionHolder, sinkHandler, payloadElementList, mapperConfigReader, deploymentProperties, siddhiAppContext);
                        }
                        if (sinkHandlerManager != null) {
                            sinkHandlerManager.registerSinkHandler(sinkHandler.getId(), sinkHandler);
                        }
                        DefinitionParserHelper.validateSinkMapperCompatibility(streamDefinition, sinkType, mapType, sink, sinkMapper, sinkAnnotation);
                        OutputGroupDeterminer groupDeterminer = DefinitionParserHelper.constructOutputGroupDeterminer(transportOptionHolder, distributionOptHolder, streamDefinition, destinationOptHolders.size());
                        if (groupDeterminer != null) {
                            sink.getMapper().setGroupDeterminer(groupDeterminer);
                        }
                        if ((eventSinks = (ArrayList<Sink>)eventSinkMap.get(streamDefinition.getId())) == null) {
                            eventSinks = new ArrayList<Sink>();
                            eventSinks.add(sink);
                            eventSinkMap.put(streamDefinition.getId(), eventSinks);
                            continue;
                        }
                        eventSinks.add(sink);
                        continue;
                    }
                    throw new SiddhiAppCreationException("Attribute 'type' does not exist for annotation '" + mapAnnotation + "'", (SiddhiElement)mapAnnotation, siddhiAppContext);
                }
                throw new SiddhiAppCreationException("Both @sink(type=) and @map(type=) are required.", sinkAnnotation.getQueryContextStartIndex(), sinkAnnotation.getQueryContextEndIndex());
            }
            catch (Throwable t) {
                ExceptionUtil.populateQueryContext(t, (SiddhiElement)sinkAnnotation, siddhiAppContext);
                throw t;
            }
        }
    }

    private static void validateSinkMapperCompatibility(StreamDefinition streamDefinition, String sinkType, String mapType, Sink sink, SinkMapper sinkMapper, Annotation sinkAnnotation) {
        Object[] inputEventClasses = sink.getSupportedInputEventClasses();
        Object[] outputEventClasses = sinkMapper.getOutputEventClasses();
        if (outputEventClasses == null || outputEventClasses.length == 0) {
            return;
        }
        boolean matchingSinkAndMapperClasses = false;
        for (Class clazz : inputEventClasses) {
            for (Class clazz2 : outputEventClasses) {
                if (!clazz.isAssignableFrom(clazz2)) continue;
                matchingSinkAndMapperClasses = true;
                break;
            }
            if (matchingSinkAndMapperClasses) break;
        }
        if (!matchingSinkAndMapperClasses) {
            throw new SiddhiAppCreationException("At stream '" + streamDefinition.getId() + "', sink mapper '" + mapType + "' processes '" + Arrays.deepToString(outputEventClasses) + "' classes but it's sink '" + sinkType + "' cannot not consume any of those class, where sink can only consume '" + Arrays.deepToString(inputEventClasses) + "' classes.", sinkAnnotation.getQueryContextStartIndex(), sinkAnnotation.getQueryContextEndIndex());
        }
    }

    private static OutputGroupDeterminer constructOutputGroupDeterminer(OptionHolder transportOptHolder, OptionHolder distributedOptHolder, StreamDefinition streamDef, int destinationCount) {
        String strategy;
        OutputGroupDeterminer groupDeterminer = null;
        if (distributedOptHolder != null && (strategy = distributedOptHolder.validateAndGetStaticValue("strategy")).equalsIgnoreCase("partitioned")) {
            String partitionKeyField = distributedOptHolder.validateAndGetStaticValue("partitionKey");
            int partitioningFieldIndex = streamDef.getAttributePosition(partitionKeyField);
            groupDeterminer = new PartitionedGroupDeterminer(partitioningFieldIndex, destinationCount);
        }
        if (groupDeterminer == null) {
            ArrayList<Option> dynamicTransportOptions = new ArrayList<Option>(transportOptHolder.getDynamicOptionsKeys().size());
            for (String option : transportOptHolder.getDynamicOptionsKeys()) {
                dynamicTransportOptions.add(transportOptHolder.validateAndGetOption(option));
            }
            if (dynamicTransportOptions.size() > 0) {
                groupDeterminer = new DynamicOptionGroupDeterminer(dynamicTransportOptions);
            }
        }
        return groupDeterminer;
    }

    public static Extension constructExtension(StreamDefinition streamDefinition, String typeName, String typeValue, Annotation annotation, String defaultNamespace) {
        String name;
        String namespace;
        String[] namespaceAndName = typeValue.split(":");
        if (namespaceAndName.length == 1) {
            namespace = defaultNamespace;
            name = namespaceAndName[0];
        } else if (namespaceAndName.length == 2) {
            namespace = namespaceAndName[0];
            name = namespaceAndName[1];
        } else {
            throw new SiddhiAppCreationException("Malformed '" + typeName + "' annotation type '" + typeValue + "' provided, for annotation '" + annotation + "' on stream '" + streamDefinition.getId() + "', it should be either '<namespace>:<name>' or '<name>'", annotation.getQueryContextStartIndex(), annotation.getQueryContextEndIndex());
        }
        return new Extension(){

            public String getNamespace() {
                return namespace;
            }

            public String getName() {
                return name;
            }
        };
    }

    private static AttributesHolder getAttributeMappings(Annotation mapAnnotation, String mapType, StreamDefinition streamDefinition) {
        AttributesHolder attributesHolder;
        block11: {
            List attributeList;
            List attributeAnnotations = mapAnnotation.getAnnotations("Attributes");
            attributesHolder = new AttributesHolder();
            if (attributeAnnotations.size() <= 0) break block11;
            HashMap<String, String> elementMap = new HashMap<String, String>();
            ArrayList<String> elementList = new ArrayList<String>();
            Boolean attributesNameDefined = null;
            for (Element element : ((Annotation)attributeAnnotations.get(0)).getElements()) {
                if (element.getKey() == null) {
                    if (attributesNameDefined != null && attributesNameDefined.booleanValue()) {
                        throw new SiddhiAppCreationException("Error at '" + mapType + "' defined at stream'" + streamDefinition.getId() + "', some attributes are defined and some are not defined.", element.getQueryContextStartIndex(), element.getQueryContextEndIndex());
                    }
                    attributesNameDefined = false;
                    elementList.add(element.getValue());
                    continue;
                }
                if (attributesNameDefined != null && !attributesNameDefined.booleanValue()) {
                    throw new SiddhiAppCreationException("Error at '" + mapType + "' defined at stream '" + streamDefinition.getId() + "', some attributes are defined and some are not defined.", element.getQueryContextStartIndex(), element.getQueryContextEndIndex());
                }
                attributesNameDefined = true;
                elementMap.put(element.getKey(), element.getValue());
            }
            if (elementMap.size() > 0) {
                attributeList = streamDefinition.getAttributeList();
                int attributeListSize = attributeList.size();
                for (int i = 0; i < attributeListSize; ++i) {
                    Attribute attribute = (Attribute)attributeList.get(i);
                    String value = (String)elementMap.get(attribute.getName());
                    if (value == null) {
                        throw new SiddhiAppCreationException("Error at '" + mapType + "' defined at stream '" + streamDefinition.getId() + "', attribute '" + attribute.getName() + "' is not mapped.", mapAnnotation.getQueryContextStartIndex(), mapAnnotation.getQueryContextEndIndex());
                    }
                    DefinitionParserHelper.assignMapping(attributesHolder, elementMap, i, attribute);
                }
            } else {
                Attribute attribute;
                int i;
                attributeList = streamDefinition.getAttributeList();
                if (elementList.size() != attributeList.size()) {
                    throw new SiddhiAppCreationException("Error at '" + mapType + "' defined at stream '" + streamDefinition.getId() + "', '" + elementList.size() + "' mapping attributes are provided but expected attributes are '" + attributeList.size() + "'.", mapAnnotation.getQueryContextStartIndex(), mapAnnotation.getQueryContextEndIndex());
                }
                for (i = 0; i < attributeList.size(); ++i) {
                    attribute = (Attribute)attributeList.get(i);
                    String value = (String)elementList.get(i);
                    elementMap.put(attribute.getName(), value);
                }
                for (i = 0; i < attributeList.size(); ++i) {
                    attribute = (Attribute)attributeList.get(i);
                    DefinitionParserHelper.assignMapping(attributesHolder, elementMap, i, attribute);
                }
            }
        }
        return attributesHolder;
    }

    private static void assignMapping(AttributesHolder attributesHolder, Map<String, String> elementMap, int i, Attribute attribute) {
        String mapping = elementMap.get(attribute.getName()).trim();
        if (mapping.startsWith("trp:")) {
            attributesHolder.transportMappings.add(new AttributeMapping(attribute.getName(), i, mapping.substring(4)));
        } else {
            attributesHolder.payloadMappings.add(new AttributeMapping(attribute.getName(), i, mapping));
        }
    }

    private static List<Element> getPayload(Annotation mapAnnotation) {
        List attributeAnnotations = mapAnnotation.getAnnotations("Payload");
        if (attributeAnnotations.size() == 1) {
            List elements = ((Annotation)attributeAnnotations.get(0)).getElements();
            return elements;
        }
        if (attributeAnnotations.size() > 1) {
            throw new SiddhiAppCreationException("@map() annotation should only contain single @payload() annotation.", mapAnnotation.getQueryContextStartIndex(), mapAnnotation.getQueryContextEndIndex());
        }
        return null;
    }

    private static OptionHolder constructOptionHolder(StreamDefinition streamDefinition, Annotation annotation, io.siddhi.annotation.Extension extension, String[] supportedDynamicOptions, boolean supportDeploymentOptions) {
        List<Object> supportedDynamicOptionList = new ArrayList();
        if (supportedDynamicOptions != null) {
            supportedDynamicOptionList = Arrays.asList(supportedDynamicOptions);
        }
        HashMap<String, String> options = new HashMap<String, String>();
        HashMap<String, String> dynamicOptions = new HashMap<String, String>();
        for (Element element : annotation.getElements()) {
            if (element.getKey().startsWith("dep:") && !supportDeploymentOptions) {
                throw new SiddhiAppCreationException("DeploymentOption is not supported by '" + extension.namespace() + ":" + extension.name() + "', but a deployment property '" + element.getKey() + "' is configured", annotation.getQueryContextStartIndex(), annotation.getQueryContextEndIndex());
            }
            if (Pattern.matches("(.*?)\\{\\{.*?\\}\\}(.*?)", element.getValue())) {
                if (supportedDynamicOptionList.contains(element.getKey())) {
                    dynamicOptions.put(element.getKey(), element.getValue());
                    continue;
                }
                throw new SiddhiAppCreationException("'" + element.getKey() + "' is not a supported DynamicOption for the Extension '" + extension.namespace() + ":" + extension.name() + "', it only supports following as its DynamicOptions: " + supportedDynamicOptionList, annotation.getQueryContextStartIndex(), annotation.getQueryContextEndIndex());
            }
            options.put(element.getKey(), element.getValue());
        }
        return new OptionHolder(streamDefinition, options, dynamicOptions, extension);
    }

    private static boolean isMultiClientDistributedTransport(Sink clientTransport, StreamDefinition streamDefinition, Annotation distributionAnnotation, SiddhiAppContext siddhiAppContext) {
        List<OptionHolder> destinationOptHolders = DefinitionParserHelper.createDestinationOptionHolders(distributionAnnotation, streamDefinition, clientTransport, siddhiAppContext);
        List<String> dynamicOptions = Arrays.asList(clientTransport.getSupportedDynamicOptions());
        for (OptionHolder optionHolder : destinationOptHolders) {
            for (String key : optionHolder.getDynamicOptionsKeys()) {
                if (dynamicOptions.contains(key)) continue;
                return true;
            }
            for (String key : optionHolder.getStaticOptionsKeys()) {
                if (dynamicOptions.contains(key)) continue;
                return true;
            }
        }
        return false;
    }

    private static Sink createSink(Extension sinkExtension, SiddhiAppContext siddhiAppContext) {
        return (Sink)SiddhiClassLoader.loadExtensionImplementation(sinkExtension, SinkExecutorExtensionHolder.getInstance(siddhiAppContext));
    }

    private static List<OptionHolder> createDestinationOptionHolders(Annotation distributionAnnotation, StreamDefinition streamDefinition, Sink clientTransport, SiddhiAppContext siddhiAppContext) {
        io.siddhi.annotation.Extension sinkExt = clientTransport.getClass().getAnnotation(io.siddhi.annotation.Extension.class);
        ArrayList<OptionHolder> destinationOptHolders = new ArrayList<OptionHolder>();
        distributionAnnotation.getAnnotations().stream().filter(annotation -> annotation.getName().equalsIgnoreCase("Destination")).forEach(destinationAnnotation -> destinationOptHolders.add(DefinitionParserHelper.constructOptionHolder(streamDefinition, DefinitionParserHelper.updateAnnotationRef(destinationAnnotation, "Destination", siddhiAppContext), sinkExt, clientTransport.getSupportedDynamicOptions(), true)));
        return destinationOptHolders;
    }

    private static Map<String, String> createDeploymentProperties(Annotation annotation, io.siddhi.annotation.Extension extension) {
        HashMap<String, String> deploymentOptions = new HashMap<String, String>();
        for (Element element : annotation.getElements()) {
            if (!element.getKey().startsWith("dep:")) continue;
            if (Pattern.matches("(.*?)\\{\\{.*?\\}\\}(.*?)", element.getValue())) {
                throw new SiddhiAppCreationException("DeploymentOption cannot have dynamic parameters, but '" + element.getKey() + "' of '" + extension.namespace() + ":" + extension.name() + "' is configured with '" + element.getValue() + "'", annotation.getQueryContextStartIndex(), annotation.getQueryContextEndIndex());
            }
            deploymentOptions.put(element.getKey().substring(4), element.getValue());
        }
        return deploymentOptions;
    }

    private static List<Map<String, String>> createDestinationDeploymentProperties(Annotation distributionAnnotation, Sink sink) {
        io.siddhi.annotation.Extension sinkExt = sink.getClass().getAnnotation(io.siddhi.annotation.Extension.class);
        ArrayList<Map<String, String>> destinationDeploymentProperties = new ArrayList<Map<String, String>>();
        distributionAnnotation.getAnnotations().stream().filter(annotation -> annotation.getName().equalsIgnoreCase("Destination")).forEach(destinationAnnotation -> destinationDeploymentProperties.add(DefinitionParserHelper.createDeploymentProperties(destinationAnnotation, sinkExt)));
        return destinationDeploymentProperties;
    }

    private static Annotation updateAnnotationRef(Annotation annotation, String type, SiddhiAppContext siddhiAppContext) {
        String ref = annotation.getElement("ref");
        if (ref != null) {
            Map<String, String> systemConfigs = siddhiAppContext.getSiddhiContext().getConfigManager().extractSystemConfigs(ref);
            if (systemConfigs.size() == 0) {
                throw new SiddhiAppCreationException("The " + type + " element of the name '" + ref + "' is not defined in the configurations file.", annotation.getQueryContextStartIndex(), annotation.getQueryContextEndIndex());
            }
            HashMap<String, String> newSystemConfig = new HashMap<String, String>(systemConfigs);
            Map<String, String> collection = annotation.getElements().stream().collect(Collectors.toMap(Element::getKey, Element::getValue));
            collection.remove("ref");
            newSystemConfig.putAll(collection);
            List annotationElements = newSystemConfig.entrySet().stream().map(property -> new Element((String)property.getKey(), (String)property.getValue())).collect(Collectors.toList());
            annotation.setElements(annotationElements);
        }
        return annotation;
    }

    static class AttributesHolder {
        List<AttributeMapping> transportMappings = new ArrayList<AttributeMapping>();
        List<AttributeMapping> payloadMappings = new ArrayList<AttributeMapping>();

        AttributesHolder() {
        }
    }
}

