Updating Historical Attribute Data Prior to the Latest Timestamp through MQTT API

Hi everyone,

The devices I usually work with send real-time data, but occasionally they may send sampled data from past moments, either due to communication failures or because we group data from different moments and send them all together.

The data format I send over MQTT includes the timestamp in milliseconds, for example: {“data0”: 11.3, “ts”: 1695715200000}. I send this message to a “JSON Object” attribute (configured as “Rule Event”) and process it using a Groovy Rule similar to the following rule: https://github.com/Raqbit/openremote/blob/radio-protocol/test/src/main/resources/org/openremote/test/rules/TestRule.groovy. Then, I store the information for the “data0” attribute with its corresponding timestamp as follows:

AttributeEvent att_event = new AttributeEvent(state.id, "data0", 11.3, 1695715200000)
assets.dispatch(att_event)

Therefore, on certain occasions, I send older data than the last received for a particular attribute. For example, the last data for the “data0” attribute has a timestamp of 1695715200000 (20230926T100000), and I send data with a timestamp from an hour before, which is 1695711600000 (20230926T090000).

In this case, the system does not store these older data and raises the exception “org.openremote.manager.asset.AssetProcessingException: EVENT_OUTDATED (last asset state time: Tue Sep 26 10:00:00 CEST 2023/1695715200000, event time: Tue Sep 26 09:00:00 CEST 2023/1695711600000).”

Is there a way to disable this exception without modifying the “openremote/manager/src/main/java/org/openremote/manager/asset/AssetProcessingService.java” file? The code that raises the exception is:

// Check the last update timestamp of the attribute, ignoring any event that is older than last update
long finalEventTime = eventTime;
oldAttribute.getTimestamp().filter(t -> t >= 0 && finalEventTime < t).ifPresent(
    lastStateTime -> {
        throw new AssetProcessingException(
            EVENT_OUTDATED,
            "last asset state time: " + new Date(lastStateTime) + "/" + lastStateTime
                + ", event time: " + new Date(finalEventTime) + "/" + finalEventTime);
    }
);

I need this functionality and would prefer not to modify the main project.

Thanks in advance.

Hey,

Maybe you could use this API endpoint to insert a datapoint into the database? If I am understanding you correctly, you need to be able to store past datapoints for later consumption.

Unfortunately, I cannot find a way to update a datapoint through MQTT, but maybe you can write a custom implementation or modify some code using this method.

But maybe @rich would know a better way of handling this.

Thank you for the response, @panos!

Yes, my goal is to store historical data points for later consumption in the “Insights” section or for future analysis.

The API you mentioned is for consuming historical data, but I don’t see how I can use it to store data without modifying the main project.

In a previous post, @Rich mentioned that there is no HTTP Endpoint for storing historical data points: Add timeseries by Postman - #2 by Rich

The only issue I have is the “EVENT_OUTDATED” exception coded in the “AssetProcessingService.java” file that I mentioned above. Without that condition that restricts the storage of data prior to the last timestamp, I could store old data points without any issues.

Best regards!

Hey, I don’t really think that there is a way to store historical data of an attribute. What I believe you could do is use assetDatapointService.upsertValue to upsert the values. I saw that this is what we do for our testing purposes.

Best of luck!

Thank you again, @panos!

If you call assets.dispatch(new AttributeEvent(state.id, attributeName, attributeValue, timestamp)) from the Groovy Rule with an increasing timestamp greater than the last stored timestamp for that attribute, then historical data points can be saved in the database, I have verified this. However, if the timestamp is decreasing or there is a newer timestamp, then the “EVENT_OUTDATED” exception is triggered.

The upsertValue method you mentioned is executed when the event dispatched in the Groovy Rule is processed correctly, but in my case, it raises the exception.

Is it possible to call the upsertValue method directly from the Groovy Rule? If it is possible, it would solve the problem.

Best regards!

While I cannot test this directly, I was able to get the script to compile and run:

I just imported import org.openremote.manager.datapoint.AssetDatapointService and then wherever I needed to access the assetDatapointService I used this to grab the container:

AssetDatapointService assetDatapointService = container.getService(AssetDatapointService.class)

The only issue is that I am not exactly sure how to grab that container to then get its services. Best of luck.

But I am not exactly sure if this works. The rule did compile, I can see that on the logs, but I do not have the test environment right now to test it out properly and I would suggest exercising caution when doing this, because I am not sure of how safe or efficient this is.

I tried that yesterday, but without success. I don’t know how to obtain the container object in the Groovy Rule, and I’m not sure if it’s the right procedure. I’ll check the other Groovy Rules examples and documentation to find a solution.

Anyway, maybe @Rich can clarify if there is any reason to avoid storing older data than the lastest received timestamp for a specific attribute in the AssetProcessingService.

I’ve worked with other platforms before, and I’ve been able to store older historic data points. In the last one I’ve worked, for example, the MQTT API can have the Unix timestamp in the JSON message or not. If you provide the timestamp, the system takes it; if not, the system uses the arrival timestamp.

Thanks! Best regards.