/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.ecm.platform.mqueues;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.platform.mqueues.ConfigDescriptor;
import org.nuxeo.ecm.platform.mqueues.MQService;
import org.nuxeo.ecm.platform.mqueues.TopologyDescriptor;
import org.nuxeo.ecm.platform.mqueues.kafka.KafkaConfigService;
import org.nuxeo.lib.core.mqueues.computation.ComputationManager;
import org.nuxeo.lib.core.mqueues.computation.Settings;
import org.nuxeo.lib.core.mqueues.computation.Topology;
import org.nuxeo.lib.core.mqueues.computation.mqueue.MQComputationManager;
import org.nuxeo.lib.core.mqueues.mqueues.MQManager;
import org.nuxeo.lib.core.mqueues.mqueues.chronicle.ChronicleMQManager;
import org.nuxeo.lib.core.mqueues.mqueues.kafka.KafkaMQManager;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.model.ComponentContext;
import org.nuxeo.runtime.model.ComponentInstance;
import org.nuxeo.runtime.model.ComponentManager;
import org.nuxeo.runtime.model.DefaultComponent;

/**
 * Runtime component implementing {@link MQService}.
 * <p>
 * It collects MQ configurations and computation topologies contributed to the
 * {@code config} and {@code topology} extension points, lazily creates one
 * {@link MQManager} per configuration (Kafka when the configuration type is
 * {@code kafka}, Chronicle otherwise) and drives the lifecycle of the
 * computation managers built from the registered topologies.
 * <p>
 * The internal registries are plain {@link HashMap}s; no synchronization is
 * performed by this class.
 */
