MQTT Sparkplug integration

Good day everyone. I am very new to OR and still learning how all the internal features fit together. I am trying to figure out which would be the best way to integrate the MQTT Sparkplug specification.

A little about the specification: Sparkplug solves 3 problems with MQTT:

  1. Structured topic namespace. The specification defines a standardized topic structure that ALL clients must conform to (example topics below).
  2. Structured payload. The specification defines the structure of the payload (documented in JSON form in the spec) but it is serialized with protobuf.
  3. State awareness. The specification defines different message types such as BIRTH, DEATH, DATA, and COMMAND, identified in the topic, and each would need to be handled differently by the OR server.
    The Birth message provides all the information required for the OR server to create data points: names, engineering units, data types, etc.
    Data messages provide the device data.
    Command messages are used to send commands to the devices.
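
For reference, a Sparkplug B topic looks like this (the group/node/device names below are just examples):

    spBv1.0/{group_id}/{message_type}/{edge_node_id}[/{device_id}]

    spBv1.0/EnergySite/NBIRTH/gateway01            (edge node birth)
    spBv1.0/EnergySite/DBIRTH/gateway01/meter01    (device birth)
    spBv1.0/EnergySite/DDATA/gateway01/meter01     (device data)
    spBv1.0/EnergySite/NCMD/gateway01              (command to the edge node)
    spBv1.0/EnergySite/NDEATH/gateway01            (edge node death)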

I am trying to determine which would be the best approach to implement this in OR, using a custom agent or a custom MQTT handler.
The end goal would be that in the manager interface, you could register a topic, e.g.:
namespace/group_id/message_type/edge_node_id/
When a Birth message is received on that topic, an asset is created with attributes from the Birth message and the “state” attribute is set to healthy/online. Attributes are linked to the corresponding Data message.
When a Data message is received, an attribute update takes place.
When a Death message is received, the “state” attribute is set to unhealthy/offline.

What are your thoughts on this? Would you need to combine a custom handler and a custom agent to handle the different messages, or could it all be done in an agent, or all in the custom handler?

Here is a link to the specification: https://sparkplug.eclipse.org/

I personally believe that it would be easier (especially in the long run) to implement a new MQTT handler that listens for a specific subtopic (e.g. {realm}/{clientid}/sparkplug/#). It allows all of the normal CRUD operations using AssetStorageService; you only need to implement a new MQTTHandler, and the wiki has been recently updated for this. The added bonus is that you can create a new asset type to go along with your Sparkplug assets!
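
To make that concrete, the handler only has to claim the Sparkplug sub-topic. Something roughly like this (I am writing from memory, so treat the exact method names and signatures as approximate):

    public class SparkplugMQTTHandler extends MQTTHandler {

        public static final String SPARKPLUG_TOKEN = "sparkplug";

        @Override
        public boolean topicMatches(Topic topic) {
            // Only claim topics whose third token is "sparkplug"
            // ({realm}/{clientId}/sparkplug/#); everything else stays with the other handlers.
            List<String> tokens = topic.getTokens();
            return tokens.size() > 2 && SPARKPLUG_TOKEN.equalsIgnoreCase(tokens.get(2));
        }

        // onPublish/onSubscribe then do the Sparkplug-specific work (more on that below).
    }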

Unless @Rich has any other ideas, I think that would be the way to go for you.

Best of luck!

Thanks, Panos, that handler looks like it covers enough for me to get started. I’m just wondering why you suggest deploying another broker. From what I understand, the handler is able to filter the topics it will handle with the topicMatches() method. Since all Sparkplug messages begin with “spBv”, is there a reason to have OR spin up 2 separate brokers? I’d then have to be concerned about port conflicts.

Maybe there is something I don’t understand about the broker service…

Oh, I am terribly sorry, I meant an MQTT handler, not an MQTT broker. I have also edited my earlier comment. I hope this answers your questions; really sorry for confusing you.

Good luck with the implementation!

After taking a look at your message a bit more, I can also tell you that implementing Sparkplug would be much easier if you take advantage of multi-level asset type inheritance. By creating a SparkplugThing asset, you can then extend that to any other asset types you would like whilst retaining its functionality.
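
As a rough illustration of the inheritance idea (the attribute, descriptor and class names here are made up, and the descriptor boilerplate is simplified compared to the real asset model classes):

    // Intermediate type: everything Sparkplug shares the attributes defined here.
    public abstract class SparkplugAsset<T extends SparkplugAsset<?>> extends Asset<T> {

        // The "state" attribute from your first post (online/offline).
        public static final AttributeDescriptor<Boolean> ONLINE =
                new AttributeDescriptor<>("online", ValueType.BOOLEAN);

        protected SparkplugAsset() {
        }

        protected SparkplugAsset(String name) {
            super(name);
        }
    }

    // A concrete type created from Birth messages; extend further for specific devices.
    public class SparkplugDeviceAsset extends SparkplugAsset<SparkplugDeviceAsset> {

        public static final AssetDescriptor<SparkplugDeviceAsset> DESCRIPTOR =
                new AssetDescriptor<>("chip", null, SparkplugDeviceAsset.class);

        protected SparkplugDeviceAsset() {
        }

        public SparkplugDeviceAsset(String name) {
            super(name);
        }
    }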

Would this need to be done with an MQTT Sparkplug agent that knows how to parse the different topics? Or would this logic live in the asset type?

I am suggesting that you write a comprehensive MQTT handler that implements all of the Sparkplug specification, based on a new asset type that you create.

From a data point of view, the Sparkplug client sends its first data, a Birth message from what I understand, to topic {realmId}/{clientId}/sparkplug/birth (this can change depending on how you would like it; I am just unfamiliar with the specification) on your OpenRemote MQTT broker. The topic namespace after {realmId}/{clientId}/sparkplug/ would be exactly as it is defined within the Sparkplug specification.

Your Sparkplug MQTT handler, an implemented MQTTHandler, understands the sparkplug part of your MQTT topic; it recognizes that this is a Sparkplug message and that it should handle it. The handler will say, “oh, this is a message destined for me, I processed the topic and I understand it is a Sparkplug message, let me handle it”, and then you are moved to the onPublish method. There, you can say “oh, this is a Birth message, use a birth function (or however you choose to call it) to create a new asset, using the payload included in the message”. The payload can then be deserialized into a protobuf object, as defined in the specification. I am guessing there would be 4 such functions, maybe “onBirth”, “onData”, “onCommand”, “onDeath”, which you would call from within the onPublish function. Then you would move on to creating/modifying/deleting the asset.
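
A rough sketch of that dispatch could look like this. The onPublish signature and the token indices are my assumptions, based on the {realmId}/{clientId}/sparkplug/ prefix described above, and onBirth/onData/onCommand/onDeath are the hypothetical helpers mentioned:

    @Override
    public void onPublish(RemotingConnection connection, Topic topic, ByteBuf body) {
        List<String> tokens = topic.getTokens();
        // {realmId}/{clientId}/sparkplug/spBv1.0/{group}/{messageType}/{edgeNode}[/{device}]
        String messageType = tokens.get(5);

        byte[] payloadBytes = new byte[body.readableBytes()];
        body.readBytes(payloadBytes);
        // The bytes are a Sparkplug B protobuf payload; decode them with
        // Eclipse Tahu's SparkplugBPayloadDecoder inside the helpers below.

        switch (messageType) {
            case "NBIRTH":
            case "DBIRTH":
                onBirth(tokens, payloadBytes);    // create the asset, set it online
                break;
            case "NDATA":
            case "DDATA":
                onData(tokens, payloadBytes);     // write attribute values
                break;
            case "NDEATH":
            case "DDEATH":
                onDeath(tokens, payloadBytes);    // mark the asset offline
                break;
            case "NCMD":
            case "DCMD":
                onCommand(tokens, payloadBytes);  // commands are normally outbound, handle echoes if needed
                break;
            default:
                getLogger().warning("Unsupported Sparkplug message type: " + messageType);
        }
    }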

That asset will be a Sparkplug asset, a specific asset type that you create for type safety and ease of use (combining it with the protobuf object), as you can have “required” attributes etc. If it is a specific type of asset you know beforehand, you can then implement THAT asset type based on the Sparkplug asset type. If the message is a Death, then you run the onDeath function, etc. When running these functions, you can set the attributes (think of them as instance variables) of the asset to whatever your payload says.
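
For the asset side of those helpers, a minimal sketch, assuming the hypothetical SparkplugDeviceAsset type from above and the same token layout (assetStorageService would be obtained from the container in init):

    // Hypothetical helper for NBIRTH/DBIRTH: create the asset and mark it online.
    protected void onBirth(List<String> tokens, byte[] payloadBytes) {
        String realm = tokens.get(0);
        String edgeNodeId = tokens.get(6);

        SparkplugDeviceAsset asset = new SparkplugDeviceAsset(edgeNodeId);
        asset.setRealm(realm);
        asset.getAttributes().getOrCreate(SparkplugDeviceAsset.ONLINE).setValue(true);
        // Decode payloadBytes with Tahu here and add one attribute per Sparkplug metric
        // (name, data type, engineering units).

        assetStorageService.merge(asset);
    }

    // Hypothetical helper for NDEATH/DDEATH: mark the existing asset offline,
    // e.g. by sending an AttributeEvent that sets ONLINE to false.
    protected void onDeath(List<String> tokens, byte[] payloadBytes) {
        // Look the asset up by name/realm and flip its ONLINE attribute.
    }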

Since this is a specification built on top of the MQTT protocol, I would suggest that you attack the specification-side from an MQTT handler. Ideally, you would handle everything sparkplug-specification-related in the SparkplugMQTTHandler you make.

I would not mess with agents, and I would not write any code in the asset type itself. This is a concrete specification, with specific topics and payloads being used. From other devices/libraries, you would just prepend the MQTT topics with {realmId}/{clientId}/sparkplug/ and ideally you should be good to go.

I am not going to go into detail about this, but at a certain point you will also need to auto-provision the devices in a secure manner, using proper authentication. You can look into UserAssetProvisioningMQTTHandler for more details.

Another example: if you know that a hardware vendor sends MQTT messages to a specific topic, with a specific payload, you would write an MQTT handler that checks the topic to see if it is appropriate to be handled by your handler, and then use the onPublish method to decode the payload and create/modify the asset with the attributes that are specified in the payload.

Sorry for the long message, please let me know if there are any other questions!

Thank you for this @panos, I am getting very close to wrapping my head around this and committing to building it. I still have a few things missing though:
I need the MQTTHandler to publish to a topic when asset attribute values are changed in the UI.
When the handler initializes, I would like to find all assets of the Sparkplug type, subscribe to internal attribute events, and then handle them with a function I write.

You can take a look at how the DefaultMQTTHandler does it.
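
Roughly, the shape is: subscribe to AttributeEvents when the handler starts, and push the relevant ones out via the broker service. Something like this (I am quoting publishMessage and the subscription call from memory, so double-check them against the DefaultMQTTHandler):

    @Override
    public void start(Container container) throws Exception {
        super.start(container);

        // React to attribute changes on your Sparkplug assets regardless of their
        // source (UI, rules, protocol) and push them out to the devices.
        clientEventService.addInternalSubscription(
                AttributeEvent.class,
                null,                    // optionally an AssetFilter to narrow this down
                this::onAttributeEvent);
    }

    protected void onAttributeEvent(AttributeEvent event) {
        // Skip assets that are not Sparkplug assets, then build the CMD topic and
        // protobuf payload for this attribute (buildCommandTopic/buildCommandPayload
        // are hypothetical helpers you would write, e.g. with the Tahu encoder).
        String topic = buildCommandTopic(event);
        byte[] payload = buildCommandPayload(event);
        mqttBrokerService.publishMessage(topic, payload, MqttQoS.AT_LEAST_ONCE);
    }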

Thank you, and just to make sure, do I need to worry about an infinite loop where incoming topics trigger AttributeEvents?

I am not exactly sure what you mean. Do you mean if an attribute is changed through your MQTT handler, then you publish a message about that change, then the attribute changes, and then you’re in a loop? I am not exactly understanding where the infinite loop could take place.

Yes, my concern is that when asset attributes are updated during the onPublish method, it will create an attribute event and my subscription will handle that event again. I understand I can subscribe to client events only, but I may also want to change values with rules in the future and have those values written to a topic, so just subscribing to client events won’t be sufficient. I’ll have to subscribe to internal events as well.

Or maybe I just don’t understand these events yet.

An AttributeEvent is emitted whenever an attribute is altered, whether that is the value itself or any of its metaItems. What you could do to handle this would be to control which events are actually emitted in your onSubscribe method. To be precise, you could potentially only emit events where the value has changed, but I am guessing that the Sparkplug specification has something more specific for your use case.

@panos I have gotten quite far with my handler. Unfortunately, this is a side project so I cannot work on it all day, but so far my repo is here: GitHub - Craig-IAS/OR-Sparkplug: Template repo for creating an OpenRemote custom project. It currently covers:

  • Filtering the correct topic

  • Using the Eclipse Tahu library to manage all the Sparkplug domain logic

  • Creating a new asset from a Birth message

  • Storing data from a Data message

I want to start working on downstream communication to the device, but I’m a little confused about where to start. When a device connects it will subscribe to its own CMD topic. I am not sure if I should register a consumer for every device or have a general consumer configured in the init method, since they will all be handled the same.

OK, never mind, I have managed to copy a lot of code from the defaultManager and can successfully publish attribute changes on the correct topic. One thing that is confusing me is that I get this message:

2023-10-11 08:56:05.131  FINE    [Pool-AttributeEventQueue-22   ] ote.manager.asset.AssetProcessingService : <<< Attribute event processed in 7ms: attribute=Asset ID=6d5M6vXric7VQt1UhQ4zdm, Asset name=device1, Attribute{name='NodeControlReboot', value='true', timestamp='1697036165070'} , consumer=null
2023-10-11 08:56:05.149  FINE    [Thread-7 (ActiveMQ-clien..ads)] emote.manager.mqtt.MQTTBrokerService.API : Client is no longer connected so dropping publish to topic 'spBv1/0/#': clientID=null

which shows no consumer and that it is dropping the publish. Yet I still receive the message on my MQTT client.

@Rich would you mind helping me out here? I’ve been stuck on this for like 2 days.

In my custom MQTT handler, my data comes from the sensor and triggers an AttributeEvent:

    // Send the AttributeEvent to the client inbound queue, marked as coming from a sensor
    headers.put(HEADER_SOURCE, SENSOR);
    headers.put(HEADER_CONNECTION_TYPE, ClientEventService.HEADER_CONNECTION_TYPE_MQTT);
    messageBrokerService.getFluentProducerTemplate()
            .withHeaders(headers)
            .withBody(attributeEvent)
            .to(CLIENT_INBOUND_QUEUE)
            .asyncSend();

So far so good, my data gets logged. However, my onSubscribe method is also listening for AttributeEvents and publishing messages back to the sensor:

    @Override
    public void onSubscribe(RemotingConnection connection, Topic topic) {

        //log the topic and body
        getLogger().info("onSubscribe: " + topic );
        //The topic is being split by a period so for now we rebuild the topic
        //TODO: fix this somewhere else
        String topicString = topic.getString().replaceFirst("/", ".");
        String[] topicArray = topicString.split("/");

        AssetFilter filter = buildAssetFilter(topicArray);

        Consumer<SharedEvent> eventConsumer = getSubscriptionEventConsumer(connection, topicArray);
        EventSubscription subscription = new EventSubscription(
                AttributeEvent.class,
                filter,
                topicString
        );
        Map<String, Object> headers = prepareHeaders(topicArray[1], connection);
        messageBrokerService.getFluentProducerTemplate()
                .withHeaders(headers)
                .withBody(subscription)
                .to(CLIENT_INBOUND_QUEUE)
                .asyncSend();
        synchronized (connectionSubscriberInfoMap) {
            connectionSubscriberInfoMap.compute(getConnectionIDString(connection), (connectionID, subscriberInfo) -> {
                if (subscriberInfo == null) {
                    return new SubscriberInfo(topicString, eventConsumer);
                } else {
                    subscriberInfo.add(topicString, eventConsumer);
                    return subscriberInfo;
                }
            });
        }
    }

Now what is happening is that my consumer is firing on all events, even incoming sensor events, which is problematic as this reboots the sensor and then the cycle happens all over again.

Is there a way I can exclude SENSOR events from my consumer?

I’m not sure if that’s what you need, but I would recommend that you create the EventSubscription in the start method. Also, I would recommend you use ClientEventService#addInternalSubscription for your events. If you can handle the events in one central method, that would probably be easier, as you mentioned above.

I would also suggest not using wildcards in your publish messages, if the message you need to send is for only one device/client.

For your issue, what you could do is check the previous and the current value in the central AttributeEvent handler method and compare the two; only if the value has changed do you go on to publish the message.
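
Something along these lines, with a small local cache of the last published values standing in for the previous value (the AttributeEvent getters and helper names are approximate):

    // Last value we pushed out per asset/attribute, so echoes are not re-published.
    private final Map<String, Object> lastPublished = new ConcurrentHashMap<>();

    protected void onAttributeEvent(AttributeEvent event) {
        String key = event.getAssetId() + ":" + event.getAttributeName();
        Object newValue = event.getValue().orElse(null);

        // Drop the event if it carries the value we last published ourselves, otherwise
        // we would publish -> attribute update -> event -> publish, forever.
        if (Objects.equals(lastPublished.get(key), newValue)) {
            return;
        }
        lastPublished.put(key, newValue);

        // ...build the CMD topic/payload and publish as before...
    }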

I hope this helps!

Thanks @panos, on my first try the ClientEventService#addInternalSubscription seems to be working. It would be awesome if the AttributeEvent had a source so that you could decide what to do with it.
Luckily the Sparkplug library keeps a cache of the most recent messages, so I’ll use that to compare the values. Not ideal, but I think it will work, and at least I won’t have to make a DB query.

Thanks again for the help here. I’m probably about 2 weeks from a release.