/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.engine.tree;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiConsumer;
import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.io.Closeable;
import net.openhft.chronicle.core.io.IORuntimeException;
import net.openhft.chronicle.core.threads.EventHandler;
import net.openhft.chronicle.core.threads.EventLoop;
import net.openhft.chronicle.core.threads.HandlerPriority;
import net.openhft.chronicle.core.threads.InvalidEventHandlerException;
import net.openhft.chronicle.core.util.ObjectUtils;
import net.openhft.chronicle.engine.api.map.MapEvent;
import net.openhft.chronicle.engine.api.map.MapView;
import net.openhft.chronicle.engine.api.pubsub.Publisher;
import net.openhft.chronicle.engine.api.pubsub.Reference;
import net.openhft.chronicle.engine.api.pubsub.Subscriber;
import net.openhft.chronicle.engine.api.pubsub.TopicPublisher;
import net.openhft.chronicle.engine.api.pubsub.TopicSubscriber;
import net.openhft.chronicle.engine.api.set.EntrySetView;
import net.openhft.chronicle.engine.api.set.KeySetView;
import net.openhft.chronicle.engine.api.tree.Asset;
import net.openhft.chronicle.engine.api.tree.AssetNotFoundException;
import net.openhft.chronicle.engine.api.tree.RequestContext;
import net.openhft.chronicle.engine.fs.Clusters;
import net.openhft.chronicle.engine.fs.EngineCluster;
import net.openhft.chronicle.engine.fs.EngineHostDetails;
import net.openhft.chronicle.engine.map.VanillaKeyValueStore;
import net.openhft.chronicle.engine.map.VanillaMapView;
import net.openhft.chronicle.engine.pubsub.QueueTopicPublisher;
import net.openhft.chronicle.engine.query.Filter;
import net.openhft.chronicle.engine.query.QueueConfig;
import net.openhft.chronicle.engine.tree.HostIdentifier;
import net.openhft.chronicle.engine.tree.MessageAdaptor;
import net.openhft.chronicle.engine.tree.QueueView;
import net.openhft.chronicle.engine.tree.SubAssetFactory;
import net.openhft.chronicle.engine.tree.VanillaAsset;
import net.openhft.chronicle.engine.tree.VanillaSubAsset;
import net.openhft.chronicle.network.connection.CoreFields;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.impl.RollingChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.Marshallable;
import net.openhft.chronicle.wire.MarshallableOut;
import net.openhft.chronicle.wire.ValueIn;
import net.openhft.chronicle.wire.Wire;
import net.openhft.chronicle.wire.WireIn;
import net.openhft.chronicle.wire.WireKey;
import net.openhft.chronicle.wire.WireOut;
import net.openhft.chronicle.wire.WireType;
import net.openhft.chronicle.wire.Wires;
import net.openhft.chronicle.wire.WriteMarshallable;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A view over a {@link RollingChronicleQueue} that can be used both as a
 * {@link QueueView} (publishing and tailing excerpts) and, lazily, as a
 * {@link MapView}.
 *
 * <p>The map facade is created on first use by {@link #mapView()}; it subscribes
 * to the topics of this view and applies each message to an in-memory store,
 * treating a {@code null} message as a removal.
 *
 * <p>Appenders and tailers used by the publishing and {@code getExcerpt}
 * methods are held per thread, so excerpts returned by those methods are
 * reused between calls made on the same thread.
 *
 * @param <T> topic (key) type
 * @param <M> message (value) type
 */
public class ChronicleQueueView<T, M> implements QueueView<T, M>, MapView<T, M>, SubAssetFactory, Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(ChronicleQueueView.class);
    /** Message used when a write is attempted on a replication sink. */
    private static final String SINK_PUBLISH_MESSAGE = "You can not publish to a sink used in replication, you have to publish to the source";

    @NotNull
    private final RollingChronicleQueue chronicleQueue;
    private final Class<T> messageTypeClass;
    @NotNull
    private final Class<M> elementTypeClass;
    /** Per-thread appender, tailers and reusable excerpt. */
    private final ThreadLocal<ThreadLocalData> threadLocal;
    /** Asset full name without its leading '/', used as queue directory when no base path is given. */
    @NotNull
    private final String defaultPath;
    @NotNull
    private final RequestContext context;
    @NotNull
    private final Asset asset;
    /** True when the local host id equals the configured source host id; set by {@link #replication}. */
    private boolean isSource;
    /** True once {@link #replication} has resolved a host identifier for this view. */
    private boolean isReplicating;
    /** When true, {@link #close()} deletes the queue files. */
    private final boolean dontPersist;
    @NotNull
    private final QueueConfig queueConfig;
    /** Lazily created map facade, see {@link #mapView()}. */
    private volatile MapView<T, M> mapView;

    /**
     * Creates a view backed by a newly built queue.
     *
     * @param context request context supplying types, base path and cluster name
     * @param asset   asset this view is attached to
     * @throws IOException if the queue directory cannot be created
     */
    public ChronicleQueueView(@NotNull RequestContext context, @NotNull Asset asset) throws IOException {
        this(null, context, asset);
    }

    /**
     * Creates a view over {@code queue}, or over a newly built queue when
     * {@code queue} is {@code null}.
     *
     * <p>If the asset provides a {@link HostIdentifier}, replication is set up
     * via {@link #replication(RequestContext, Asset)}. A handler that pretouches
     * the queue is also registered with the asset's {@link EventLoop}.
     *
     * @param queue   existing queue to wrap, or {@code null} to build one
     * @param context request context supplying types, base path and cluster name
     * @param asset   asset this view is attached to
     * @throws IOException            if the queue directory cannot be created
     * @throws AssetNotFoundException if the asset has no {@link QueueConfig} view
     */
    public ChronicleQueueView(@Nullable RollingChronicleQueue queue, @NotNull RequestContext context, @NotNull Asset asset) throws IOException {
        this.context = context;
        this.asset = asset;
        final String fullName = asset.fullName();
        this.defaultPath = fullName.startsWith("/") ? fullName.substring(1) : fullName;

        final HostIdentifier hostIdentifier = asset.findOrCreateView(HostIdentifier.class);

        final QueueConfig config = asset.findView(QueueConfig.class);
        if (config == null) {
            throw new AssetNotFoundException("QueueConfig not found at " + asset);
        }
        this.queueConfig = config;

        this.chronicleQueue = queue != null ? queue : newInstance(context.basePath(), config.wireType());
        this.messageTypeClass = context.messageType();
        this.elementTypeClass = context.elementType();
        this.threadLocal = ThreadLocal.withInitial(() -> new ThreadLocalData(this.chronicleQueue));
        this.dontPersist = context.dontPersist();

        // Replication is only attempted when the asset knows its own host id.
        if (hostIdentifier != null) {
            replication(context, asset);
        }

        final EventLoop eventLoop = asset.findOrCreateView(EventLoop.class);
        eventLoop.addHandler(new EventHandler(){

            public boolean action() throws InvalidEventHandlerException, InterruptedException {
                ChronicleQueueView.this.chronicleQueue.acquireAppender().pretouch();
                return false;
            }

            @NotNull
            public HandlerPriority priority() {
                return HandlerPriority.MONITOR;
            }
        });
    }

    /**
     * Reflectively instantiates the enterprise source replication handler.
     *
     * @param nextIndexRequired index passed to the handler's constructor
     * @param topicType         topic class, must not be {@code null}
     * @param elementType       element class, must not be {@code null}
     * @param acknowledgement   acknowledgement flag passed to the handler
     * @param messageAdaptor    optional adaptor passed to the handler
     * @return the handler instance
     * @throws IllegalStateException if the handler cannot be loaded or constructed;
     *                               the underlying failure is attached as the cause
     */
    @NotNull
    public static WriteMarshallable newSource(long nextIndexRequired, @NotNull Class topicType, @NotNull Class elementType, boolean acknowledgement, @Nullable MessageAdaptor messageAdaptor) {
        Objects.requireNonNull(topicType);
        Objects.requireNonNull(elementType);
        try {
            Class<?> handlerClass = Class.forName("software.chronicle.enterprise.queue.QueueSourceReplicationHandler");
            Constructor<?> constructor = handlerClass.getDeclaredConstructor(Long.TYPE, Class.class, Class.class, Boolean.TYPE, MessageAdaptor.class);
            return (WriteMarshallable) constructor.newInstance(nextIndexRequired, topicType, elementType, acknowledgement, messageAdaptor);
        }
        catch (Exception e) {
            // Keep the original failure as the cause so a constructor error is not mistaken for a missing class.
            IllegalStateException licence = new IllegalStateException("A Chronicle Queue Enterprise licence is required to run chronicle-queue replication. Please contact sales@chronicle.software", e);
            Jvm.warn().on(ChronicleQueueView.class, licence.getMessage());
            throw licence;
        }
    }

    /**
     * Reflectively instantiates the enterprise sync replication handler.
     *
     * @param topicType       topic class passed to the handler
     * @param elementType     element class passed to the handler
     * @param acknowledgement acknowledgement flag passed to the handler
     * @param messageAdaptor  optional adaptor passed to the handler
     * @param wireType        wire type passed to the handler
     * @return the handler instance
     * @throws IllegalStateException if the handler cannot be loaded or constructed;
     *                               the underlying failure is attached as the cause
     */
    @NotNull
    public static WriteMarshallable newSync(@NotNull Class topicType, @NotNull Class elementType, boolean acknowledgement, @Nullable MessageAdaptor messageAdaptor, @NotNull WireType wireType) {
        try {
            Class<?> handlerClass = Class.forName("software.chronicle.enterprise.queue.QueueSyncReplicationHandler");
            Constructor<?> constructor = handlerClass.getConstructor(Class.class, Class.class, Boolean.TYPE, MessageAdaptor.class, WireType.class);
            return (WriteMarshallable) constructor.newInstance(topicType, elementType, acknowledgement, messageAdaptor, wireType);
        }
        catch (Exception e) {
            // Keep the original failure as the cause so a constructor error is not mistaken for a missing class.
            IllegalStateException licence = new IllegalStateException("A Chronicle Queue Enterprise licence is required to do chronicle-queue replication. Please contact sales@chronicle.software", e);
            Jvm.warn().on(ChronicleQueueView.class, licence.getMessage());
            throw licence;
        }
    }

    /**
     * @return {@code true} if the enterprise sync replication handler class can be loaded
     */
    public static boolean isQueueReplicationAvailable() {
        try {
            Class.forName("software.chronicle.enterprise.queue.QueueSyncReplicationHandler");
            return true;
        }
        catch (ClassNotFoundException e) {
            return false;
        }
    }

    /**
     * Best-effort recursive delete: children first, then {@code element}
     * itself. A failed delete is logged at debug level and otherwise ignored.
     * If a directory's children cannot be listed, the directory is left alone.
     */
    private static void deleteFiles(@NotNull File element) throws IOException {
        if (element.isDirectory()) {
            File[] children = element.listFiles();
            if (children == null) {
                return;
            }
            for (File child : children) {
                ChronicleQueueView.deleteFiles(child);
            }
        }
        try {
            Files.deleteIfExists(element.toPath());
        }
        catch (IOException e) {
            Jvm.debug().on(ChronicleQueueView.class, "Unable to delete " + element, (Throwable)e);
        }
    }

    /**
     * Factory wrapper around {@link #ChronicleQueueView(RequestContext, Asset)}
     * that rethrows an {@link IOException} through {@link Jvm#rethrow}.
     */
    @NotNull
    public static QueueView create(@NotNull RequestContext context, @NotNull Asset asset) {
        try {
            return new ChronicleQueueView<>(context, asset);
        }
        catch (IOException e) {
            throw Jvm.rethrow((Throwable)e);
        }
    }

    /**
     * Returns the map facade of this view, creating it on first use.
     * Creation is guarded by this object's monitor so only one facade is built.
     */
    public MapView<T, M> mapView() {
        MapView<T, M> view = this.mapView;
        if (view == null) {
            synchronized (this) {
                view = this.mapView;
                if (view == null) {
                    view = new QueueViewAsMapView<>(this, this.context, this.asset);
                    this.mapView = view;
                }
            }
        }
        return view;
    }

    /**
     * @return the queue backing this view
     */
    @Nullable
    public RollingChronicleQueue chronicleQueue() {
        return this.chronicleQueue;
    }

    /**
     * Configures replication for this view.
     *
     * <p>Decides whether the local host is the source for this queue, then, for
     * every other host in the context's cluster, registers a connection
     * listener. When a non-acceptor connection is established the listener
     * publishes a source or sync replication handler to the remote host.
     *
     * <p>Returns early, after logging, if no host identifier, no
     * {@link Clusters} view or no matching cluster is available. In the latter
     * two cases the view is already marked as replicating.
     *
     * @param context request context supplying the full name, cluster name and types
     * @param asset   asset used to look up the host identifier and clusters
     */
    public void replication(@NotNull RequestContext context, @NotNull Asset asset) {
        HostIdentifier hostIdentifier;
        try {
            hostIdentifier = asset.findOrCreateView(HostIdentifier.class);
        }
        catch (AssetNotFoundException anfe) {
            if (LOG.isDebugEnabled()) {
                Jvm.debug().on(this.getClass(), "replication not enabled " + anfe.getMessage());
            }
            return;
        }
        // The constructor treats a missing host identifier as "no replication"; do the same here.
        if (hostIdentifier == null) {
            if (LOG.isDebugEnabled()) {
                Jvm.debug().on(this.getClass(), "replication not enabled, no HostIdentifier");
            }
            return;
        }

        int remoteSourceIdentifier = this.queueConfig.sourceHostId(context.fullName());
        this.isSource = hostIdentifier.hostId() == remoteSourceIdentifier;
        this.isReplicating = true;

        Clusters clusters = asset.findView(Clusters.class);
        if (clusters == null) {
            LOG.warn("no cluster found name=" + context.cluster());
            Jvm.debug().on(this.getClass(), "no cluster found name=" + context.cluster());
            return;
        }
        EngineCluster engineCluster = clusters.get(context.cluster());
        String csp = context.fullName();
        if (engineCluster == null) {
            Jvm.debug().on(this.getClass(), "no cluster found name=" + context.cluster());
            LOG.warn("no cluster found name=" + context.cluster());
            return;
        }

        byte localIdentifier = hostIdentifier.hostId();
        if (LOG.isDebugEnabled()) {
            Jvm.debug().on(this.getClass(), "hostDetails : localIdentifier=" + localIdentifier + ",cluster=" + engineCluster.hostDetails());
        }
        boolean acknowledgement = this.queueConfig.acknowledgment();
        MessageAdaptor messageAdaptor = this.queueConfig.bytesFunction();

        for (EngineHostDetails hostDetails : engineCluster.hostDetails()) {
            byte remoteIdentifier = (byte)hostDetails.hostId();
            if (remoteIdentifier == localIdentifier) {
                continue;
            }
            engineCluster.findConnectionManager(remoteIdentifier).addListener((nc, isConnected) -> {
                // Only the connecting (non-acceptor) side of a live connection installs the handler.
                if (!isConnected || nc.isAcceptor()) {
                    return;
                }
                boolean remoteIsSource = remoteIdentifier == remoteSourceIdentifier;
                WriteMarshallable handler = remoteIsSource
                        ? ChronicleQueueView.newSource(this.chronicleQueue.createTailer().toEnd().index(), context.topicType(), context.elementType(), acknowledgement, messageAdaptor)
                        : ChronicleQueueView.newSync(context.topicType(), context.elementType(), acknowledgement, messageAdaptor, this.chronicleQueue.wireType());
                long cid = nc.newCid();
                nc.wireOutPublisher().publish(w -> w.writeDocument(true, d -> d.writeEventName((WireKey)CoreFields.csp).text(csp).writeEventName((WireKey)CoreFields.cid).int64(cid).writeEventName((WireKey)CoreFields.handler).typedMarshallable(handler)));
            });
        }
    }

    /** Not supported. */
    @Override
    @NotNull
    public KeySetView<T> keySet() {
        throw new UnsupportedOperationException("todo");
    }

    /** Not supported. */
    @Override
    @NotNull
    public Collection<M> values() {
        throw new UnsupportedOperationException("todo");
    }

    /** Not supported. */
    @Override
    @NotNull
    public EntrySetView<T, Object, M> entrySet() {
        throw new UnsupportedOperationException("todo");
    }

    /** Not supported. */
    @Override
    @NotNull
    public M getUsing(T key, Object using) {
        throw new UnsupportedOperationException("todo");
    }

    /**
     * Registers {@code topicSubscriber} on the asset under the asset's full
     * name, using the context's two types.
     */
    @Override
    public void registerTopicSubscriber(@NotNull TopicSubscriber<T, M> topicSubscriber) throws AssetNotFoundException {
        this.asset.registerTopicSubscriber(this.asset.fullName(), this.context.type(), this.context.type2(), topicSubscriber);
    }

    /** Not supported. */
    @Override
    public void unregisterTopicSubscriber(@NotNull TopicSubscriber<T, M> topicSubscriber) {
        throw new UnsupportedOperationException("todo");
    }

    /** Not supported. */
    @Override
    public void registerKeySubscriber(@NotNull Subscriber<T> subscriber) {
        throw new UnsupportedOperationException("todo");
    }

    /** Not supported. */
    @Override
    public void registerKeySubscriber(@NotNull Subscriber<T> subscriber, @NotNull Filter filter, @NotNull Set<RequestContext.Operation> contextOperations) {
        throw new UnsupportedOperationException("todo");
    }

    /** Not supported. */
    @Override
    public void registerSubscriber(@NotNull Subscriber<MapEvent<T, M>> subscriber, @NotNull Filter<MapEvent<T, M>> filter, @NotNull Set<RequestContext.Operation> contextOperations) {
        throw new UnsupportedOperationException("todo");
    }

    /** Delegates to {@link #mapView()}. */
    @Override
    public Reference<M> referenceFor(T key) {
        return this.mapView().referenceFor(key);
    }

    /** Delegates to {@link #mapView()}. */
    @Override
    public Class<T> keyType() {
        return this.mapView().keyType();
    }

    /** Delegates to {@link #mapView()}. */
    @Override
    public Class<M> valueType() {
        return this.mapView().valueType();
    }

    /** Delegates to {@link #mapView()}. */
    @Override
    public long longSize() {
        return this.mapView().longSize();
    }

    /**
     * Delegates to {@link #mapView()}, like the other map operations of this class.
     */
    @Override
    @NotNull
    public M getAndPut(T key, M value) {
        return this.mapView().getAndPut(key, value);
    }

    /** Delegates to {@link #mapView()}. */
    @Override
    @Nullable
    public M getAndRemove(T key) {
        return this.mapView().getAndRemove(key);
    }

    /**
     * Unregisters {@code topicSubscriber} from the asset, using the name
     * derived from {@code topic} by {@link #subscriptionName(Object)}.
     */
    public void unregisterTopicSubscriber(@NotNull T topic, @NotNull TopicSubscriber<T, M> topicSubscriber) {
        this.asset.unregisterTopicSubscriber(subscriptionName(topic), this.context.type(), this.context.type2(), topicSubscriber);
    }

    /** Not supported. */
    @Override
    @NotNull
    public Publisher<M> publisher(@NotNull T topic) {
        throw new UnsupportedOperationException("todo");
    }

    /**
     * Registers {@code subscriber} for messages on {@code topic}; the topic of
     * each delivered message is dropped and only the message is forwarded.
     */
    @Override
    @SuppressWarnings("unchecked")
    public void registerSubscriber(@NotNull T topic, @NotNull Subscriber<M> subscriber) {
        this.asset.registerTopicSubscriber(subscriptionName(topic), this.context.type(), this.context.type2(), (topic1, message) -> subscriber.onMessage((M) message));
    }

    /**
     * Builds the name used for per-topic subscriptions: the asset's full name
     * when the topic's text is blank, otherwise {@code fullName + "/" + topic}.
     */
    private String subscriptionName(@NotNull T topic) {
        final String topicText = topic.toString();
        return "".equals(topicText.trim()) ? this.asset.fullName() : this.asset.fullName() + "/" + topicText;
    }

    /**
     * Builds the backing queue.
     *
     * @param basePath directory for the queue, or {@code null} to use {@link #defaultPath}
     * @param wireType must be {@link WireType#BINARY} or {@link WireType#DEFAULT_ZERO_BINARY}
     * @throws IllegalArgumentException for any other wire type
     * @throws IOException              if the directory cannot be created
     */
    private RollingChronicleQueue newInstance(@Nullable String basePath, @NotNull WireType wireType) throws IOException {
        if (wireType == WireType.DELTA_BINARY) {
            throw new IllegalArgumentException("Chronicle Queues can not be set to use delta wire");
        }
        if (wireType != WireType.BINARY && wireType != WireType.DEFAULT_ZERO_BINARY) {
            throw new IllegalArgumentException("Currently the chronicle queue only supports Binary and Default Zero Binary Wire");
        }
        final File baseFilePath = basePath == null ? new File(this.defaultPath, "") : new File(basePath);
        if (!baseFilePath.exists()) {
            Files.createDirectories(baseFilePath.toPath());
        }
        SingleChronicleQueueBuilder builder = wireType == WireType.DEFAULT_ZERO_BINARY
                ? SingleChronicleQueueBuilder.defaultZeroBinary(baseFilePath)
                : SingleChronicleQueueBuilder.binary(baseFilePath);
        return builder.build();
    }

    /** @return the calling thread's general-purpose tailer */
    @NotNull
    private ExcerptTailer threadLocalTailer() {
        return this.threadLocal.get().tailer;
    }

    /** @return the calling thread's appender */
    @NotNull
    private ExcerptAppender threadLocalAppender() {
        return this.threadLocal.get().appender;
    }

    /**
     * Creates a new tailer over the queue. The returned {@link QueueView.Tailer}
     * reuses a single excerpt object for every read, so a previously returned
     * excerpt is overwritten by the next read.
     */
    @Nullable
    public QueueView.Tailer<T, M> tailer() {
        final ExcerptTailer tailer = this.chronicleQueue.createTailer();
        final LocalExcept<T, M> reusableExcerpt = new LocalExcept<>();
        return () -> this.next(tailer, reusableExcerpt);
    }

    /**
     * Reads the next document from {@code excerptTailer} into {@code excerpt}.
     *
     * @return {@code excerpt} populated with topic, message and index, or
     *         {@code null} if no document is available
     */
    private QueueView.Excerpt<T, M> next(@NotNull ExcerptTailer excerptTailer, @NotNull LocalExcept<T, M> excerpt) {
        excerpt.clear();
        try (DocumentContext dc = excerptTailer.readingDocument()) {
            if (!dc.isPresent()) {
                return null;
            }
            final Wire wire = dc.wire();
            final T topic = wire.readEvent(this.messageTypeClass);
            final ValueIn valueIn = wire.getValueIn();
            if (Bytes.class.isAssignableFrom(this.elementTypeClass)) {
                // Bytes messages are read into the excerpt's reusable buffer.
                valueIn.text(excerpt.text());
            } else {
                final M message = valueIn.object(this.elementTypeClass);
                excerpt.message(message);
            }
            return excerpt.topic(topic).index(excerptTailer.index());
        }
    }

    /**
     * Reads the excerpt at {@code index} using the calling thread's replay
     * tailer. An index of {@code 0} means "start of the queue".
     *
     * @return the thread's reusable excerpt, or {@code null} if the index
     *         cannot be reached or no document is present there
     */
    @Override
    @Nullable
    public QueueView.Excerpt<T, M> getExcerpt(long index) {
        final ThreadLocalData data = this.threadLocal.get();
        final ExcerptTailer replayTailer = data.replayTailer;
        if (index == 0L) {
            replayTailer.toStart();
        } else if (!replayTailer.moveToIndex(index)) {
            return null;
        }
        try (DocumentContext dc = replayTailer.readingDocument()) {
            if (!dc.isPresent()) {
                return null;
            }
            final StringBuilder eventName = Wires.acquireStringBuilder();
            final M message = dc.wire().readEventName(eventName).object(this.elementTypeClass);
            return data.excerpt.message(message).topic(ObjectUtils.convertTo(this.messageTypeClass, (Object)eventName)).index(replayTailer.index());
        }
    }

    /**
     * Scans forward from the calling thread's replay tailer position and
     * returns the first excerpt whose topic equals {@code topic}. Documents
     * with other topics are consumed and skipped.
     *
     * @return the thread's reusable excerpt, or {@code null} once no further
     *         document is available
     */
    @Override
    @Nullable
    public QueueView.Excerpt<T, M> getExcerpt(@NotNull T topic) {
        final ThreadLocalData data = this.threadLocal.get();
        final ExcerptTailer replayTailer = data.replayTailer;
        while (true) {
            try (DocumentContext dc = replayTailer.readingDocument()) {
                if (!dc.isPresent()) {
                    return null;
                }
                final StringBuilder eventName = Wires.acquireStringBuilder();
                final ValueIn valueIn = dc.wire().readEventName(eventName);
                final T candidate = ObjectUtils.convertTo(this.messageTypeClass, (Object)eventName);
                if (!topic.equals(candidate)) {
                    // Not the requested topic: the document is closed and the next one is tried.
                    continue;
                }
                final M message = valueIn.object(this.elementTypeClass);
                return data.excerpt.message(message).topic(candidate).index(replayTailer.index());
            }
        }
    }

    /** Not supported. */
    @Override
    public void set(T key, M element) {
        throw new UnsupportedOperationException("todo");
    }

    /** Publishes via {@link #publishAndIndex(Object, Object)}, discarding the index. */
    @Override
    public void publish(@NotNull T topic, @NotNull M message) {
        this.publishAndIndex(topic, message);
    }

    /**
     * Reads one document with the calling thread's tailer and passes its event
     * name and message to {@code consumer}.
     */
    public void getExcerpt(@NotNull BiConsumer<CharSequence, M> consumer) {
        final ExcerptTailer tailer = this.threadLocalTailer();
        tailer.readDocument(w -> {
            StringBuilder eventName = Wires.acquireStringBuilder();
            ValueIn valueIn = w.readEventName(eventName);
            consumer.accept(eventName, valueIn.object(this.elementTypeClass));
        });
    }

    /**
     * Appends {@code topic}/{@code message} to the queue with the calling
     * thread's appender.
     *
     * @return the index of the appended excerpt
     * @throws IllegalStateException if this view is a replication sink
     */
    @Override
    public long publishAndIndex(@NotNull T topic, @NotNull M message) {
        checkNotSink();
        final ExcerptAppender appender = this.threadLocalAppender();
        try (DocumentContext dc = appender.writingDocument()) {
            dc.wire().writeEvent(this.messageTypeClass, topic).object(this.elementTypeClass, message);
        }
        return appender.lastIndexAppended();
    }

    /**
     * Appends {@code event} under an empty event name with the calling
     * thread's appender.
     *
     * @return the index of the appended excerpt
     * @throws IllegalStateException if this view is a replication sink
     */
    public long set(@NotNull M event) {
        checkNotSink();
        final ExcerptAppender appender = this.threadLocalAppender();
        appender.writeDocument(w -> w.writeEventName(() -> "").object(event));
        return appender.lastIndexAppended();
    }

    /** Rejects writes when this view replicates from another host. */
    private void checkNotSink() {
        if (this.isReplicating && !this.isSource) {
            throw new IllegalStateException(SINK_PUBLISH_MESSAGE);
        }
    }

    /** Delegates to {@link #mapView()}. */
    @Override
    public boolean isEmpty() {
        return this.mapView().isEmpty();
    }

    /** Delegates to {@link #mapView()}. */
    @Override
    public boolean containsKey(Object key) {
        return this.mapView().containsKey(key);
    }

    /** Delegates to {@link #mapView()}. */
    @Override
    public boolean containsValue(Object value) {
        return this.mapView().containsValue(value);
    }

    /** Delegates to {@link #mapView()}. */
    @Override
    public M get(Object key) {
        return this.mapView().get(key);
    }

    /** Delegates to {@link #mapView()}. */
    @Override
    public M put(T key, M value) {
        return this.mapView().put(key, value);
    }

    /** Delegates to {@link #mapView()}. */
    @Override
    public M remove(Object key) {
        return this.mapView().remove(key);
    }

    /** Delegates to {@link #mapView()}. */
    @Override
    public void putAll(@NotNull Map<? extends T, ? extends M> m) {
        this.mapView().putAll(m);
    }

    /** Clears the backing queue and then the map facade. */
    @Override
    public void clear() {
        this.chronicleQueue.clear();
        this.mapView().clear();
    }

    /** Not supported. */
    @NotNull
    public File path() {
        throw new UnsupportedOperationException("todo");
    }

    /** Not supported. */
    @NotNull
    public WireType wireType() {
        throw new UnsupportedOperationException("todo");
    }

    /**
     * Closes the backing queue and, when the context asked not to persist,
     * deletes the queue's files on a best-effort basis.
     */
    public void close() {
        final File file = this.chronicleQueue.file();
        this.chronicleQueue.close();
        if (this.dontPersist) {
            try {
                ChronicleQueueView.deleteFiles(file);
            }
            catch (Exception e) {
                Jvm.debug().on(this.getClass(), "Unable to delete " + file, (Throwable)e);
            }
        }
    }

    /**
     * Unwraps a {@link QueueTopicPublisher} and calls this method again with
     * its underlying view; any other publisher is ignored.
     * NOTE(review): nothing in this class calls this method - confirm whether it is still needed.
     */
    private void deleteFiles(TopicPublisher p) {
        if (p instanceof QueueTopicPublisher) {
            this.deleteFiles((ChronicleQueueView)((QueueTopicPublisher)p).underlying());
        }
    }

    /** Closes this view quietly when the object is finalized. */
    protected void finalize() throws Throwable {
        super.finalize();
        Closeable.closeQuietly((Object)this);
    }

    /** Delegates to {@link #mapView()}. */
    @Override
    public void registerSubscriber(@NotNull Subscriber<MapEvent<T, M>> subscriber) {
        this.mapView().registerSubscriber(subscriber);
    }

    /** Does nothing. */
    public void unregisterSubscriber(Subscriber subscriber) {
    }

    /** Not supported. */
    public int subscriberCount() {
        throw new UnsupportedOperationException("todo");
    }

    /** @return the backing queue's dump */
    public String dump() {
        return this.chronicleQueue.dump();
    }

    /** Creates a {@link VanillaSubAsset} under {@code vanillaAsset} with no default value. */
    @Override
    @Nullable
    public <E> Asset createSubAsset(@NotNull VanillaAsset vanillaAsset, String name, Class<E> valueType) {
        return new VanillaSubAsset<E>(vanillaAsset, name, valueType, null);
    }

    /** Delegates to {@link #mapView()}. */
    @Override
    public M putIfAbsent(@NotNull T key, M value) {
        return this.mapView().putIfAbsent(key, value);
    }

    /** Delegates to {@link #mapView()}. */
    @Override
    public boolean remove(@NotNull Object key, Object value) {
        return this.mapView().remove(key, value);
    }

    /** Delegates to {@link #mapView()}. */
    @Override
    public boolean replace(@NotNull T key, @NotNull M oldValue, @NotNull M newValue) {
        return this.mapView().replace(key, oldValue, newValue);
    }

    /** Delegates to {@link #mapView()}. */
    @Override
    public M replace(@NotNull T key, @NotNull M value) {
        return this.mapView().replace(key, value);
    }

    /** Delegates to {@link #mapView()}. */
    @Override
    public Asset asset() {
        return this.mapView().asset();
    }

    /** @return the backing queue */
    @Override
    @Nullable
    public Object underlying() {
        return this.chronicleQueue;
    }

    /**
     * Per-thread resources: one appender, a general tailer, a replay tailer
     * used by the {@code getExcerpt} lookups, and a reusable excerpt.
     */
    class ThreadLocalData {
        @NotNull
        final ExcerptAppender appender;
        @NotNull
        final ExcerptTailer tailer;
        @NotNull
        final ExcerptTailer replayTailer;
        @NotNull
        final LocalExcept<T, M> excerpt;

        ThreadLocalData(ChronicleQueue chronicleQueue) {
            this.appender = chronicleQueue.acquireAppender();
            this.appender.padToCacheAlign(MarshallableOut.Padding.ALWAYS);
            this.tailer = chronicleQueue.createTailer();
            this.replayTailer = chronicleQueue.createTailer();
            this.excerpt = new LocalExcept<>();
        }
    }

    /**
     * Map facade over a {@link QueueView}. Mutations are published to the
     * queue view and applied to the in-memory store; messages received through
     * the topic subscription are applied to the store as well, with a
     * {@code null} message meaning removal.
     */
    private static class QueueViewAsMapView<K, V> extends VanillaMapView<K, V> {
        @NotNull
        private final QueueView<K, V> queueView;

        QueueViewAsMapView(@NotNull QueueView<K, V> queueView, @NotNull RequestContext context, @NotNull Asset asset) {
            super(context, asset, new VanillaKeyValueStore<>(context, asset));
            this.queueView = queueView;
            queueView.registerTopicSubscriber((topic, message) -> {
                if (message == null) {
                    super.remove(topic);
                } else {
                    super.put(topic, message);
                }
            });
        }

        /**
         * Publishes the entry and stores it.
         *
         * @return the previous value, or {@code null} when {@code putReturnsNull} is set
         */
        @Override
        @Nullable
        public V put(@NotNull K key, @NotNull V value) {
            final V previous = this.putReturnsNull ? null : super.get(key);
            this.queueView.publishAndIndex(key, value);
            super.put(key, value);
            return previous;
        }

        /** Publishes the entry and stores it. */
        @Override
        public void set(K key, @NotNull V value) {
            this.queueView.publishAndIndex(key, value);
            super.put(key, value);
        }

        /**
         * Publishes a {@code null} message for {@code key} and removes it from the store.
         *
         * @return the previous value, or {@code null} when {@code removeReturnsNull} is set
         */
        @Override
        @SuppressWarnings("unchecked")
        public V remove(Object key) {
            final V previous = this.removeReturnsNull ? null : super.get(key);
            this.queueView.publishAndIndex((K) key, null);
            super.remove(key);
            return previous;
        }

        /** Removes every entry through {@link #remove(Object)}, so each removal is published. */
        @Override
        public void clear() {
            for (Map.Entry<K, V> entry : this.entrySet()) {
                this.remove(entry.getKey());
            }
        }

        /**
         * Stores {@code value} if {@code key} has no mapping, and publishes it
         * only in that case.
         *
         * @return the existing value, or {@code null} if {@code value} was stored
         */
        @Override
        @Nullable
        public V putIfAbsent(@net.openhft.chronicle.core.annotation.NotNull K key, @NotNull V value) {
            this.checkKey(key);
            this.checkValue(value);
            final V existing = super.putIfAbsent(key, value);
            // A null result means the value was inserted; that is the only case with something to publish.
            if (existing == null) {
                this.queueView.publishAndIndex(key, value);
            }
            return existing;
        }

        /** Removes the entry if it maps to {@code value}, publishing a {@code null} message on success. */
        @Override
        @SuppressWarnings("unchecked")
        public boolean remove(@net.openhft.chronicle.core.annotation.NotNull Object key, Object value) {
            if (!super.remove(key, value)) {
                return false;
            }
            this.queueView.publishAndIndex((K) key, null);
            return true;
        }

        /** Replaces {@code oldValue} with {@code newValue}, publishing the new value on success. */
        @Override
        public boolean replace(@net.openhft.chronicle.core.annotation.NotNull K key, @net.openhft.chronicle.core.annotation.NotNull V oldValue, @net.openhft.chronicle.core.annotation.NotNull V newValue) {
            if (!super.replace(key, oldValue, newValue)) {
                return false;
            }
            this.queueView.publishAndIndex(key, newValue);
            return true;
        }

        /**
         * Replaces the value for {@code key} and publishes the new value only
         * when a previous value was returned. Publishing for an absent key would
         * make the topic subscription insert it.
         *
         * @return the previous value, or {@code null} if nothing was replaced
         */
        @Override
        @Nullable
        public V replace(@net.openhft.chronicle.core.annotation.NotNull K key, @net.openhft.chronicle.core.annotation.NotNull V value) {
            final V replaced = super.replace(key, value);
            if (replaced != null) {
                this.queueView.publishAndIndex(key, value);
            }
            return replaced;
        }
    }

    /**
     * Mutable, reusable holder for one excerpt: topic, message and index.
     * Also usable as a {@link Map.Entry} (topic as key, message as value).
     */
    public static class LocalExcept<T, M> implements QueueView.Excerpt<T, M>, Marshallable, Map.Entry<T, M> {
        @Nullable
        private T topic;
        @Nullable
        private M message;
        /** Lazily allocated buffer handed out by {@link #text()}. */
        private Bytes bytes;
        private long index;

        @Override
        @Nullable
        public T topic() {
            return this.topic;
        }

        @Override
        @Nullable
        public M message() {
            return this.message;
        }

        @Override
        public long index() {
            return this.index;
        }

        /** Sets the index and returns this excerpt. */
        @NotNull
        public LocalExcept<T, M> index(long index) {
            this.index = index;
            return this;
        }

        /** Sets the message and returns this excerpt. */
        @NotNull
        LocalExcept<T, M> message(M message) {
            this.message = message;
            return this;
        }

        /** Sets the topic and returns this excerpt. */
        @NotNull
        LocalExcept<T, M> topic(T topic) {
            this.topic = topic;
            return this;
        }

        @NotNull
        public String toString() {
            return "Except{topic=" + this.topic + ", message=" + this.message + '}';
        }

        /** Writes the fields {@code topic}, {@code message} and {@code index}. */
        public void writeMarshallable(@NotNull WireOut wireOut) {
            wireOut.write(() -> "topic").object(this.topic);
            wireOut.write(() -> "message").object(this.message);
            wireOut.write(() -> "index").int64(this.index);
        }

        /** Reads the fields {@code topic}, {@code message} and {@code index}. */
        @SuppressWarnings("unchecked")
        public void readMarshallable(@NotNull WireIn wireIn) throws IORuntimeException {
            this.topic((T) wireIn.read(() -> "topic").object(Object.class));
            this.message((M) wireIn.read(() -> "message").object(Object.class));
            this.index(wireIn.read(() -> "index").int64());
        }

        /** Resets topic and message to {@code null} and the index to {@code -1}. */
        @Override
        public void clear() {
            this.message = null;
            this.topic = null;
            this.index = -1L;
        }

        /**
         * Returns this excerpt's buffer, allocating it on first use and
         * clearing it otherwise, and makes that buffer the current message.
         */
        @SuppressWarnings("unchecked")
        public Bytes text() {
            if (this.bytes == null) {
                this.bytes = Bytes.allocateElasticDirect();
            } else {
                this.bytes.clear();
            }
            this.message = (M) this.bytes;
            return this.bytes;
        }

        @Override
        @Nullable
        public T getKey() {
            return this.topic;
        }

        @Override
        @Nullable
        public M getValue() {
            return this.message;
        }

        /** Not supported. */
        @Override
        @NotNull
        public M setValue(M value) {
            throw new UnsupportedOperationException("todo");
        }
    }
}

