/*
* Copyright 2021, OpenRemote Inc.
*
* See the CONTRIBUTORS.txt file in the distribution for a
* full listing of individual contributors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
package org.openremote.manager.mqtt;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.keycloak.KeycloakSecurityContext;
import org.openremote.container.security.AuthContext;
import org.openremote.manager.asset.AssetProcessingService;
import org.openremote.manager.asset.AssetStorageService;
import org.openremote.manager.event.ClientEventService;
import org.openremote.model.Container;
import org.openremote.model.asset.Asset;
import org.openremote.model.asset.AssetFilter;
import org.openremote.model.asset.UserAssetLink;
import org.openremote.model.attribute.Attribute;
import org.openremote.model.attribute.AttributeEvent;
import org.openremote.model.security.User;
import org.openremote.model.syslog.SyslogCategory;
import org.openremote.model.util.ValueUtil;
import org.openremote.model.value.MetaItemType;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;
import static org.openremote.manager.mqtt.MQTTBrokerService.getConnectionIDString;
import static org.openremote.manager.mqtt.UserAssetProvisioningMQTTHandler.PROVISIONING_USER_PREFIX;
import static org.openremote.model.Constants.ASSET_ID_REGEXP;
import static org.openremote.model.syslog.SyslogCategory.API;
/**
* This handler uses the {@link ClientEventService} to publish and subscribe to asset and attribute events; converting
* subscription topics into {@link AssetFilter}s to ensure only the correct events are returned for the subscription.
*/
public class MultiAttributeMQTTHandler extends MQTTHandler {
public static final int PRIORITY = Integer.MIN_VALUE + 5000;
public static final String ATTRIBUTE_VALUES_WRITE_TOPIC = "jsonpayload";
protected static final String LOG_PREFIX = "MultiAttributeMqttHandler: ";
private static final Logger LOG = SyslogCategory.getLogger(API, MultiAttributeMQTTHandler.class);
// An authorisation cache for publishing
protected final Cache> authorizationCache = CacheBuilder.newBuilder()
.maximumSize(100000)
.expireAfterWrite(300000, TimeUnit.MILLISECONDS)
.build();
protected AssetStorageService assetStorageService;
protected AssetProcessingService assetProcessingService;
@Override
public int getPriority() {
// This handler is intended to be the final handler but this can obviously be overridden by another handler
return PRIORITY;
}
@Override
public void init(Container container) throws Exception {
super.init(container);
assetStorageService = container.getService(AssetStorageService.class);
assetProcessingService = container.getService(AssetProcessingService.class);
}
@Override
public boolean topicMatches(Topic topic) {
return isAttributeValueMultipleWriteTopic(topic);
}
@Override
protected Logger getLogger() {
return LOG;
}
// TODO: improve authorisation performance
// We make heavy use of authorisation caching as clients can hit this a lot and it is currently quite slow with DB calls
@Override
public boolean canPublish(RemotingConnection connection, KeycloakSecurityContext securityContext, Topic topic) {
if (!isKeycloak) {
LOG.fine(LOG_PREFIX + "Identity provider is not keycloak");
return false;
}
AuthContext authContext = getAuthContextFromSecurityContext(securityContext);
if (authContext == null) {
LOG.finer(LOG_PREFIX + "Anonymous publish not supported: topic=" + topic + ", connection=" + mqttBrokerService.connectionToString(connection));
return false;
}
if (!isAttributeValueMultipleWriteTopic(topic)) {
LOG.fine(LOG_PREFIX + "Not the correct topic for this handler.");
return false;
}
if (topic.getTokens().size() != 4 || !Pattern.matches(ASSET_ID_REGEXP, topicTokenIndexToString(topic, 3))) {
LOG.finer(LOG_PREFIX + "Publish mulitple attribute values topic should be {realm}/{clientId}/" + ATTRIBUTE_VALUES_WRITE_TOPIC + "/{assetId}: topic=" + topic + ", connection=" + mqttBrokerService.connectionToString(connection));
return false;
}
String cacheKey = getConnectionIDString(connection);
// Check cache
ConcurrentHashSet act = authorizationCache.getIfPresent(cacheKey);
if (act != null && act.contains(topic.getString())) {
return true;
}
//DEBUG spit out list of Asset->User links found in the current Realm
/* LOG.info("checking for existing User -> Asset links in realm " + securityContext.getRealm());
List userAssetLinks = assetStorageService.findUserAssetLinks(securityContext.getRealm(), (String) null, null);
userAssetLinks.forEach(consumer -> LOG.info("Link Found: User ID " + consumer.getId().getUserId() + " (Username " + consumer.getUserFullName() + ") linked to Asset ID: " + consumer.getId().getAssetId()));
*/
// from "manager/src/main/java/org/openremote/manager/rules/RulesResourceImpl.java"
// if (assetStorageService.isUserAsset(topicTokenIndexToString(topic, 3))) { ERROR: This will ALWAYS return False without a UserID!
// -> NOTE: "findUserAssetLinks" is a DB query, so it will be slow. Good to cache the result.
//TODO: What we need is access to the MQTT "subject", it matches the Username: "Client published to 'custom/ps-mqttx_1d0c990e/jsonpayload/4Lz1xJx3OT8OEyhODDcoCV': connection=/127.0.0.1:59008, clientID=ps-mqttx_1d0c990e, subject=service-account-ps-mqttx_1d0c990e"
String username = User.SERVICE_ACCOUNT_PREFIX + (connection.getClientID().substring(0, 3).equalsIgnoreCase(PROVISIONING_USER_PREFIX) ? "" : PROVISIONING_USER_PREFIX) + connection.getClientID();
// assetStorageService.findUserAssetLinks searches for a "User ID"--NOT the Username. (A User ID is a UUID.) Thus, we cannot put the "Username" in the UserID and get a single result.
List userAssetLinks = assetStorageService.findUserAssetLinks(securityContext.getRealm(), null, topicTokenIndexToString(topic, 3));
userAssetLinks.forEach(consumer -> LOG.info(LOG_PREFIX + "User ID " + consumer.getId().getUserId() + " (Username " + consumer.getUserFullName() + ") found under the current topic Asset ID (" + consumer.getId().getAssetId() + ")"));
if (userAssetLinks.stream().noneMatch(userAssetLink -> userAssetLink.getUserFullName().equals(username))) {
LOG.info(LOG_PREFIX + "Did not find any Asset matching Username in this realm!");
return false;
} else {
LOG.info(LOG_PREFIX + "Success finding an Asset matching the Username in this realm! Caching result...");
}
// User serviceUser = identityProvider.getUserByUsername(securityContext.getRealm(), User.SERVICE_ACCOUNT_PREFIX + PROVISIONING_USER_PREFIX + connection.getClientID());
// PASS permissions check: Add this topic to cache. This will bypass the above check on a subsequent post.
ConcurrentHashSet set;
synchronized (authorizationCache) {
act = authorizationCache.getIfPresent(cacheKey);
if (act != null) {
set = act;
} else {
set = new ConcurrentHashSet<>();
authorizationCache.put(cacheKey, set);
}
}
set.add(topic.getString());
return true;
}
@Override
public Set getPublishListenerTopics() {
return Set.of(
TOKEN_SINGLE_LEVEL_WILDCARD + "/" + TOKEN_SINGLE_LEVEL_WILDCARD + "/" + ATTRIBUTE_VALUES_WRITE_TOPIC + "/" + TOKEN_SINGLE_LEVEL_WILDCARD
);
}
@Override
public void onPublish(RemotingConnection connection, Topic topic, ByteBuf body) {
String payloadContent = body.toString(StandardCharsets.UTF_8);
Asset> asset = assetStorageService.find(topicTokenIndexToString(topic, 3)); // {AttributeName} is omitted for MultipleWrite
if (asset == null) {
LOG.info(LOG_PREFIX + "Asset ID " + topicTokenIndexToString(topic, 3) + " could not be found");
} else {
JsonNode payload;
try {
payload = ValueUtil.parse(payloadContent, JsonNode.class).orElse(null);
} catch (Exception e) {
LOG.log(Level.INFO, LOG_PREFIX + "Failed to parse JSON string: " + payloadContent, e);
return;
}
/* TODO: Attribute permissions need to be handled. Unfortunately, this method here does break the OpenRemote Asset
cache to reduce database lookups--as the Attribute(s) being written to are not included in the MQTT "connection".
if (!clientEventService.authorizeEventWrite(topicRealm(topic), authContext, buildAttributeEvent(topic.getTokens(), null))) {
LOG.fine("Publish was not authorised for this user and topic: topic=" + topic + ", subject=" + authContext);
return false;
}
*/
// Attribute Meta data contains the permissions and other settings for the Attribute. This is much easier than checking Keycloak context
// -> model/src/main/java/org/openremote/model/value/MetaItemType.java
assert payload != null;
Iterator> payloadIterator = payload.fields();
payloadIterator.forEachRemaining(payloadAttribute -> {
Attribute> assetAttribute = asset.getAttribute(payloadAttribute.getKey()).orElse(null);
if (assetAttribute == null) {
LOG.fine(LOG_PREFIX + "Attribute \"" + payloadAttribute.getKey() + "\" not found!");
} else {
if (!assetAttribute.getMetaValue(MetaItemType.ACCESS_RESTRICTED_WRITE).orElse(false)) {
LOG.fine(LOG_PREFIX + "Attribute \"" + payloadAttribute.getKey() + "\" does not have AccessRestrictedWrite permission");
} else {
// Attribute exists + has the AccessRestrictedWrite flag. We can update it.
AttributeEvent attributeEvent = new AttributeEvent(asset.getId(), payloadAttribute.getKey(), payloadAttribute.getValue());
LOG.fine(LOG_PREFIX + "Update \"" + payloadAttribute.getKey() + "\" with \"" + payloadAttribute.getValue().toString() + "\"");
assetProcessingService.sendAttributeEvent(attributeEvent);
}
}
});
}
}
@Override
public boolean canSubscribe(RemotingConnection connection, KeycloakSecurityContext securityContext, Topic topic) {
return false;
}
@Override
public void onSubscribe(RemotingConnection connection, Topic topic) {
}
@Override
public void onUnsubscribe(RemotingConnection connection, Topic topic) {
}
protected static boolean isAttributeValueMultipleWriteTopic(Topic topic) {
return ATTRIBUTE_VALUES_WRITE_TOPIC.equalsIgnoreCase(topicTokenIndexToString(topic, 2));
}
}