/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer.internals;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Runs a fixed number of {@link Callable}s concurrently, each on its own named thread, and
 * collects one status per callable.
 *
 * <p>Subclasses provide the callables, the thread name prefix, the status to report for a
 * callable that failed, and a hook invoked once all callables have terminated.
 *
 * @param <T> the status type returned by each callable
 */
public abstract class AbstractCallablePool<T>
implements AutoCloseable {
    private static final Log log = LogFactory.getLog(AbstractCallablePool.class);
    private final short nbThreads;
    // Assigned by runPool() on the coordinator thread and read by close() from the caller's
    // thread, hence volatile.
    private volatile ExecutorService threadPool;

    /**
     * @param nbThreads the number of callables to run, one thread per callable
     */
    public AbstractCallablePool(short nbThreads) {
        this.nbThreads = nbThreads;
    }

    /**
     * Returns the status recorded in the result list for a callable that terminated with an
     * exception.
     */
    protected abstract T getErrorStatus();

    /**
     * Returns the callable to run in slot {@code i}.
     *
     * @param i the index of the callable, from 0 (inclusive) to {@link #getNbThreads()} (exclusive)
     */
    protected abstract Callable<T> getCallable(int i);

    /**
     * Returns the prefix used to name the worker threads; the coordinator thread uses this
     * prefix followed by {@code "Pool"}.
     */
    protected abstract String getThreadPrefix();

    /**
     * Hook invoked by {@link #runPool()} after every callable has terminated.
     *
     * @param ret the collected statuses, in callable index order
     */
    protected abstract void afterCall(List<T> ret);

    /**
     * Returns the number of threads (and callables) of this pool.
     */
    public int getNbThreads() {
        return this.nbThreads;
    }

    /**
     * Starts the pool without blocking the caller: {@link #runPool()} is executed on a dedicated
     * single coordinator thread.
     *
     * @return a future completed with the list of statuses, or completed exceptionally with
     *         whatever {@link #runPool()} throws
     */
    public CompletableFuture<List<T>> start() {
        ExecutorService supplyThreadPool = Executors.newSingleThreadExecutor(new NamedThreadFactory(this.getThreadPrefix() + "Pool"));
        CompletableFuture<List<T>> ret = new CompletableFuture<>();
        supplyThreadPool.execute(() -> {
            try {
                ret.complete(this.runPool());
            }
            catch (Throwable t) {
                ret.completeExceptionally(t);
            }
        });
        // No further task will be submitted; the coordinator thread exits once runPool() returns.
        supplyThreadPool.shutdown();
        return ret;
    }

    /**
     * Submits every callable to a fixed thread pool, waits for all of them and collects their
     * statuses. A callable that fails contributes {@link #getErrorStatus()} instead of aborting
     * the whole run. {@link #afterCall(List)} is invoked before returning.
     *
     * @return one status per callable, in callable index order
     * @throws InterruptedException if the calling thread is interrupted while waiting for a
     *         callable to terminate
     */
    protected List<T> runPool() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(this.nbThreads, new NamedThreadFactory(this.getThreadPrefix()));
        this.threadPool = pool;
        log.warn("Start " + this.getThreadPrefix() + " Pool on " + this.nbThreads + " thread(s).");
        List<CompletableFuture<T>> futures = new ArrayList<>(this.nbThreads);
        for (int i = 0; i < this.nbThreads; ++i) {
            Callable<T> callable = this.getCallable(i);
            CompletableFuture<T> future = new CompletableFuture<>();
            pool.execute(() -> {
                try {
                    future.complete(callable.call());
                }
                catch (Throwable t) {
                    log.error("Exception catch in runner: " + t.getMessage(), t);
                    future.completeExceptionally(t);
                }
            });
            futures.add(future);
        }
        log.info("Pool is up and running");
        // Orderly shutdown: already submitted callables keep running, nothing new is accepted.
        pool.shutdown();
        List<T> ret = new ArrayList<>(this.nbThreads);
        for (CompletableFuture<T> future : futures) {
            T status;
            try {
                status = future.get();
            }
            catch (ExecutionException e) {
                // Pass the exception along so the cause and stack trace are not lost.
                log.error("End of consumer in error: " + e.getMessage() + future.toString(), e);
                status = this.getErrorStatus();
            }
            ret.add(status);
        }
        this.afterCall(ret);
        return ret;
    }

    /**
     * Interrupts the worker threads, if the worker pool has been created. Calling this before
     * the pool exists is a no-op.
     */
    @Override
    public void close() throws Exception {
        ExecutorService pool = this.threadPool;
        if (pool != null) {
            pool.shutdownNow();
        }
    }

    /**
     * Thread factory naming its threads {@code <prefix>-NN}, where NN is a zero-padded counter
     * starting at 00.
     */
    protected static class NamedThreadFactory
    implements ThreadFactory {
        private final AtomicInteger count = new AtomicInteger(0);
        private final String prefix;

        /**
         * @param prefix the prefix of every thread name created by this factory
         */
        public NamedThreadFactory(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, String.format("%s-%02d", this.prefix, this.count.getAndIncrement()));
        }
    }
}

