/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.lib.core.mqueues.mqueues.chronicle;

import java.io.Externalizable;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.text.ParseException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Stream;
import net.openhft.chronicle.core.io.Closeable;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.RollCycle;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.impl.CommonStore;
import net.openhft.chronicle.queue.impl.StoreFileListener;
import net.openhft.chronicle.queue.impl.WireStore;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.core.mqueues.mqueues.MQAppender;
import org.nuxeo.lib.core.mqueues.mqueues.MQOffset;
import org.nuxeo.lib.core.mqueues.mqueues.MQPartition;
import org.nuxeo.lib.core.mqueues.mqueues.MQTailer;
import org.nuxeo.lib.core.mqueues.mqueues.chronicle.ChronicleMQOffsetTracker;
import org.nuxeo.lib.core.mqueues.mqueues.chronicle.ChronicleMQTailer;
import org.nuxeo.lib.core.mqueues.mqueues.internals.MQOffsetImpl;

/**
 * Chronicle Queue backed implementation of {@link MQAppender}.
 * <p>
 * The MQueue lives in a base directory holding one Chronicle queue directory per partition, named
 * {@code Q-NN}. The appender registers itself as {@link StoreFileListener} on every queue so that
 * old cycle files are deleted according to the retention each time a cycle file is acquired.
 * <p>
 * The retention is expressed as {@code <number><unit>} (for instance {@code 4d}): the unit
 * ({@code s}, {@code m}, {@code h} or {@code d}) selects the roll cycle and the number is the
 * count of cycles that are kept.
 *
 * @param <M> the type of the messages
 */
public class ChronicleMQAppender<M extends Externalizable> implements MQAppender<M>, StoreFileListener {
    private static final Log log = LogFactory.getLog(ChronicleMQAppender.class);

    /** Retention applied when none is given. */
    private static final String DEFAULT_RETENTION_DURATION = "4d";

    protected static final String QUEUE_PREFIX = "Q-";
    protected static final int POLL_INTERVAL_MS = 100;
    protected static final String SECOND_ROLLING_PERIOD = "s";
    protected static final String MINUTE_ROLLING_PERIOD = "m";
    protected static final String HOUR_ROLLING_PERIOD = "h";
    protected static final String DAY_ROLLING_PERIOD = "d";

    /** One Chronicle queue per partition, indexed by partition number. */
    protected final List<ChronicleQueue> queues;
    protected final int nbQueues;
    protected final File basePath;
    protected final String name;
    /** Number of cycles kept by {@link #purgeOldCyclesIfNeeded(SingleChronicleQueue)}. */
    protected int retentionNbCycles;
    /** Tailers created by this appender, closed along with it. */
    protected final ConcurrentLinkedQueue<ChronicleMQTailer<M>> tailers = new ConcurrentLinkedQueue<>();
    protected boolean closed = false;

    /**
     * Tells if the given path is a non empty directory.
     *
     * @param basePath the base directory of the MQueue
     * @return {@code true} if {@code basePath} is a directory with at least one entry
     */
    public static boolean exists(File basePath) {
        if (!basePath.isDirectory()) {
            return false;
        }
        // File.list() returns null when the directory can not be read
        String[] entries = basePath.list();
        return entries != null && entries.length > 0;
    }

    /**
     * Returns the path of the base directory.
     */
    public String getBasePath() {
        return basePath.getPath();
    }

    /**
     * Creates a new MQueue.
     *
     * @param basePath the base directory, must not exist or be empty
     * @param size the number of partitions, {@code 0} opens an existing MQueue instead
     * @param retentionPolicy the retention, {@code <number><unit>}, {@code null} for the default
     * @throws IllegalArgumentException if the MQueue can not be created or the retention is invalid
     */
    public static <M extends Externalizable> ChronicleMQAppender<M> create(File basePath, int size,
            String retentionPolicy) {
        return new ChronicleMQAppender<>(basePath, size, retentionPolicy);
    }

    /**
     * Creates a new MQueue with the default retention.
     *
     * @param basePath the base directory, must not exist or be empty
     * @param size the number of partitions, {@code 0} opens an existing MQueue instead
     * @throws IllegalArgumentException if the MQueue can not be created
     */
    public static <M extends Externalizable> ChronicleMQAppender<M> create(File basePath, int size) {
        return new ChronicleMQAppender<>(basePath, size, DEFAULT_RETENTION_DURATION);
    }

