/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.ecm.platform.importer.base;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.javasimon.SimonManager;
import org.javasimon.Stopwatch;
import org.nuxeo.common.utils.ExceptionUtils;
import org.nuxeo.ecm.core.api.CloseableCoreSession;
import org.nuxeo.ecm.core.api.CoreInstance;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.api.DocumentNotFoundException;
import org.nuxeo.ecm.core.api.DocumentRef;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.PathRef;
import org.nuxeo.ecm.platform.importer.base.GenericThreadedImportTask;
import org.nuxeo.ecm.platform.importer.base.ImporterRunner;
import org.nuxeo.ecm.platform.importer.base.ImporterRunnerConfiguration;
import org.nuxeo.ecm.platform.importer.factories.DefaultDocumentModelFactory;
import org.nuxeo.ecm.platform.importer.factories.ImporterDocumentModelFactory;
import org.nuxeo.ecm.platform.importer.filter.ImporterFilter;
import org.nuxeo.ecm.platform.importer.filter.ImportingDocumentFilter;
import org.nuxeo.ecm.platform.importer.listener.ImporterListener;
import org.nuxeo.ecm.platform.importer.listener.JobHistoryListener;
import org.nuxeo.ecm.platform.importer.log.ImporterLogger;
import org.nuxeo.ecm.platform.importer.log.PerfLogger;
import org.nuxeo.ecm.platform.importer.source.SourceNode;
import org.nuxeo.ecm.platform.importer.threading.DefaultMultiThreadingPolicy;
import org.nuxeo.ecm.platform.importer.threading.ImporterThreadingPolicy;
import org.nuxeo.runtime.transaction.TransactionHelper;

