/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQManager;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQPartition;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.kafka.KafkaUtils;
import org.nuxeo.ecm.platform.importer.mqueues.pattern.Message;
import org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer.ConsumerFactory;
import org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer.ConsumerPolicy;
import org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer.ConsumerStatus;
import org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer.internals.AbstractCallablePool;
import org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer.internals.ConsumerRunner;

/**
 * Pool of consumer callables reading a single MQ.
 *
 * <p>The pool size is derived from the size of the MQ appender and the policy's
 * maximum number of threads (see {@link #computeNbThreads(short, short)}). Each
 * callable is a {@link ConsumerRunner} that receives one entry of a partition
 * assignment list computed once at construction time.
 *
 * @param <M> the type of message consumed
 */
public class ConsumerPool<M extends Message> extends AbstractCallablePool<ConsumerStatus> {
    private static final Log log = LogFactory.getLog(ConsumerPool.class);

    private final MQManager<M> manager;
    private final ConsumerFactory<M> factory;
    private final ConsumerPolicy policy;
    private final String mqName;
    /** One list of partitions per callable index, computed by {@link #getDefaultAssignments()}. */
    private final List<List<MQPartition>> defaultAssignments;

    /**
     * Creates a pool for the given MQ.
     *
     * @param mqName  name of the MQ to consume
     * @param manager manager used to look up the appender of {@code mqName} and handed to each runner
     * @param factory consumer factory handed to each runner
     * @param policy  consumer policy; provides the maximum thread count and the consumer group name
     */
    public ConsumerPool(String mqName, MQManager<M> manager, ConsumerFactory<M> factory, ConsumerPolicy policy) {
        // The appender size is narrowed to short because the thread count computation works on shorts.
        super(computeNbThreads((short) manager.getAppender(mqName).size(), policy.getMaxThreads()));
        this.mqName = mqName;
        this.manager = manager;
        this.factory = factory;
        this.policy = policy;
        // Must run after mqName and manager are set: the computation reads both fields.
        this.defaultAssignments = getDefaultAssignments();

        // The assignments are only part of the message when the manager does not support subscribe.
        String message;
        if (!manager.supportSubscribe()) {
            message = "Creating consumer pool using MQ assignments on " + mqName + ": " + defaultAssignments;
        } else {
            message = "Creating consumer pool using MQ subscribe on " + mqName;
        }
        log.info(message);
    }

    /**
     * Computes the number of threads of the pool.
     *
     * @param maxConcurrency upper bound given by the MQ (the appender size at construction)
     * @param maxThreads     limit requested by the policy; a value of zero or less means "no limit"
     * @return the smaller of both values when {@code maxThreads} is positive, else {@code maxConcurrency}
     */
    protected static short computeNbThreads(short maxConcurrency, short maxThreads) {
        if (maxThreads <= 0) {
            return maxConcurrency;
        }
        return (short) Math.min(maxConcurrency, maxThreads);
    }

    /**
     * @return the consumer group name, which is the name of the policy
     */
    public String getConsumerGroupName() {
        return policy.getName();
    }

    /**
     * Builds the status returned for an error: "error" as first argument, six zero
     * counters and a trailing {@code true} (presumably the failure flag - confirm
     * against {@link ConsumerStatus}).
     */
    @Override
    protected ConsumerStatus getErrorStatus() {
        return new ConsumerStatus("error", 0L, 0L, 0L, 0L, 0L, 0L, true);
    }

    /**
     * Creates the runner for the callable at index {@code i}, giving it the
     * {@code i}-th entry of the assignments computed at construction.
     */
    @Override
    protected Callable<ConsumerStatus> getCallable(int i) {
        List<MQPartition> assigned = defaultAssignments.get(i);
        return new ConsumerRunner<M>(factory, policy, manager, assigned);
    }

    /**
     * @return the prefix {@code "Nuxeo-Consumer"}
     */
    @Override
    protected String getThreadPrefix() {
        return "Nuxeo-Consumer";
    }

    /**
     * Logs every individual status at INFO level, then the aggregated
     * {@link ConsumerStatus#toString(List)} rendering at WARN level.
     */
    @Override
    protected void afterCall(List<ConsumerStatus> ret) {
        for (ConsumerStatus status : ret) {
            log.info(status);
        }
        log.warn(ConsumerStatus.toString(ret));
    }

    /**
     * Closes the pool by delegating to the parent implementation.
     */
    @Override
    public void close() throws Exception {
        super.close();
    }

    /**
     * Computes the partition assignments by passing the pool's thread count and a
     * single-entry map (MQ name to appender size) to
     * {@code KafkaUtils.roundRobinAssignments}.
     */
    private List<List<MQPartition>> getDefaultAssignments() {
        Map<String, Integer> sizeByQueue = Collections.singletonMap(mqName, manager.getAppender(mqName).size());
        return KafkaUtils.roundRobinAssignments(getNbThreads(), sizeByQueue);
    }
}

