/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.lib.stream.log.chronicle;

import java.io.Externalizable;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Stream;
import net.openhft.chronicle.core.io.Closeable;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.impl.StoreFileListener;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.log.LogOffset;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.chronicle.ChronicleLogOffsetTracker;
import org.nuxeo.lib.stream.log.chronicle.ChronicleLogTailer;
import org.nuxeo.lib.stream.log.chronicle.ChronicleRetentionDuration;
import org.nuxeo.lib.stream.log.chronicle.ChronicleRetentionListener;
import org.nuxeo.lib.stream.log.internals.CloseableLogAppender;
import org.nuxeo.lib.stream.log.internals.LogOffsetImpl;

/**
 * Log appender backed by Chronicle Queues: one queue per partition, each stored in its own
 * sub-directory ({@code P-00}, {@code P-01}, ...) of a base directory.
 *
 * <p>An instance either creates a new log with a given number of partitions, or opens an existing
 * one and counts its partition directories. Tailers created through {@link #createTailer} are
 * tracked and closed together with the queues in {@link #close()}.
 *
 * @param <M> type of the messages appended to the log
 */
public class ChronicleLogAppender<M extends Externalizable>
implements CloseableLogAppender<M> {
    private static final Log log = LogFactory.getLog(ChronicleLogAppender.class);

    /** Prefix of the name of each partition directory under the base path. */
    protected static final String PARTITION_PREFIX = "P-";

    /** Upper bound, in milliseconds, of the sleep between two checks in {@link #waitFor}. */
    protected static final int POLL_INTERVAL_MS = 100;

    /** Maximum number of partitions accepted when creating a log. */
    protected static final int MAX_PARTITIONS = 100;

    /** Queues indexed by partition number. */
    protected final List<ChronicleQueue> partitions;

    protected final int nbPartitions;

    protected final File basePath;

    /** Log name, taken from the last element of the base path. */
    protected final String name;

    /** Tailers handed out by {@link #createTailer}, kept so that {@link #close()} can close them. */
    protected final ConcurrentLinkedQueue<ChronicleLogTailer<M>> tailers = new ConcurrentLinkedQueue<>();

    protected final ChronicleRetentionDuration retention;

    protected volatile boolean closed;

    /**
     * Creates ({@code size > 0}) or opens ({@code size == 0}) the log stored under {@code basePath}.
     *
     * @param basePath directory holding the partition directories
     * @param size number of partitions to create, or {@code 0} to open an existing log
     * @param retention retention settings applied to each queue
     * @throws IllegalArgumentException if {@code size} is negative or greater than
     *         {@link #MAX_PARTITIONS}, if opening a path that is not a non-empty directory, if
     *         creating over a non-empty directory, or if a directory cannot be created
     */
    protected ChronicleLogAppender(File basePath, int size, ChronicleRetentionDuration retention) {
        if (size < 0) {
            // Reject early: otherwise the base directory would be created before failing later
            // with an unrelated "Illegal Capacity" error when sizing the partition list.
            throw new IllegalArgumentException(String.format("Invalid number of partitions for log: %s, requested: %d", basePath, size));
        }
        if (size == 0) {
            if (!ChronicleLogAppender.exists(basePath)) {
                throw new IllegalArgumentException("Cannot open Chronicle Queues, invalid path: " + basePath);
            }
            this.nbPartitions = this.findNbQueues(basePath);
        } else {
            if (size > MAX_PARTITIONS) {
                throw new IllegalArgumentException(String.format("Cannot create more than: %d partitions for log: %s, requested: %d", MAX_PARTITIONS, basePath, size));
            }
            if (ChronicleLogAppender.exists(basePath)) {
                throw new IllegalArgumentException("Cannot create Chronicle Queues, already exists: " + basePath);
            }
            if (!basePath.exists() && !basePath.mkdirs()) {
                throw new IllegalArgumentException("Invalid path to create Chronicle Queues: " + basePath);
            }
            this.nbPartitions = size;
        }
        this.name = basePath.getName();
        this.basePath = basePath;
        this.retention = retention;
        this.partitions = new ArrayList<>(this.nbPartitions);
        if (log.isDebugEnabled()) {
            log.debug((size == 0 ? "Opening: " : "Creating: ") + this);
        }
        this.initPartitions();
    }

    /**
     * Builds one queue per partition and makes sure each partition directory exists.
     *
     * <p>When retention is disabled the queue is built with the builder defaults; otherwise the
     * roll cycle comes from the retention settings and a {@link ChronicleRetentionListener} is
     * registered as store file listener.
     *
     * @throws IllegalArgumentException if a partition directory cannot be created
     */
    protected void initPartitions() {
        for (int i = 0; i < this.nbPartitions; ++i) {
            File path = new File(this.basePath, String.format("%s%02d", PARTITION_PREFIX, i));
            if (this.retention.disable()) {
                this.partitions.add(SingleChronicleQueueBuilder.binary(path).build());
            } else {
                ChronicleRetentionListener listener = new ChronicleRetentionListener(this.retention);
                SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(path)
                        .rollCycle(this.retention.getRollCycle())
                        .storeFileListener(listener)
                        .build();
                listener.setQueue(queue);
                this.partitions.add(queue);
            }
            try {
                // findNbQueues() counts partition directories, so the directory must exist on disk.
                Files.createDirectories(path.toPath());
            } catch (IOException e) {
                throw new IllegalArgumentException("Cannot create directory: " + path.getAbsolutePath(), e);
            }
        }
    }

    /**
     * Tells whether {@code basePath} is a directory containing at least one entry.
     *
     * @param basePath path to check
     * @return {@code true} only for a non-empty, listable directory
     */
    protected static boolean exists(File basePath) {
        if (!basePath.isDirectory()) {
            return false;
        }
        // File.list() returns null on I/O error or if the directory disappeared in the meantime.
        String[] children = basePath.list();
        return children != null && children.length > 0;
    }

    /**
     * Creates a new log with {@code size} partitions and the given retention.
     *
     * @throws IllegalArgumentException see the constructor
     */
    public static <M extends Externalizable> ChronicleLogAppender<M> create(File basePath, int size, ChronicleRetentionDuration retention) {
        return new ChronicleLogAppender<>(basePath, size, retention);
    }

    /**
     * Creates a new log with {@code size} partitions using {@link ChronicleRetentionDuration#DISABLE}.
     *
     * @throws IllegalArgumentException see the constructor
     */
    public static <M extends Externalizable> ChronicleLogAppender<M> create(File basePath, int size) {
        return new ChronicleLogAppender<>(basePath, size, ChronicleRetentionDuration.DISABLE);
    }

    /**
     * Opens an existing log using {@link ChronicleRetentionDuration#DISABLE}.
     *
     * @throws IllegalArgumentException see the constructor
     */
    public static <M extends Externalizable> ChronicleLogAppender<M> open(File basePath) {
        return new ChronicleLogAppender<>(basePath, 0, ChronicleRetentionDuration.DISABLE);
    }

    /**
     * Opens an existing log with the given retention.
     *
     * @throws IllegalArgumentException see the constructor
     */
    public static <M extends Externalizable> ChronicleLogAppender<M> open(File basePath, ChronicleRetentionDuration retention) {
        return new ChronicleLogAppender<>(basePath, 0, retention);
    }

    /** Returns the base directory of this log as a path string. */
    public String getBasePath() {
        return this.basePath.getPath();
    }

    @Override
    public String name() {
        return this.name;
    }

    /** Returns the number of partitions of this log. */
    @Override
    public int size() {
        return this.nbPartitions;
    }

    /**
     * Writes {@code message} as a document with a single {@code "msg"} field to the queue of the
     * given partition.
     *
     * @param partition index of the target partition
     * @param message message to append
     * @return the log name, partition and index reported by the appender for the written document
     */
    @Override
    public LogOffset append(int partition, M message) {
        ExcerptAppender appender = this.partitions.get(partition).acquireAppender();
        appender.writeDocument(w -> w.write("msg").object(message));
        long offset = appender.lastIndexAppended();
        LogOffsetImpl ret = new LogOffsetImpl(this.name, partition, offset);
        if (log.isDebugEnabled()) {
            log.debug(String.format("append to %s, value: %s", ret, message));
        }
        return ret;
    }

    /**
     * Creates a tailer for {@code partition} on behalf of consumer {@code group} and registers it
     * so that it is closed along with this appender.
     */
    public LogTailer<M> createTailer(LogPartition partition, String group) {
        return this.addTailer(new ChronicleLogTailer<>(this.basePath.toString(), this.partitions.get(partition.partition()).createTailer(), partition, group, this.retention));
    }

    /** Returns the index obtained by moving a fresh tailer to the end of the partition's queue. */
    public long endOffset(int partition) {
        return this.partitions.get(partition).createTailer().toEnd().index();
    }

    /**
     * Returns the first index of the partition's queue, or {@code 0} when the queue reports
     * {@code Long.MAX_VALUE} (presumably its marker for a queue without any entry).
     */
    public long firstOffset(int partition) {
        long ret = this.partitions.get(partition).firstIndex();
        if (ret == Long.MAX_VALUE) {
            return 0L;
        }
        return ret;
    }

    /**
     * Counts the excerpts between two offsets of a partition.
     *
     * @return the count reported by the queue, or {@code 0} if the queue throws
     *         {@link IllegalStateException} (logged at debug level as a missing cycle file)
     */
    public long countMessages(int partition, long lowerOffset, long upperOffset) {
        SingleChronicleQueue queue = (SingleChronicleQueue) this.partitions.get(partition);
        try {
            return queue.countExcerpts(lowerOffset, upperOffset);
        } catch (IllegalStateException e) {
            if (log.isDebugEnabled()) {
                log.debug("Missing low cycle file: " + lowerOffset + " for queue: " + queue + " " + e.getMessage());
            }
            return 0L;
        }
    }

    /** Registers {@code tailer} for closing in {@link #close()} and returns it. */
    protected LogTailer<M> addTailer(ChronicleLogTailer<M> tailer) {
        this.tailers.add(tailer);
        return tailer;
    }

    /**
     * Polls the committed offset of {@code group} until it reaches {@code offset} or the timeout
     * elapses.
     *
     * @param offset offset to wait for
     * @param group consumer group whose committed offset is read
     * @param timeout maximum time to wait
     * @return {@code true} if the offset was seen as processed, {@code false} on timeout
     * @throws InterruptedException if the thread is interrupted while sleeping between checks
     */
    @Override
    public boolean waitFor(LogOffset offset, String group, Duration timeout) throws InterruptedException {
        long offsetPosition = offset.offset();
        int partition = offset.partition().partition();
        try (ChronicleLogOffsetTracker offsetTracker = new ChronicleLogOffsetTracker(this.basePath.toString(), partition, group)) {
            if (this.isProcessed(offsetTracker, offsetPosition)) {
                return true;
            }
            long timeoutMs = timeout.toMillis();
            long now = System.currentTimeMillis();
            // Saturate instead of overflowing to a negative deadline on very large timeouts.
            long deadline = timeoutMs > Long.MAX_VALUE - now ? Long.MAX_VALUE : now + timeoutMs;
            long delay = Math.min(POLL_INTERVAL_MS, timeoutMs);
            boolean processed = false;
            while (!processed && System.currentTimeMillis() < deadline) {
                Thread.sleep(delay);
                processed = this.isProcessed(offsetTracker, offsetPosition);
            }
            return processed;
        }
    }

    @Override
    public boolean closed() {
        return this.closed;
    }

    /**
     * Tells whether the last committed offset read from {@code tracker} is strictly positive and
     * at least {@code offset}.
     */
    protected boolean isProcessed(ChronicleLogOffsetTracker tracker, long offset) {
        long last = tracker.readLastCommittedOffset();
        return last > 0L && last >= offset;
    }

    /** Closes every registered tailer, then every queue, and marks this appender as closed. */
    @Override
    public void close() {
        if (log.isDebugEnabled()) {
            log.debug("Closing: " + this);
        }
        this.tailers.stream().filter(Objects::nonNull).forEach(ChronicleLogTailer::close);
        this.tailers.clear();
        this.partitions.stream().filter(Objects::nonNull).forEach(Closeable::close);
        this.partitions.clear();
        this.closed = true;
    }

    /**
     * Counts the sub-directories of {@code basePath} whose name starts with
     * {@link #PARTITION_PREFIX}.
     *
     * @return the number of partition directories, always greater than zero
     * @throws IllegalArgumentException if the directory cannot be listed or holds no partition
     */
    protected int findNbQueues(File basePath) {
        int ret;
        try (Stream<Path> paths = Files.list(basePath.toPath())) {
            ret = (int) paths.filter(path -> Files.isDirectory(path) && path.getFileName().toString().startsWith(PARTITION_PREFIX)).count();
            if (ret == 0) {
                // Thrown as IOException so that it is wrapped by the catch below.
                throw new IOException("No chronicles queues file found");
            }
        } catch (IOException e) {
            throw new IllegalArgumentException("Invalid basePath for queue: " + basePath, e);
        }
        return ret;
    }

    @Override
    public String toString() {
        return "ChronicleLogAppender{nbPartitions=" + this.nbPartitions + ", basePath=" + this.basePath + ", name='" + this.name + '\'' + ", retention=" + this.retention + ", closed=" + this.closed + '}';
    }

    /** Returns the retention settings this appender was built with. */
    public ChronicleRetentionDuration getRetention() {
        return this.retention;
    }
}

