/*
 * Decompiled with CFR 0.152.
 */
package org.apache.plc4x.java.opcua.protocol;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.plc4x.java.api.messages.PlcMetadataKeys;
import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent;
import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
import org.apache.plc4x.java.api.metadata.Metadata;
import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
import org.apache.plc4x.java.api.model.PlcTag;
import org.apache.plc4x.java.api.types.PlcResponseCode;
import org.apache.plc4x.java.api.types.PlcSubscriptionType;
import org.apache.plc4x.java.api.value.PlcValue;
import org.apache.plc4x.java.opcua.context.Conversation;
import org.apache.plc4x.java.opcua.protocol.OpcuaProtocolLogic;
import org.apache.plc4x.java.opcua.readwrite.AttributeId;
import org.apache.plc4x.java.opcua.readwrite.BinaryExtensionObjectWithMask;
import org.apache.plc4x.java.opcua.readwrite.ContentFilter;
import org.apache.plc4x.java.opcua.readwrite.CreateMonitoredItemsRequest;
import org.apache.plc4x.java.opcua.readwrite.CreateMonitoredItemsResponse;
import org.apache.plc4x.java.opcua.readwrite.DataChangeNotification;
import org.apache.plc4x.java.opcua.readwrite.DataValue;
import org.apache.plc4x.java.opcua.readwrite.DeleteSubscriptionsRequest;
import org.apache.plc4x.java.opcua.readwrite.DeleteSubscriptionsResponse;
import org.apache.plc4x.java.opcua.readwrite.EventFieldList;
import org.apache.plc4x.java.opcua.readwrite.EventFilter;
import org.apache.plc4x.java.opcua.readwrite.EventNotificationList;
import org.apache.plc4x.java.opcua.readwrite.ExpandedNodeId;
import org.apache.plc4x.java.opcua.readwrite.ExtensionObject;
import org.apache.plc4x.java.opcua.readwrite.ExtensionObjectDefinition;
import org.apache.plc4x.java.opcua.readwrite.ExtensionObjectEncodingMask;
import org.apache.plc4x.java.opcua.readwrite.MonitoredItemCreateRequest;
import org.apache.plc4x.java.opcua.readwrite.MonitoredItemCreateResult;
import org.apache.plc4x.java.opcua.readwrite.MonitoredItemNotification;
import org.apache.plc4x.java.opcua.readwrite.MonitoringMode;
import org.apache.plc4x.java.opcua.readwrite.MonitoringParameters;
import org.apache.plc4x.java.opcua.readwrite.NodeId;
import org.apache.plc4x.java.opcua.readwrite.NodeIdFourByte;
import org.apache.plc4x.java.opcua.readwrite.NotificationMessage;
import org.apache.plc4x.java.opcua.readwrite.OpcuaNodeIdServicesObjectType;
import org.apache.plc4x.java.opcua.readwrite.OpcuaStatusCode;
import org.apache.plc4x.java.opcua.readwrite.PascalString;
import org.apache.plc4x.java.opcua.readwrite.PublishRequest;
import org.apache.plc4x.java.opcua.readwrite.PublishResponse;
import org.apache.plc4x.java.opcua.readwrite.QualifiedName;
import org.apache.plc4x.java.opcua.readwrite.ReadValueId;
import org.apache.plc4x.java.opcua.readwrite.RequestHeader;
import org.apache.plc4x.java.opcua.readwrite.SimpleAttributeOperand;
import org.apache.plc4x.java.opcua.readwrite.SubscriptionAcknowledgement;
import org.apache.plc4x.java.opcua.readwrite.TimestampsToReturn;
import org.apache.plc4x.java.opcua.readwrite.Variant;
import org.apache.plc4x.java.opcua.tag.OpcuaTag;
import org.apache.plc4x.java.spi.messages.DefaultPlcSubscriptionEvent;
import org.apache.plc4x.java.spi.messages.PlcSubscriber;
import org.apache.plc4x.java.spi.messages.utils.DefaultPlcResponseItem;
import org.apache.plc4x.java.spi.messages.utils.PlcResponseItem;
import org.apache.plc4x.java.spi.metadata.DefaultMetadata;
import org.apache.plc4x.java.spi.model.DefaultPlcConsumerRegistration;
import org.apache.plc4x.java.spi.model.DefaultPlcSubscriptionHandle;
import org.apache.plc4x.java.spi.model.DefaultPlcSubscriptionTag;
import org.apache.plc4x.java.spi.transaction.RequestTransactionManager;
import org.apache.plc4x.java.spi.values.PlcNull;
import org.apache.plc4x.java.spi.values.PlcStruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OpcuaSubscriptionHandle
extends DefaultPlcSubscriptionHandle {
    private static final ScheduledExecutorService EXECUTOR = Executors.newSingleThreadScheduledExecutor(runnable -> new Thread(runnable, "plc4x-opcua-subscription-scheduler"));
    private final Logger logger = LoggerFactory.getLogger(OpcuaSubscriptionHandle.class);
    private final Set<Consumer<PlcSubscriptionEvent>> consumers;
    private final List<String> tagNames;
    private final Conversation conversation;
    private final PlcSubscriptionRequest subscriptionRequest;
    private final OpcuaProtocolLogic plcSubscriber;
    private final Long subscriptionId;
    private final long cycleTime;
    private final long revisedCycleTime;
    private final AtomicLong clientHandles = new AtomicLong(1L);
    private final RequestTransactionManager tm;
    private final List<SubscriptionAcknowledgement> outstandingAcknowledgements = new CopyOnWriteArrayList<SubscriptionAcknowledgement>();
    private ScheduledFuture<?> publishTask;

    public OpcuaSubscriptionHandle(OpcuaProtocolLogic plcSubscriber, RequestTransactionManager tm, Conversation conversation, PlcSubscriptionRequest subscriptionRequest, Long subscriptionId, long cycleTime) {
        super((PlcSubscriber)plcSubscriber);
        this.tm = tm;
        this.consumers = new HashSet<Consumer<PlcSubscriptionEvent>>();
        this.subscriptionRequest = subscriptionRequest;
        this.tagNames = new ArrayList<String>(subscriptionRequest.getTagNames());
        this.conversation = conversation;
        this.subscriptionId = subscriptionId;
        this.plcSubscriber = plcSubscriber;
        this.cycleTime = cycleTime;
        this.revisedCycleTime = cycleTime;
    }

    public CompletableFuture<OpcuaSubscriptionHandle> onSubscribeCreateMonitoredItemsRequest() {
        ArrayList<MonitoredItemCreateRequest> requestList = new ArrayList<MonitoredItemCreateRequest>(this.tagNames.size());
        for (String tagName : this.tagNames) {
            DefaultPlcSubscriptionTag tagDefaultPlcSubscription = (DefaultPlcSubscriptionTag)this.subscriptionRequest.getTag(tagName);
            OpcuaTag opcTag = (OpcuaTag)tagDefaultPlcSubscription.getTag();
            NodeId idNode = OpcuaProtocolLogic.generateNodeId(opcTag);
            ReadValueId readValueId = new ReadValueId(idNode, opcTag.getAttributeId().getValue(), OpcuaProtocolLogic.NULL_STRING, new QualifiedName(0, OpcuaProtocolLogic.NULL_STRING));
            MonitoringMode monitoringMode = MonitoringMode.monitoringModeReporting;
            ExtensionObject eventFilter = OpcuaProtocolLogic.NULL_EXTENSION_OBJECT;
            if (tagDefaultPlcSubscription.getPlcSubscriptionType() == PlcSubscriptionType.EVENT) {
                NodeId nodeId = new NodeId(new NodeIdFourByte(0, OpcuaNodeIdServicesObjectType.BaseEventType.getValue()));
                ArrayList<SimpleAttributeOperand> filterOperand = new ArrayList<SimpleAttributeOperand>();
                Map<String, String> config = opcTag.getConfig();
                for (Map.Entry<String, String> entry : config.entrySet()) {
                    filterOperand.add(new SimpleAttributeOperand(nodeId, List.of(new QualifiedName(0, new PascalString(entry.getKey()))), AttributeId.Value.getValue(), OpcuaProtocolLogic.NULL_STRING));
                }
                EventFilter filterPayload = new EventFilter(filterOperand, new ContentFilter(null));
                ExpandedNodeId expandedNodeId = new ExpandedNodeId(false, false, new NodeIdFourByte(0, filterPayload.getExtensionId()), null, null);
                eventFilter = new BinaryExtensionObjectWithMask(expandedNodeId, new ExtensionObjectEncodingMask(false, false, true), filterPayload);
                readValueId = new ReadValueId(idNode, AttributeId.EventNotifier.getValue(), OpcuaProtocolLogic.NULL_STRING, new QualifiedName(0, OpcuaProtocolLogic.NULL_STRING));
            }
            long clientHandle = this.clientHandles.getAndIncrement();
            MonitoringParameters parameters = new MonitoringParameters(clientHandle, this.cycleTime, eventFilter, 1L, true);
            MonitoredItemCreateRequest request = new MonitoredItemCreateRequest(readValueId, monitoringMode, parameters);
            requestList.add(request);
        }
        RequestHeader requestHeader = this.conversation.createRequestHeader();
        CreateMonitoredItemsRequest createMonitoredItemsRequest = new CreateMonitoredItemsRequest(requestHeader, this.subscriptionId, TimestampsToReturn.timestampsToReturnBoth, requestList);
        return ((CompletableFuture)this.conversation.submit(createMonitoredItemsRequest, CreateMonitoredItemsResponse.class).whenComplete((response, error) -> {
            if (error instanceof TimeoutException) {
                this.logger.info("Timeout while sending the Create Monitored Item Subscription Message", error);
            } else if (error != null) {
                this.logger.info("Error while sending the Create Monitored Item Subscription Message", error);
            }
        })).thenApply(responseMessage -> {
            MonitoredItemCreateResult[] array = (MonitoredItemCreateResult[])responseMessage.getResults().stream().toArray(MonitoredItemCreateResult[]::new);
            int index = 0;
            int arrayLength = array.length;
            while (index < arrayLength) {
                MonitoredItemCreateResult result = array[index];
                if (OpcuaStatusCode.enumForValue(result.getStatusCode().getStatusCode()) != OpcuaStatusCode.Good) {
                    this.logger.error("Invalid Tag {}, subscription created without this tag", (Object)this.tagNames.get(index));
                } else {
                    this.logger.debug("Tag {} was added to the subscription", (Object)this.tagNames.get(index));
                }
                ++index;
            }
            this.logger.trace("Scheduling publish event for subscription {}", (Object)this.subscriptionId);
            this.publishTask = EXECUTOR.scheduleAtFixedRate(this::sendPublishRequest, this.revisedCycleTime / 2L, this.revisedCycleTime, TimeUnit.MILLISECONDS);
            return this;
        });
    }

    private void sendPublishRequest() {
        LinkedList outstandingRequests = new LinkedList();
        if (outstandingRequests.size() <= 1) {
            RequestHeader requestHeader = this.conversation.createRequestHeader(this.revisedCycleTime * 10L);
            ArrayList<SubscriptionAcknowledgement> acks = new ArrayList<SubscriptionAcknowledgement>(this.outstandingAcknowledgements);
            int ackLength = acks.size();
            this.outstandingAcknowledgements.removeAll(acks);
            PublishRequest publishRequest = new PublishRequest(requestHeader, acks);
            RequestTransactionManager.RequestTransaction transaction = this.tm.startRequest();
            transaction.submit(() -> {
                this.logger.trace("Sent publish request with {} acks", (Object)ackLength);
                ((CompletableFuture)this.conversation.submit(publishRequest, PublishResponse.class).thenAccept(responseMessage -> {
                    outstandingRequests.remove(responseMessage.getResponseHeader().getRequestHandle());
                    Iterator<Object> iterator = responseMessage.getAvailableSequenceNumbers().iterator();
                    while (iterator.hasNext()) {
                        long availableSequenceNumber = iterator.next();
                        this.outstandingAcknowledgements.add(new SubscriptionAcknowledgement(this.subscriptionId, availableSequenceNumber));
                    }
                    NotificationMessage message = responseMessage.getNotificationMessage();
                    if (message.getNotificationData() != null) {
                        for (ExtensionObject notificationMessage : message.getNotificationData()) {
                            ExtensionObjectDefinition data;
                            ExtensionObjectDefinition notification = notificationMessage.getBody();
                            if (notification instanceof DataChangeNotification) {
                                this.logger.trace("Found a Data Change Notification");
                                data = (DataChangeNotification)notification;
                                if (((DataChangeNotification)data).getMonitoredItems().isEmpty()) continue;
                                this.onMonitoredValue(((DataChangeNotification)data).getMonitoredItems());
                                continue;
                            }
                            if (notification instanceof EventNotificationList) {
                                this.logger.trace("Found a Event Notification");
                                data = (EventNotificationList)notification;
                                if (((EventNotificationList)data).getEvents().isEmpty()) continue;
                                this.onEventNotification(((EventNotificationList)data).getEvents());
                                continue;
                            }
                            this.logger.warn("Unsupported Notification type {}", (Object)notification.getClass().getName());
                        }
                    }
                })).whenComplete((result, error) -> {
                    if (error != null) {
                        this.logger.warn("Publish request of subscription {} resulted in error reported by server", (Object)this.subscriptionId, error);
                        transaction.failRequest(error);
                    } else {
                        this.logger.trace("Completed publish request for subscription {}", (Object)this.subscriptionId);
                        transaction.endRequest();
                    }
                });
                outstandingRequests.add(requestHeader.getRequestHandle());
            });
        }
    }

    public void stopSubscriber() {
        RequestHeader requestHeader = this.conversation.createRequestHeader(this.revisedCycleTime * 10L);
        List<Long> subscriptions = Collections.singletonList(this.subscriptionId);
        DeleteSubscriptionsRequest deleteSubscriptionRequest = new DeleteSubscriptionsRequest(requestHeader, subscriptions);
        RequestTransactionManager.RequestTransaction transaction = this.tm.startRequest();
        transaction.submit(() -> ((CompletableFuture)this.conversation.submit(deleteSubscriptionRequest, DeleteSubscriptionsResponse.class).thenAccept(responseMessage -> {
            boolean bl = this.publishTask.cancel(true);
        })).whenComplete((result, error) -> {
            if (error != null) {
                this.logger.error("Deletion of subscription resulted in error", error);
                transaction.failRequest(error);
            } else {
                transaction.endRequest();
            }
            this.plcSubscriber.removeSubscription(this.subscriptionId);
        }));
    }

    private void onMonitoredValue(List<MonitoredItemNotification> values) {
        long receiveTs = System.currentTimeMillis();
        Metadata responseMetadata = new DefaultMetadata.Builder().put(PlcMetadataKeys.RECEIVE_TIMESTAMP, (Object)receiveTs).build();
        ArrayList<DataValue> dataValues = new ArrayList<DataValue>(values.size());
        LinkedHashMap<String, PlcTag> tagMap = new LinkedHashMap<String, PlcTag>();
        for (MonitoredItemNotification value : values) {
            String tagName = this.tagNames.get((int)value.getClientHandle() - 1);
            tagMap.put(tagName, this.subscriptionRequest.getTag(tagName).getTag());
            dataValues.add(value.getValue());
        }
        Map.Entry<Map<String, Metadata>, Map<String, PlcResponseItem<PlcValue>>> mappedResponse = this.plcSubscriber.readResponse(tagMap, dataValues, responseMetadata);
        DefaultPlcSubscriptionEvent event = new DefaultPlcSubscriptionEvent(Instant.ofEpochMilli(receiveTs), mappedResponse.getValue(), mappedResponse.getKey());
        this.consumers.forEach(arg_0 -> OpcuaSubscriptionHandle.lambda$11((PlcSubscriptionEvent)event, arg_0));
    }

    private void onEventNotification(List<EventFieldList> events) {
        EventFieldList event2;
        long receiveTs = System.currentTimeMillis();
        Metadata responseMetadata = new DefaultMetadata.Builder().put(PlcMetadataKeys.RECEIVE_TIMESTAMP, (Object)receiveTs).build();
        HashMap<String, Metadata> metadata = new HashMap<String, Metadata>();
        LinkedHashMap<String, DefaultPlcResponseItem> tagValues = new LinkedHashMap<String, DefaultPlcResponseItem>();
        for (EventFieldList event2 : events) {
            String tagName = this.tagNames.get((int)event2.getClientHandle() - 1);
            OpcuaTag tag = (OpcuaTag)this.subscriptionRequest.getTag(tagName).getTag();
            Iterator<String> fieldNames = tag.getConfig().keySet().iterator();
            LinkedHashMap<String, PlcValue> mapping = new LinkedHashMap<String, PlcValue>();
            metadata.put(tagName, responseMetadata);
            for (Variant variant : event2.getEventFields()) {
                if (fieldNames.hasNext()) {
                    String fieldName = fieldNames.next();
                    PlcValue plcValue = OpcuaProtocolLogic.variantToPlcValue((PlcTag)tag, variant);
                    mapping.put(fieldName, plcValue);
                    tagValues.put(tagName, new DefaultPlcResponseItem(PlcResponseCode.OK, (Object)new PlcStruct(mapping)));
                    continue;
                }
                this.logger.error("Could not map event notification response, subscription received more data than expected");
                tagValues.put(tagName, new DefaultPlcResponseItem(PlcResponseCode.INTERNAL_ERROR, (Object)new PlcNull()));
            }
        }
        event2 = new DefaultPlcSubscriptionEvent(Instant.ofEpochMilli(receiveTs), tagValues, metadata);
        this.consumers.forEach(arg_0 -> OpcuaSubscriptionHandle.lambda$12((PlcSubscriptionEvent)event2, arg_0));
    }

    public PlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> consumer) {
        this.logger.info("Registering a new OPCUA subscription consumer");
        this.consumers.add(consumer);
        return new DefaultPlcConsumerRegistration((PlcSubscriber)this.plcSubscriber, consumer, new PlcSubscriptionHandle[]{this});
    }

    public Long getSubscriptionId() {
        return this.subscriptionId;
    }

    private static /* synthetic */ void lambda$11(PlcSubscriptionEvent plcSubscriptionEvent, Consumer plcSubscriptionEventConsumer) {
        plcSubscriptionEventConsumer.accept(plcSubscriptionEvent);
    }

    private static /* synthetic */ void lambda$12(PlcSubscriptionEvent plcSubscriptionEvent, Consumer plcSubscriptionEventConsumer) {
        plcSubscriptionEventConsumer.accept(plcSubscriptionEvent);
    }
}