public class GenericMultiThreadedImporter
implements ImporterRunner {
    protected static ThreadPoolExecutor importTP;
    protected static Map<String, Long> nbCreatedDocsByThreads;
    protected ImporterThreadingPolicy threadPolicy;
    protected ImporterDocumentModelFactory factory;
    protected SourceNode importSource;
    protected DocumentModel targetContainer;
    protected Integer batchSize = 50;
    protected Integer nbThreads = 5;
    protected Integer transactionTimeout = 0;
    protected ImporterLogger log;
    protected CoreSession session;
    protected String importWritePath;
    protected Boolean skipRootContainerCreation = false;
    protected String jobName;
    protected boolean enablePerfLogging = true;
    protected List<ImporterFilter> filters = new ArrayList<ImporterFilter>();
    protected List<ImporterListener> listeners = new ArrayList<ImporterListener>();
    protected List<ImportingDocumentFilter> importingDocumentFilters = new ArrayList<ImportingDocumentFilter>();
    protected GenericThreadedImportTask rootImportTask;
    protected static final int DEFAULT_QUEUE_SIZE = 10000;
    protected int queueSize = 10000;
    protected String repositoryName;
    protected static final String[] PERF_HEADERS;

    public static ThreadPoolExecutor getExecutor() {
        return importTP;
    }

    public static synchronized void addCreatedDoc(String taskId, long nbDocs) {
        String tid = Thread.currentThread().getName();
        nbCreatedDocsByThreads.put(tid + "-" + taskId, nbDocs);
    }

    public static synchronized long getCreatedDocsCounter() {
        long counter = 0L;
        for (String tid : nbCreatedDocsByThreads.keySet()) {
            Long tCounter = nbCreatedDocsByThreads.get(tid);
            if (tCounter == null) continue;
            counter += tCounter.longValue();
        }
        return counter;
    }

    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, Boolean skipRootContainerCreation, Integer batchSize, Integer nbThreads, ImporterLogger log, int queueSize) {
        this.importSource = sourceNode;
        this.importWritePath = importWritePath;
        this.log = log;
        if (batchSize != null) {
            this.batchSize = batchSize;
        }
        if (nbThreads != null) {
            this.nbThreads = nbThreads;
        }
        if (skipRootContainerCreation != null) {
            this.skipRootContainerCreation = skipRootContainerCreation;
        }
    }

    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, Boolean skipRootContainerCreation, Integer batchSize, Integer nbThreads, ImporterLogger log) {
        this(sourceNode, importWritePath, skipRootContainerCreation, batchSize, nbThreads, log, 10000);
    }

    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, Integer batchSize, Integer nbThreads, ImporterLogger log) {
        this(sourceNode, importWritePath, false, batchSize, nbThreads, log);
    }

    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, Boolean skipRootContainerCreation, Integer batchSize, Integer nbThreads, String jobName, ImporterLogger log) {
        this(sourceNode, importWritePath, skipRootContainerCreation, batchSize, nbThreads, log);
        this.jobName = jobName;
        if (jobName != null) {
            this.listeners.add(new JobHistoryListener(jobName));
        }
    }

    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, Integer batchSize, Integer nbThreads, String jobName, ImporterLogger log) {
        this(sourceNode, importWritePath, (Boolean)false, batchSize, nbThreads, jobName, log);
    }

    public GenericMultiThreadedImporter(ImporterRunnerConfiguration configuration) {
        this(configuration.sourceNode, configuration.importWritePath, (Boolean)configuration.skipRootContainerCreation, (Integer)configuration.batchSize, (Integer)configuration.nbThreads, configuration.jobName, configuration.log);
        this.repositoryName = configuration.repositoryName;
    }

    public void addFilter(ImporterFilter filter) {
        this.log.debug(String.format("Filter with %s, was added on the importer with the hash code %s. The source node name is %s", filter.toString(), this.hashCode(), this.importSource.getName()));
        this.filters.add(filter);
    }

    public void addListeners(ImporterListener ... listeners) {
        this.addListeners(Arrays.asList(listeners));
    }

    public void addListeners(Collection<ImporterListener> listeners) {
        this.listeners.addAll(listeners);
    }

    public void addImportingDocumentFilters(ImportingDocumentFilter ... importingDocumentFilters) {
        this.addImportingDocumentFilters(Arrays.asList(importingDocumentFilters));
    }

    public void addImportingDocumentFilters(Collection<ImportingDocumentFilter> importingDocumentFilters) {
        this.importingDocumentFilters.addAll(importingDocumentFilters);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        block23: {
            Exception finalException = null;
            if (!TransactionHelper.isTransactionActive()) {
                TransactionHelper.startTransaction();
            }
            try (CloseableCoreSession closeableCoreSession = CoreInstance.openCoreSessionSystem((String)this.repositoryName);){
                this.session = closeableCoreSession;
                for (ImporterFilter filter : this.filters) {
                    this.log.debug(String.format("Running filter with %s, on the importer with the hash code %s. The source node name is %s", filter.toString(), this.hashCode(), this.importSource.getName()));
                    filter.handleBeforeImport();
                }
                if (this.filters.size() == 0) {
                    this.log.debug(String.format("No filters are registered on the importer with hash code %s, while importing the source node with name %s", this.hashCode(), this.importSource.getName()));
                }
                this.doRun();
            }
            catch (Exception e) {
                ExceptionUtils.checkInterrupt((Exception)e);
                this.log.error("Task exec failed", e);
                finalException = e;
                break block23;
            }
            finally {
                for (ImporterFilter filter : this.filters) {
                    filter.handleAfterImport(finalException);
                }
                this.session = null;
            }
            for (ImporterFilter filter : this.filters) {
                filter.handleAfterImport(finalException);
            }
            this.session = null;
        }
    }

    public void setRootImportTask(GenericThreadedImportTask rootImportTask) {
        this.rootImportTask = rootImportTask;
    }

    protected GenericThreadedImportTask initRootTask(SourceNode importSource, DocumentModel targetContainer, boolean skipRootContainerCreation, ImporterLogger log, Integer batchSize, String jobName) {
        if (this.rootImportTask == null) {
            this.setRootImportTask(new GenericThreadedImportTask(this.repositoryName, importSource, targetContainer, skipRootContainerCreation, log, batchSize, this.getFactory(), this.getThreadPolicy(), jobName));
        } else {
            this.rootImportTask.setInputSource(importSource);
            this.rootImportTask.setTargetFolder(targetContainer);
            this.rootImportTask.setSkipContainerCreation(skipRootContainerCreation);
            this.rootImportTask.setRsLogger(log);
            this.rootImportTask.setFactory(this.getFactory());
            this.rootImportTask.setThreadPolicy(this.getThreadPolicy());
            this.rootImportTask.setJobName(jobName);
            this.rootImportTask.setBatchSize(batchSize);
        }
        this.rootImportTask.addListeners(this.listeners);
        this.rootImportTask.addImportingDocumentFilters(this.importingDocumentFilters);
        this.rootImportTask.setTransactionTimeout(this.transactionTimeout);
        return this.rootImportTask;
    }

    protected void doRun() throws IOException {
        PerfLogger perfLogger;
        this.targetContainer = this.getTargetContainer();
        nbCreatedDocsByThreads = new ConcurrentHashMap<String, Long>();
        importTP = new ThreadPoolExecutor((int)this.nbThreads, (int)this.nbThreads, 500L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(this.queueSize), new NamedThreadFactory("Nuxeo-Importer-"));
        this.initRootTask(this.importSource, this.targetContainer, this.skipRootContainerCreation, this.log, this.batchSize, this.jobName);
        this.rootImportTask.setRootTask();
        long t0 = System.currentTimeMillis();
        this.notifyBeforeImport();
        importTP.execute(this.rootImportTask);
        GenericMultiThreadedImporter.sleep(200);
        int activeTasks = importTP.getActiveCount();
        int oldActiveTasks = 0;
        long lastLogProgressTime = System.currentTimeMillis();
        long lastCreatedDocCounter = 0L;
        PerfLogger perfLogger2 = perfLogger = this.enablePerfLogging ? new PerfLogger(PERF_HEADERS) : null;
        while (activeTasks > 0) {
            double averageSpeed;
            long ti;
            GenericMultiThreadedImporter.sleep(500);
            activeTasks = importTP.getActiveCount();
            boolean logProgress = false;
            if (oldActiveTasks != activeTasks) {
                oldActiveTasks = activeTasks;
                this.log.debug("currently " + activeTasks + " active import Threads");
                logProgress = true;
            }
            if ((ti = System.currentTimeMillis()) - lastLogProgressTime > 5000L) {
                logProgress = true;
            }
            if (!logProgress) continue;
            long inbCreatedDocs = GenericMultiThreadedImporter.getCreatedDocsCounter();
            long deltaT = ti - lastLogProgressTime;
            double imediateSpeed = averageSpeed = (double)(1000.0f * ((float)inbCreatedDocs / (float)(ti - t0)));
            if (deltaT > 0L) {
                imediateSpeed = 1000.0f * ((float)(inbCreatedDocs - lastCreatedDocCounter) / (float)deltaT);
            }
            this.log.info(inbCreatedDocs + " docs created");
            this.log.info("average speed = " + averageSpeed + " docs/s");
            this.log.info("immediate speed = " + imediateSpeed + " docs/s");
            if (this.enablePerfLogging) {
                Double[] perfData = new Double[]{new Double(inbCreatedDocs), averageSpeed, imediateSpeed};
                perfLogger.log(perfData);
            }
            lastLogProgressTime = ti;
            lastCreatedDocCounter = inbCreatedDocs;
        }
        this.stopImportProcrocess();
        this.log.info("All Threads terminated");
        if (this.enablePerfLogging) {
            perfLogger.release();
        }
        this.notifyAfterImport();
        long t1 = System.currentTimeMillis();
        long nbCreatedDocs = GenericMultiThreadedImporter.getCreatedDocsCounter();
        this.log.info(nbCreatedDocs + " docs created");
        this.log.info(1000.0f * ((float)nbCreatedDocs / (float)(t1 - t0)) + " docs/s");
        for (String k : nbCreatedDocsByThreads.keySet()) {
            this.log.info(k + " --> " + nbCreatedDocsByThreads.get(k));
        }
        for (String name : SimonManager.simonNames()) {
            Stopwatch stopwatch;
            if (name == null || name.isEmpty() || !name.startsWith("org.nuxeo.ecm.platform.importer") || (stopwatch = SimonManager.getStopwatch((String)name)).getCounter() <= 0L) continue;
            this.log.info(stopwatch.toString());
        }
    }

    protected static void sleep(int millis) {
        try {
            Thread.sleep(millis);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new NuxeoException((Throwable)e);
        }
    }

    protected DocumentModel getTargetContainer() {
        if (this.targetContainer == null) {
            this.targetContainer = this.createTargetContainer();
        }
        return this.targetContainer;
    }

    protected DocumentModel createTargetContainer() {
        try {
            return this.session.getDocument((DocumentRef)new PathRef(this.importWritePath));
        }
        catch (DocumentNotFoundException e) {
            this.log.error(e.getMessage());
            throw e;
        }
    }

    public ImporterThreadingPolicy getThreadPolicy() {
        if (this.threadPolicy == null) {
            this.threadPolicy = new DefaultMultiThreadingPolicy();
        }
        return this.threadPolicy;
    }

    public void setThreadPolicy(ImporterThreadingPolicy threadPolicy) {
        this.threadPolicy = threadPolicy;
    }

    public ImporterDocumentModelFactory getFactory() {
        if (this.factory == null) {
            this.factory = new DefaultDocumentModelFactory();
        }
        return this.factory;
    }

    public void setFactory(ImporterDocumentModelFactory factory) {
        this.factory = factory;
    }

    public void setTransactionTimeout(int transactionTimeout) {
        this.transactionTimeout = transactionTimeout;
    }

    public void setEnablePerfLogging(boolean enablePerfLogging) {
        this.enablePerfLogging = enablePerfLogging;
    }

    @Override
    public void stopImportProcrocess() {
        if (importTP != null && !importTP.isTerminated() && !importTP.isTerminating()) {
            importTP.shutdownNow();
        }
    }

    protected void notifyBeforeImport() {
        for (ImporterListener listener : this.listeners) {
            listener.beforeImport();
        }
    }

    protected void notifyAfterImport() {
        for (ImporterListener listener : this.listeners) {
            listener.afterImport();
        }
    }

    public String getRepositoryName() {
        return this.repositoryName;
    }

    public void setRepositoryName(String repositoryName) {
        this.repositoryName = repositoryName;
    }

    static {
        nbCreatedDocsByThreads = new ConcurrentHashMap<String, Long>();
        PERF_HEADERS = new String[]{"nbDocs", "average", "imediate"};
    }

    public static class NamedThreadFactory
    implements ThreadFactory {
        private final AtomicInteger threadNumber = new AtomicInteger();
        private final ThreadGroup group;
        private final String prefix;

        public NamedThreadFactory(String prefix) {
            SecurityManager sm = System.getSecurityManager();
            this.group = sm == null ? Thread.currentThread().getThreadGroup() : sm.getThreadGroup();
            this.prefix = prefix;
        }

        @Override
        public Thread newThread(Runnable r) {
            String name = this.prefix + this.threadNumber.incrementAndGet();
            Thread thread = new Thread(this.group, r, name);
            thread.setPriority(5);
            return thread;
        }
    }
}

