/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.ecm.platform.importer.mqueues.workmanager;

import java.io.Externalizable;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.naming.NamingException;
import javax.transaction.RollbackException;
import javax.transaction.Synchronization;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.work.WorkManagerImpl;
import org.nuxeo.ecm.core.work.WorkQueueRegistry;
import org.nuxeo.ecm.core.work.api.Work;
import org.nuxeo.ecm.core.work.api.WorkManager;
import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;
import org.nuxeo.ecm.core.work.api.WorkSchedulePath;
import org.nuxeo.ecm.platform.importer.mqueues.computation.Record;
import org.nuxeo.ecm.platform.importer.mqueues.computation.Settings;
import org.nuxeo.ecm.platform.importer.mqueues.computation.Topology;
import org.nuxeo.ecm.platform.importer.mqueues.computation.mqueue.MQComputationManager;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQAppender;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQManager;
import org.nuxeo.ecm.platform.importer.mqueues.workmanager.ComputationWork;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.model.ComponentContext;
import org.nuxeo.runtime.transaction.TransactionHelper;

public abstract class WorkManagerComputation
extends WorkManagerImpl {
    protected static final Log log = LogFactory.getLog(WorkManagerComputation.class);
    protected static final int DEFAULT_CONCURRENCY = 4;
    protected Topology topology;
    protected Settings settings;
    protected MQComputationManager manager;
    protected MQManager<Record> mqManager;
    protected final Set<String> streamIds = new HashSet<String>();

    protected abstract MQManager<Record> initStream();

    protected abstract int getOverProvisioningFactor();

    public void schedule(Work work, WorkManager.Scheduling scheduling, boolean afterCommit) {
        String queueId = this.getStreamForCategory(work.getCategory());
        if (log.isDebugEnabled()) {
            log.debug((Object)String.format("Scheduling: workId: %s, category: %s, queue: %s, scheduling: %s, afterCommit: %s, work: %s", work.getId(), work.getCategory(), queueId, scheduling, afterCommit, work));
        }
        if (!this.isQueuingEnabled(queueId)) {
            log.info((Object)("Queue disabled, scheduling canceled: " + queueId));
            return;
        }
        if (afterCommit && this.scheduleAfterCommit(work, scheduling)) {
            return;
        }
        WorkSchedulePath.newInstance((Work)work);
        String key = work.getId();
        MQAppender appender = this.mqManager.getAppender(this.getStreamForCategory(work.getCategory()));
        if (appender == null) {
            log.error((Object)String.format("Not scheduled work, unknown category: %s, mapped to %s", work.getCategory(), this.getStreamForCategory(work.getCategory())));
            return;
        }
        appender.append(key, (Externalizable)Record.of((String)key, (byte[])ComputationWork.serialize(work)));
    }

    public String getStreamForCategory(String category) {
        if (category != null && this.streamIds.contains(category)) {
            return category;
        }
        return "default";
    }

    public int getApplicationStartedOrder() {
        return -502;
    }

    public void start(ComponentContext context) {
        this.init();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void init() {
        if (this.started) {
            return;
        }
        log.debug((Object)"Initializing");
        WorkManagerComputation workManagerComputation = this;
        synchronized (workManagerComputation) {
            if (this.started) {
                return;
            }
            this.supplantWorkManagerImpl();
            this.initTopology();
            this.mqManager = this.initStream();
            this.startComputation();
            this.started = true;
            log.info((Object)"Initialized");
        }
    }

    protected void supplantWorkManagerImpl() {
        WorkQueueRegistry wqr;
        Field protectedField;
        WorkManagerImpl wmi = (WorkManagerImpl)Framework.getRuntime().getComponent("org.nuxeo.ecm.core.work.service");
        Class<WorkManagerImpl> clazz = WorkManagerImpl.class;
        try {
            protectedField = clazz.getDeclaredField("workQueueConfig");
        }
        catch (NoSuchFieldException e) {
            throw new RuntimeException(e);
        }
        protectedField.setAccessible(true);
        try {
            wqr = (WorkQueueRegistry)protectedField.get(wmi);
            log.debug((Object)"Remove contributions from WorkManagerImpl");
            protectedField.set(wmi, new WorkQueueRegistry());
        }
        catch (IllegalAccessException e) {
            throw new RuntimeException(e);
        }
        wqr.getQueueIds().forEach(id -> this.workQueueConfig.addContribution((Object)wqr.get(id)));
        this.streamIds.addAll(this.workQueueConfig.getQueueIds());
        this.workQueueConfig.getQueueIds().forEach(id -> log.info((Object)("Registering : " + id)));
    }

    protected void startComputation() {
        this.manager = new MQComputationManager(this.mqManager, this.topology, this.settings);
        this.manager.start();
    }

    protected void initTopology() {
        Topology.Builder builder = Topology.builder();
        this.workQueueConfig.getQueueIds().forEach(item -> builder.addComputation(() -> new ComputationWork((String)item), Collections.singletonList("i1:" + item)));
        this.topology = builder.build();
        this.settings = new Settings(4, this.getPartitions(4));
        this.workQueueConfig.getQueueIds().forEach(item -> this.settings.setConcurrency(item, this.workQueueConfig.get(item).getMaxThreads()));
        this.workQueueConfig.getQueueIds().forEach(item -> this.settings.setPartitions(item, this.getPartitions(this.workQueueConfig.get(item).getMaxThreads())));
    }

    protected int getPartitions(int maxThreads) {
        if (maxThreads == 1) {
            return 1;
        }
        return this.getOverProvisioningFactor() * maxThreads;
    }

    public boolean shutdownQueue(String queueId, long timeout, TimeUnit unit) throws InterruptedException {
        log.info((Object)("Shutdown WorkManager stream: " + queueId));
        return false;
    }

    public boolean shutdown(long timeout, TimeUnit timeUnit) throws InterruptedException {
        log.info((Object)("Shutdown WorkManager in " + timeUnit.toMillis(timeout) + " ms"));
        boolean ret = this.manager.stop(Duration.ofMillis(timeUnit.toMillis(timeout)));
        try {
            this.mqManager.close();
        }
        catch (Exception e) {
            log.error((Object)"Error while closing WorkManager mqManager", (Throwable)e);
        }
        return ret;
    }

    public int getQueueSize(String queueId, Work.State state) {
        return 0;
    }

    public WorkQueueMetrics getMetrics(String queueId) {
        return new WorkQueueMetrics(queueId, (Number)0, (Number)0, (Number)0, (Number)0);
    }

    public boolean awaitCompletion(String queueId, long duration, TimeUnit unit) throws InterruptedException {
        if (!this.isStarted()) {
            return true;
        }
        long durationMs = Math.min(unit.toMillis(duration), TimeUnit.DAYS.toMillis(1L));
        long deadline = System.currentTimeMillis() + durationMs;
        long lowWatermark = this.getLowWaterMark(queueId);
        while (System.currentTimeMillis() < deadline) {
            Thread.sleep(100L);
            long wm = this.getLowWaterMark(queueId);
            if (wm == lowWatermark) {
                log.debug((Object)("awaitCompletion for " + (queueId == null ? "all" : queueId) + " completed " + wm));
                return true;
            }
            if (log.isDebugEnabled()) {
                log.debug((Object)("awaitCompletion low wm  for " + (queueId == null ? "all" : queueId) + ":" + wm + " diff: " + (wm - lowWatermark)));
            }
            lowWatermark = wm;
        }
        log.warn((Object)String.format("%s timeout after: %.2fs", queueId, (double)durationMs / 1000.0));
        return false;
    }

    private long getLowWaterMark(String queueId) {
        if (queueId != null) {
            return this.manager.getLowWatermark(queueId);
        }
        return this.manager.getLowWatermark();
    }

    public Work.State getWorkState(String s) {
        return null;
    }

    public Work find(String s, Work.State state) {
        return null;
    }

    public List<Work> listWork(String s, Work.State state) {
        return Collections.emptyList();
    }

    public List<String> listWorkIds(String s, Work.State state) {
        return Collections.emptyList();
    }

    protected boolean scheduleAfterCommit(Work work, WorkManager.Scheduling scheduling) {
        TransactionManager transactionManager;
        try {
            transactionManager = TransactionHelper.lookupTransactionManager();
        }
        catch (NamingException e) {
            transactionManager = null;
        }
        if (transactionManager == null) {
            log.warn((Object)("Not scheduled work after commit because of missing transaction manager: " + work.getId()));
            return false;
        }
        try {
            Transaction transaction = transactionManager.getTransaction();
            if (transaction == null) {
                if (log.isDebugEnabled()) {
                    log.debug((Object)("Not scheduled work after commit because of missing transaction: " + work.getId()));
                }
                return false;
            }
            int status = transaction.getStatus();
            if (status == 0) {
                if (log.isDebugEnabled()) {
                    log.debug((Object)("Scheduled after commit: " + work.getId()));
                }
                transaction.registerSynchronization((Synchronization)new WorkScheduling(work, scheduling));
                return true;
            }
            if (status == 3) {
                if (log.isDebugEnabled()) {
                    log.debug((Object)("Scheduled immediately: " + work.getId()));
                }
                return false;
            }
            if (status == 1) {
                if (log.isDebugEnabled()) {
                    log.debug((Object)("Cancelling schedule because transaction marked rollback-only: " + work.getId()));
                }
                return true;
            }
            if (log.isDebugEnabled()) {
                log.debug((Object)("Not scheduling work after commit because transaction is in status " + status + ": " + work.getId()));
            }
            return false;
        }
        catch (RollbackException | SystemException e) {
            log.error((Object)"Cannot schedule after commit", e);
            return false;
        }
    }

    public class WorkScheduling
    implements Synchronization {
        public final Work work;
        public final WorkManager.Scheduling scheduling;

        public WorkScheduling(Work work, WorkManager.Scheduling scheduling) {
            this.work = work;
            this.scheduling = scheduling;
        }

        public void beforeCompletion() {
        }

        public void afterCompletion(int status) {
            if (status == 3) {
                WorkManagerComputation.this.schedule(this.work, this.scheduling, false);
            } else if (status != 4) {
                throw new IllegalArgumentException("Unsupported transaction status " + status);
            }
        }
    }
}