    /**
     * Opens an existing MQueue with the default retention.
     *
     * @param basePath the base directory of an existing MQueue
     * @throws IllegalArgumentException if {@code basePath} does not contain an MQueue
     */
    public static <M extends Externalizable> ChronicleMQAppender<M> open(File basePath) {
        return new ChronicleMQAppender<>(basePath, 0, DEFAULT_RETENTION_DURATION);
    }

    /**
     * Opens an existing MQueue.
     *
     * @param basePath the base directory of an existing MQueue
     * @param retentionDuration the retention, {@code <number><unit>}, {@code null} for the default
     * @throws IllegalArgumentException if {@code basePath} does not contain an MQueue or the
     *             retention is invalid
     */
    public static <M extends Externalizable> ChronicleMQAppender<M> open(File basePath, String retentionDuration) {
        return new ChronicleMQAppender<>(basePath, 0, retentionDuration);
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public int size() {
        return nbQueues;
    }

    /**
     * Appends a message to a partition.
     *
     * @param partition the partition index
     * @param message the message to write
     * @return the offset of the appended message
     */
    @Override
    public MQOffset append(int partition, M message) {
        ExcerptAppender appender = queues.get(partition).acquireAppender();
        appender.writeDocument(w -> w.write("msg").object(message));
        long offset = appender.lastIndexAppended();
        MQOffset ret = new MQOffsetImpl(name, partition, offset);
        if (log.isDebugEnabled()) {
            log.debug(String.format("append to %s, value: %s", ret, message));
        }
        return ret;
    }

    /**
     * Creates a tailer on a partition for a consumer group. The tailer is tracked and closed when
     * this appender is closed.
     */
    public MQTailer<M> createTailer(MQPartition partition, String group) {
        return addTailer(new ChronicleMQTailer<>(basePath.toString(),
                queues.get(partition.partition()).createTailer(), partition, group));
    }

    /**
     * Returns the index found at the end of the partition.
     */
    public long endOffset(int partition) {
        return queues.get(partition).createTailer().toEnd().index();
    }

    /**
     * Returns the first index of the partition, or {@code 0} when the queue reports
     * {@link Long#MAX_VALUE} as first index.
     */
    public long firstOffset(int partition) {
        long ret = queues.get(partition).firstIndex();
        // NOTE(review): Long.MAX_VALUE looks like the "no excerpt yet" marker of Chronicle - confirm
        return ret == Long.MAX_VALUE ? 0L : ret;
    }

    /**
     * Counts the messages between two offsets of a partition.
     *
     * @return the number of excerpts, or {@code 0} if the queue fails with an
     *         {@link IllegalStateException}
     */
    public long countMessages(int partition, long lowerOffset, long upperOffset) {
        SingleChronicleQueue queue = (SingleChronicleQueue) queues.get(partition);
        try {
            return queue.countExcerpts(lowerOffset, upperOffset);
        } catch (IllegalStateException e) {
            // best effort: the count is reported as 0 when Chronicle can not compute it
            if (log.isDebugEnabled()) {
                log.debug("Unable to count messages on partition: " + partition, e);
            }
            return 0L;
        }
    }

    /**
     * Registers a tailer so it gets closed by {@link #close()}.
     */
    protected MQTailer<M> addTailer(ChronicleMQTailer<M> tailer) {
        tailers.add(tailer);
        return tailer;
    }

    /**
     * Waits until the consumer group has committed a position at or after the given offset.
     * The committed offset is polled every {@link #POLL_INTERVAL_MS} ms at most.
     *
     * @param offset the offset to wait for
     * @param group the consumer group
     * @param timeout the maximum time to wait
     * @return {@code true} if the offset is processed, {@code false} on timeout
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    @Override
    public boolean waitFor(MQOffset offset, String group, Duration timeout) throws InterruptedException {
        long offsetPosition = offset.offset();
        int partition = offset.partition().partition();
        try (ChronicleMQOffsetTracker offsetTracker = new ChronicleMQOffsetTracker(basePath.toString(), partition,
                group)) {
            if (isProcessed(offsetTracker, offsetPosition)) {
                return true;
            }
            long timeoutMs = timeout.toMillis();
            long deadline = System.currentTimeMillis() + timeoutMs;
            long delay = Math.min(POLL_INTERVAL_MS, timeoutMs);
            while (System.currentTimeMillis() < deadline) {
                Thread.sleep(delay);
                if (isProcessed(offsetTracker, offsetPosition)) {
                    return true;
                }
            }
            return false;
        }
    }

    @Override
    public boolean closed() {
        return closed;
    }

    /**
     * Tells if the last committed offset of the tracker is positive and has reached {@code offset}.
     */
    protected boolean isProcessed(ChronicleMQOffsetTracker tracker, long offset) {
        long last = tracker.readLastCommittedOffset();
        return last > 0L && last >= offset;
    }

    /**
     * Closes the tracked tailers then the queues. A tailer that fails to close is logged and does
     * not prevent the others from being closed.
     */
    @Override
    public void close() {
        log.debug("Closing queue");
        for (ChronicleMQTailer<M> tailer : tailers) {
            try {
                tailer.close();
            } catch (Exception e) {
                // keep the cause in the log, it was lost before
                log.error("Failed to close tailer: " + tailer, e);
            }
        }
        tailers.clear();
        for (ChronicleQueue queue : queues) {
            if (queue != null) {
                queue.close();
            }
        }
        queues.clear();
        closed = true;
    }

    /**
     * Opens (when {@code size} is 0) or creates the MQueue.
     *
     * @param basePath the base directory
     * @param size the number of partitions to create, {@code 0} to open an existing MQueue
     * @param retentionDuration the retention, {@code <number><unit>}, {@code null} for the default
     * @throws IllegalArgumentException if the path is invalid for the requested operation or if the
     *             retention can not be parsed
     */
    protected ChronicleMQAppender(File basePath, int size, String retentionDuration) {
        if (size == 0) {
            if (!exists(basePath)) {
                String msg = "Can not open Chronicle Queues, invalid path: " + basePath;
                log.error(msg);
                throw new IllegalArgumentException(msg);
            }
            this.nbQueues = findNbQueues(basePath);
        } else {
            if (exists(basePath)) {
                String msg = "Can not create Chronicle Queues, already exists: " + basePath;
                log.error(msg);
                throw new IllegalArgumentException(msg);
            }
            if (!basePath.exists() && !basePath.mkdirs()) {
                String msg = "Can not create Chronicle Queues in: " + basePath;
                log.error(msg);
                throw new IllegalArgumentException(msg);
            }
            this.nbQueues = size;
        }
        this.name = basePath.getName();
        this.basePath = basePath;
        // a null retention used to end up in a NullPointerException, use the default instead
        String retention = retentionDuration == null ? DEFAULT_RETENTION_DURATION : retentionDuration;
        this.retentionNbCycles = parseRetentionNbCycles(retention);
        RollCycle rollCycle = getRollCycle(retention);
        this.queues = new ArrayList<>(this.nbQueues);
        log.debug(String.format("%s chronicle mqueue: %s, path: %s, size: %d", size == 0 ? "Opening" : "Creating",
                this.name, basePath, this.nbQueues));
        for (int i = 0; i < this.nbQueues; ++i) {
            File path = new File(basePath, String.format("%s%02d", QUEUE_PREFIX, i));
            SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(path)
                                                                    .rollCycle(rollCycle)
                                                                    .storeFileListener(this)
                                                                    .build();
            this.queues.add(queue);
            queue.file().mkdirs();
        }
    }

    /**
     * Extracts the number of cycles from a retention like {@code 4d}.
     *
     * @throws IllegalArgumentException if the retention is empty
     * @throws NumberFormatException if the part before the unit is not an integer
     */
    private static int parseRetentionNbCycles(String retentionDuration) {
        if (retentionDuration.isEmpty()) {
            throw new IllegalArgumentException("Empty retention duration, expecting <number><unit> like: "
                    + DEFAULT_RETENTION_DURATION);
        }
        return Integer.parseInt(retentionDuration.substring(0, retentionDuration.length() - 1));
    }

    /**
     * Counts the partition directories (prefixed by {@link #QUEUE_PREFIX}) found in the base directory.
     *
     * @throws IllegalArgumentException if the directory can not be listed or contains no partition
     */
    protected int findNbQueues(File basePath) {
        int ret;
        try (Stream<Path> paths = Files.list(basePath.toPath())) {
            ret = (int) paths.filter(path -> Files.isDirectory(path)
                    && path.getFileName().toString().startsWith(QUEUE_PREFIX)).count();
            if (ret == 0) {
                throw new IOException("No chronicles queues file found");
            }
        } catch (IOException e) {
            throw new IllegalArgumentException("Invalid basePath for queue: " + basePath, e);
        }
        return ret;
    }

    /**
     * Returns the roll cycle matching the unit (last character) of the retention.
     *
     * @throws IllegalArgumentException if the unit is missing or unknown
     */
    protected RollCycle getRollCycle(String retentionDuration) {
        String rollingPeriod = retentionDuration == null || retentionDuration.isEmpty() ? ""
                : retentionDuration.substring(retentionDuration.length() - 1);
        switch (rollingPeriod) {
        case SECOND_ROLLING_PERIOD:
            return RollCycles.TEST_SECONDLY;
        case MINUTE_ROLLING_PERIOD:
            return RollCycles.MINUTELY;
        case HOUR_ROLLING_PERIOD:
            return RollCycles.HOURLY;
        case DAY_ROLLING_PERIOD:
            return RollCycles.DAILY;
        default:
            String msg = "Unknown rolling period: " + rollingPeriod + " for MQueue: " + name();
            log.error(msg);
            throw new IllegalArgumentException(msg);
        }
    }

    /**
     * Returns the partition index encoded in the name of the directory that contains the given
     * cycle file.
     * <p>
     * The whole suffix after {@link #QUEUE_PREFIX} is parsed: directories are named with a
     * {@code %02d} format which produces more than two digits from partition 100.
     *
     * @throws IllegalArgumentException if the parent directory is not a partition directory
     */
    protected int findQueueIndex(File queueFile) {
        String queueDirName = queueFile.getParentFile().getName();
        if (!queueDirName.startsWith(QUEUE_PREFIX)) {
            throw new IllegalArgumentException("Not a queue directory: " + queueDirName);
        }
        return Integer.parseInt(queueDirName.substring(QUEUE_PREFIX.length()));
    }

    /**
     * Invoked when a cycle file is acquired, purges the old cycles of the related partition.
     */
    @Override
    public void onAcquired(int cycle, File file) {
        if (log.isDebugEnabled()) {
            log.debug("New file created: " + file + " on cycle: " + cycle);
        }
        SingleChronicleQueue queue = (SingleChronicleQueue) queues.get(findQueueIndex(file));
        purgeOldCyclesIfNeeded(queue);
    }

    /**
     * Deletes the oldest cycle files of the queue so that at most {@link #retentionNbCycles}
     * cycles remain.
     */
    protected synchronized void purgeOldCyclesIfNeeded(SingleChronicleQueue queue) {
        List<Long> cycles;
        try {
            NavigableSet<Long> cyclesBetween = queue.listCyclesBetween(queue.firstCycle(), queue.lastCycle());
            // the set iterates in ascending order: oldest cycles come first
            cycles = new ArrayList<>(cyclesBetween);
        } catch (ParseException e) {
            throw new RuntimeException("Fail to list cycles for queue: " + queue, e);
        }
        if (cycles.size() <= retentionNbCycles) {
            return;
        }
        for (Long cycleLong : cycles.subList(0, cycles.size() - retentionNbCycles)) {
            int cycle = cycleLong.intValue();
            WireStore store = queue.storeForCycle(cycle, queue.epoch(), false);
            if (store == null) {
                continue;
            }
            File file = store.file();
            queue.release(store);
            log.info("Deleting Chronicle file according to retention: " + file.getAbsolutePath());
            if (!file.delete()) {
                log.warn("Unable to delete Chronicle file: " + file.getAbsolutePath());
            }
        }
        // NOTE(review): the tailer is not used, presumably created to make the queue refresh its
        // view of the cycles after the deletion - confirm
        queue.createTailer();
    }

    /**
     * Invoked when a cycle file is released, nothing to do.
     */
    @Override
    public void onReleased(int cycle, File file) {
    }
}