public class MQServiceImpl
extends DefaultComponent
implements MQService {
    private static final Log log = LogFactory.getLog(MQServiceImpl.class);

    /** Framework property giving the base directory of Chronicle queues. */
    public static final String NUXEO_MQUEUE_DIR_PROP = "nuxeo.mqueue.chronicle.dir";

    /** Framework property giving the default Chronicle retention duration. */
    public static final String NUXEO_MQUEUE_RET_DURATION_PROP = "nuxeo.mqueue.chronicle.retention.duration";

    /** Name of the extension point receiving {@link ConfigDescriptor} contributions. */
    protected static final String MQ_CONFIG_XP = "config";

    /** Name of the extension point receiving {@link TopologyDescriptor} contributions. */
    protected static final String MQ_TOPOLOGY_XP = "topology";

    /** Registered configurations, keyed by configuration name. */
    protected Map<String, ConfigDescriptor> configs = new HashMap<>();

    /** Managers already created by {@link #getManager(String)}, keyed by configuration name. */
    protected Map<String, MQManager> managers = new HashMap<>();

    /** Initialized computation managers, keyed by topology name. */
    protected Map<String, ComputationManager> computationManagers = new HashMap<>();

    /** Registered topologies, keyed by topology name. */
    protected Map<String, TopologyDescriptor> topologies = new HashMap<>();

    public int getApplicationStartedOrder() {
        return -520;
    }

    /**
     * Returns the manager bound to the named configuration, creating and caching it on first use.
     *
     * @param name the name of a registered MQ configuration
     * @return the manager for that configuration
     * @throws IllegalArgumentException if no configuration is registered under {@code name}
     */
    @Override
    public MQManager getManager(String name) {
        MQManager manager = this.managers.get(name);
        if (manager != null) {
            return manager;
        }
        // Preserve the original lookup semantics: a cached key always wins,
        // even in the (unexpected) case where it maps to null.
        if (this.managers.containsKey(name)) {
            return null;
        }
        ConfigDescriptor config = this.configs.get(name);
        if (config == null) {
            throw new IllegalArgumentException("Unknown MQ configuration: " + name);
        }
        if ("kafka".equalsIgnoreCase(config.getType())) {
            manager = this.createKafkaMQManager(config);
        } else {
            manager = this.createChronicleMQManager(config);
        }
        this.managers.put(name, manager);
        return manager;
    }

    /**
     * Builds a Kafka backed manager from the Kafka configuration referenced by the descriptor.
     *
     * @param config the MQ configuration; its {@code config} option names the Kafka
     *               configuration to use and defaults to {@code default}
     * @return a new Kafka manager
     * @throws IllegalStateException if the {@link KafkaConfigService} cannot be looked up
     */
    protected MQManager createKafkaMQManager(ConfigDescriptor config) {
        // The option key is "config"; it only happens to share its value with MQ_CONFIG_XP.
        String kafkaConfig = config.getOption("config", "default");
        KafkaConfigService service = Framework.getService(KafkaConfigService.class);
        if (service == null) {
            // Fail with an explicit message instead of a bare NullPointerException below.
            throw new IllegalStateException("KafkaConfigService is not available, cannot create Kafka MQManager for: "
                    + config.getName());
        }
        return new KafkaMQManager(service.getZkServers(kafkaConfig), service.getTopicPrefix(kafkaConfig),
                service.getProducerProperties(kafkaConfig), service.getConsumerProperties(kafkaConfig));
    }

    /**
     * Builds a Chronicle backed manager from the {@code basePath}, {@code directory}
     * and {@code retention} options of the descriptor.
     *
     * @param config the MQ configuration; {@code directory} defaults to the configuration name
     * @return a new Chronicle manager
     */
    protected MQManager createChronicleMQManager(ConfigDescriptor config) {
        String basePath = config.getOption("basePath", null);
        String directory = config.getOption("directory", config.getName());
        Path path = this.getChroniclePath(basePath, directory);
        String retention = this.getChronicleRetention(config.getOption("retention", null));
        return new ChronicleMQManager(path, retention);
    }

    /**
     * Resolves the retention to apply to a Chronicle manager.
     *
     * @param retention the retention configured on the descriptor, may be {@code null}
     * @return {@code retention} when not {@code null}, otherwise the value of
     *         {@link #NUXEO_MQUEUE_RET_DURATION_PROP} with {@code 4d} as fallback
     */
    protected String getChronicleRetention(String retention) {
        if (retention != null) {
            return retention;
        }
        return Framework.getProperty(NUXEO_MQUEUE_RET_DURATION_PROP, "4d");
    }

    /**
     * Resolves the absolute directory used by a Chronicle manager.
     * <p>
     * Candidates are tried in order: the explicit {@code basePath}, the
     * {@link #NUXEO_MQUEUE_DIR_PROP} property, {@code <nuxeo.data.dir>/mqueue} and
     * finally {@code <runtime home>/data/mqueue}.
     *
     * @param basePath the base path configured on the descriptor, may be {@code null}
     * @param name the directory name appended to the selected base
     * @return the absolute path of the queue directory
     */
    protected Path getChroniclePath(String basePath, String name) {
        if (basePath != null) {
            return Paths.get(basePath, name).toAbsolutePath();
        }
        String configuredDir = Framework.getProperty(NUXEO_MQUEUE_DIR_PROP);
        if (configuredDir != null) {
            return Paths.get(configuredDir, name).toAbsolutePath();
        }
        String dataDir = Framework.getProperty("nuxeo.data.dir");
        if (dataDir != null) {
            return Paths.get(dataDir, "mqueue", name).toAbsolutePath();
        }
        String home = Framework.getRuntime().getHome().getAbsolutePath();
        return Paths.get(home, "data", "mqueue", name).toAbsolutePath();
    }

    /**
     * Creates the MQueues declared by a configuration when they do not exist yet.
     * The manager is only instantiated when the configuration declares at least one queue.
     *
     * @param name the configuration name
     * @param config the configuration descriptor
     */
    protected void createMQueueIfNotExists(String name, ConfigDescriptor config) {
        if (config.getMQueuesToCreate().isEmpty()) {
            return;
        }
        MQManager manager = this.getManager(name);
        config.getMQueuesToCreate().forEach((mqName, size) -> {
            log.info("Create if not exists MQ: " + mqName + " with manager: " + name);
            manager.createIfNotExists(mqName, size.intValue());
        });
    }

    /**
     * Creates the declared MQueues, initializes the computation topologies and registers a
     * lifecycle listener that starts the computations once all components are started.
     *
     * @param context the component context
     */
    public void start(ComponentContext context) {
        super.start(context);
        this.configs.forEach(this::createMQueueIfNotExists);
        this.topologies.forEach(this::initComputations);
        // The listener unregisters itself in beforeStop, so each start/stop cycle
        // leaves exactly one listener installed.
        Framework.getRuntime().getComponentManager().addListener(new ComponentsLifeCycleListener());
    }

    /**
     * Instantiates the topology described by the descriptor and initializes, without starting,
     * a computation manager for it.
     * <p>
     * Nothing is registered when the topology is already initialized or when the topology
     * class cannot be instantiated; both cases are logged as errors.
     *
     * @param name the topology name
     * @param descriptor the topology descriptor
     * @throws IllegalArgumentException if the descriptor references an unknown MQ configuration
     */
    protected void initComputations(String name, TopologyDescriptor descriptor) {
        if (this.computationManagers.containsKey(name)) {
            log.error("Computation topology already initialized: " + name);
            return;
        }
        // Routine lifecycle message: logged at INFO like the registration messages.
        log.info("Init computation topology: " + name + " with manager: " + descriptor.config);
        MQManager manager = this.getManager(descriptor.config);
        Topology topology;
        try {
            topology = descriptor.klass.newInstance().getTopology(descriptor.options);
        } catch (IllegalAccessException | InstantiationException e) {
            log.error("Can not create topology for " + name, e);
            return;
        }
        MQComputationManager computationManager = new MQComputationManager(manager);
        Settings settings = new Settings(descriptor.defaultConcurrency.intValue(), descriptor.defaultPartitions.intValue());
        // Per computation / per stream overrides of the defaults given above.
        descriptor.computations.forEach(comp -> settings.setConcurrency(comp.name, comp.concurrency.intValue()));
        descriptor.streams.forEach(stream -> settings.setPartitions(stream.name, stream.partitions.intValue()));
        if (log.isDebugEnabled()) {
            log.debug("Starting computation topology: " + name + "\n" + topology.toPlantuml(settings));
        }
        computationManager.init(topology, settings);
        this.computationManagers.put(name, computationManager);
    }

    /**
     * Stops the computations and closes every manager created so far.
     *
     * @param context the component context
     * @throws InterruptedException declared by the overridden lifecycle method
     */
    public void stop(ComponentContext context) throws InterruptedException {
        super.stop(context);
        try {
            this.stopComputations();
        } finally {
            // Always release the managers, even when stopping a computation failed.
            this.closeManagers();
        }
    }

    /**
     * Starts the computation manager of every registered topology that has been initialized.
     */
    protected void startComputations() {
        for (String name : this.topologies.keySet()) {
            ComputationManager manager = this.computationManagers.get(name);
            if (manager != null) {
                manager.start();
            }
        }
    }

    /**
     * Stops every computation manager, passing a one second duration to each, and forgets them so that a
     * later {@link #start(ComponentContext)} can initialize the topologies again.
     */
    protected void stopComputations() {
        this.computationManagers.forEach((name, manager) -> manager.stop(Duration.ofSeconds(1L)));
        this.computationManagers.clear();
    }

    /**
     * Closes every cached manager and empties the cache. A failure to close one manager is
     * logged and does not prevent the others from being closed.
     */
    protected void closeManagers() {
        this.managers.forEach((name, manager) -> {
            try {
                manager.close();
            } catch (Exception e) {
                log.warn("Failed to close MQManager: " + name, e);
            }
        });
        this.managers.clear();
    }

    /**
     * Records a contribution made to the {@code config} or {@code topology} extension point.
     * Contributions to any other extension point are ignored.
     *
     * @param contribution the contributed descriptor
     * @param extensionPoint the name of the targeted extension point
     * @param contributor the contributing component
     */
    public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
        if (extensionPoint.equals(MQ_CONFIG_XP)) {
            ConfigDescriptor descriptor = (ConfigDescriptor) contribution;
            this.configs.put(descriptor.name, descriptor);
            log.info(String.format("Register MQ Config: %s", descriptor.name));
        } else if (extensionPoint.equals(MQ_TOPOLOGY_XP)) {
            TopologyDescriptor descriptor = (TopologyDescriptor) contribution;
            this.topologies.put(descriptor.name, descriptor);
            log.info(String.format("Register MQ Topology: %s", descriptor.name));
        }
    }

    /**
     * Component manager listener tying the computations to the global component lifecycle:
     * computations are started after all components are started and stopped before the
     * components are stopped.
     */
    protected class ComponentsLifeCycleListener
    extends ComponentManager.LifeCycleHandler {
        protected ComponentsLifeCycleListener() {
        }

        public void afterStart(ComponentManager mgr, boolean isResume) {
            MQServiceImpl.this.startComputations();
        }

        public void beforeStop(ComponentManager mgr, boolean isStandby) {
            MQServiceImpl.this.stopComputations();
            // start() installs a fresh listener each time; remove this one so that
            // listeners do not pile up across stop/start cycles.
            Framework.getRuntime().getComponentManager().removeListener(this);
        }
    }
}

