/* * Copyright 2021, OpenRemote Inc. * * See the CONTRIBUTORS.txt file in the distribution for a * full listing of individual contributors. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as * published by the Free Software Foundation, either version 3 of the * License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ package org.openremote.manager.mqtt; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; import org.keycloak.KeycloakSecurityContext; import org.openremote.container.security.AuthContext; import org.openremote.manager.asset.AssetProcessingService; import org.openremote.manager.asset.AssetStorageService; import org.openremote.manager.event.ClientEventService; import org.openremote.model.Container; import org.openremote.model.asset.Asset; import org.openremote.model.asset.AssetFilter; import org.openremote.model.asset.UserAssetLink; import org.openremote.model.attribute.Attribute; import org.openremote.model.attribute.AttributeEvent; import org.openremote.model.security.User; import org.openremote.model.syslog.SyslogCategory; import org.openremote.model.util.ValueUtil; import org.openremote.model.value.MetaItemType; import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import java.util.regex.Pattern; import static org.openremote.manager.mqtt.MQTTBrokerService.getConnectionIDString; import static org.openremote.manager.mqtt.UserAssetProvisioningMQTTHandler.PROVISIONING_USER_PREFIX; import static org.openremote.model.Constants.ASSET_ID_REGEXP; import static org.openremote.model.syslog.SyslogCategory.API; /** * This handler uses the {@link ClientEventService} to publish and subscribe to asset and attribute events; converting * subscription topics into {@link AssetFilter}s to ensure only the correct events are returned for the subscription. */ public class MultiAttributeMQTTHandler extends MQTTHandler { public static final int PRIORITY = Integer.MIN_VALUE + 5000; public static final String ATTRIBUTE_VALUES_WRITE_TOPIC = "jsonpayload"; protected static final String LOG_PREFIX = "MultiAttributeMqttHandler: "; private static final Logger LOG = SyslogCategory.getLogger(API, MultiAttributeMQTTHandler.class); // An authorisation cache for publishing protected final Cache> authorizationCache = CacheBuilder.newBuilder() .maximumSize(100000) .expireAfterWrite(300000, TimeUnit.MILLISECONDS) .build(); protected AssetStorageService assetStorageService; protected AssetProcessingService assetProcessingService; @Override public int getPriority() { // This handler is intended to be the final handler but this can obviously be overridden by another handler return PRIORITY; } @Override public void init(Container container) throws Exception { super.init(container); assetStorageService = container.getService(AssetStorageService.class); assetProcessingService = container.getService(AssetProcessingService.class); } @Override public boolean topicMatches(Topic topic) { return isAttributeValueMultipleWriteTopic(topic); } @Override protected Logger getLogger() { return LOG; } // TODO: improve authorisation performance // We make heavy use of authorisation caching as clients can hit this a lot and it is currently quite slow with DB calls @Override public boolean canPublish(RemotingConnection connection, KeycloakSecurityContext securityContext, Topic topic) { if (!isKeycloak) { LOG.fine(LOG_PREFIX + "Identity provider is not keycloak"); return false; } AuthContext authContext = getAuthContextFromSecurityContext(securityContext); if (authContext == null) { LOG.finer(LOG_PREFIX + "Anonymous publish not supported: topic=" + topic + ", connection=" + mqttBrokerService.connectionToString(connection)); return false; } if (!isAttributeValueMultipleWriteTopic(topic)) { LOG.fine(LOG_PREFIX + "Not the correct topic for this handler."); return false; } if (topic.getTokens().size() != 4 || !Pattern.matches(ASSET_ID_REGEXP, topicTokenIndexToString(topic, 3))) { LOG.finer(LOG_PREFIX + "Publish mulitple attribute values topic should be {realm}/{clientId}/" + ATTRIBUTE_VALUES_WRITE_TOPIC + "/{assetId}: topic=" + topic + ", connection=" + mqttBrokerService.connectionToString(connection)); return false; } String cacheKey = getConnectionIDString(connection); // Check cache ConcurrentHashSet act = authorizationCache.getIfPresent(cacheKey); if (act != null && act.contains(topic.getString())) { return true; } //DEBUG spit out list of Asset->User links found in the current Realm /* LOG.info("checking for existing User -> Asset links in realm " + securityContext.getRealm()); List userAssetLinks = assetStorageService.findUserAssetLinks(securityContext.getRealm(), (String) null, null); userAssetLinks.forEach(consumer -> LOG.info("Link Found: User ID " + consumer.getId().getUserId() + " (Username " + consumer.getUserFullName() + ") linked to Asset ID: " + consumer.getId().getAssetId())); */ // from "manager/src/main/java/org/openremote/manager/rules/RulesResourceImpl.java" // if (assetStorageService.isUserAsset(topicTokenIndexToString(topic, 3))) { ERROR: This will ALWAYS return False without a UserID! // -> NOTE: "findUserAssetLinks" is a DB query, so it will be slow. Good to cache the result. //TODO: What we need is access to the MQTT "subject", it matches the Username: "Client published to 'custom/ps-mqttx_1d0c990e/jsonpayload/4Lz1xJx3OT8OEyhODDcoCV': connection=/127.0.0.1:59008, clientID=ps-mqttx_1d0c990e, subject=service-account-ps-mqttx_1d0c990e" String username = User.SERVICE_ACCOUNT_PREFIX + (connection.getClientID().substring(0, 3).equalsIgnoreCase(PROVISIONING_USER_PREFIX) ? "" : PROVISIONING_USER_PREFIX) + connection.getClientID(); // assetStorageService.findUserAssetLinks searches for a "User ID"--NOT the Username. (A User ID is a UUID.) Thus, we cannot put the "Username" in the UserID and get a single result. List userAssetLinks = assetStorageService.findUserAssetLinks(securityContext.getRealm(), null, topicTokenIndexToString(topic, 3)); userAssetLinks.forEach(consumer -> LOG.info(LOG_PREFIX + "User ID " + consumer.getId().getUserId() + " (Username " + consumer.getUserFullName() + ") found under the current topic Asset ID (" + consumer.getId().getAssetId() + ")")); if (userAssetLinks.stream().noneMatch(userAssetLink -> userAssetLink.getUserFullName().equals(username))) { LOG.info(LOG_PREFIX + "Did not find any Asset matching Username in this realm!"); return false; } else { LOG.info(LOG_PREFIX + "Success finding an Asset matching the Username in this realm! Caching result..."); } // User serviceUser = identityProvider.getUserByUsername(securityContext.getRealm(), User.SERVICE_ACCOUNT_PREFIX + PROVISIONING_USER_PREFIX + connection.getClientID()); // PASS permissions check: Add this topic to cache. This will bypass the above check on a subsequent post. ConcurrentHashSet set; synchronized (authorizationCache) { act = authorizationCache.getIfPresent(cacheKey); if (act != null) { set = act; } else { set = new ConcurrentHashSet<>(); authorizationCache.put(cacheKey, set); } } set.add(topic.getString()); return true; } @Override public Set getPublishListenerTopics() { return Set.of( TOKEN_SINGLE_LEVEL_WILDCARD + "/" + TOKEN_SINGLE_LEVEL_WILDCARD + "/" + ATTRIBUTE_VALUES_WRITE_TOPIC + "/" + TOKEN_SINGLE_LEVEL_WILDCARD ); } @Override public void onPublish(RemotingConnection connection, Topic topic, ByteBuf body) { String payloadContent = body.toString(StandardCharsets.UTF_8); Asset asset = assetStorageService.find(topicTokenIndexToString(topic, 3)); // {AttributeName} is omitted for MultipleWrite if (asset == null) { LOG.info(LOG_PREFIX + "Asset ID " + topicTokenIndexToString(topic, 3) + " could not be found"); } else { JsonNode payload; try { payload = ValueUtil.parse(payloadContent, JsonNode.class).orElse(null); } catch (Exception e) { LOG.log(Level.INFO, LOG_PREFIX + "Failed to parse JSON string: " + payloadContent, e); return; } /* TODO: Attribute permissions need to be handled. Unfortunately, this method here does break the OpenRemote Asset cache to reduce database lookups--as the Attribute(s) being written to are not included in the MQTT "connection". if (!clientEventService.authorizeEventWrite(topicRealm(topic), authContext, buildAttributeEvent(topic.getTokens(), null))) { LOG.fine("Publish was not authorised for this user and topic: topic=" + topic + ", subject=" + authContext); return false; } */ // Attribute Meta data contains the permissions and other settings for the Attribute. This is much easier than checking Keycloak context // -> model/src/main/java/org/openremote/model/value/MetaItemType.java assert payload != null; Iterator> payloadIterator = payload.fields(); payloadIterator.forEachRemaining(payloadAttribute -> { Attribute assetAttribute = asset.getAttribute(payloadAttribute.getKey()).orElse(null); if (assetAttribute == null) { LOG.fine(LOG_PREFIX + "Attribute \"" + payloadAttribute.getKey() + "\" not found!"); } else { if (!assetAttribute.getMetaValue(MetaItemType.ACCESS_RESTRICTED_WRITE).orElse(false)) { LOG.fine(LOG_PREFIX + "Attribute \"" + payloadAttribute.getKey() + "\" does not have AccessRestrictedWrite permission"); } else { // Attribute exists + has the AccessRestrictedWrite flag. We can update it. AttributeEvent attributeEvent = new AttributeEvent(asset.getId(), payloadAttribute.getKey(), payloadAttribute.getValue()); LOG.fine(LOG_PREFIX + "Update \"" + payloadAttribute.getKey() + "\" with \"" + payloadAttribute.getValue().toString() + "\""); assetProcessingService.sendAttributeEvent(attributeEvent); } } }); } } @Override public boolean canSubscribe(RemotingConnection connection, KeycloakSecurityContext securityContext, Topic topic) { return false; } @Override public void onSubscribe(RemotingConnection connection, Topic topic) { } @Override public void onUnsubscribe(RemotingConnection connection, Topic topic) { } protected static boolean isAttributeValueMultipleWriteTopic(Topic topic) { return ATTRIBUTE_VALUES_WRITE_TOPIC.equalsIgnoreCase(topicTokenIndexToString(topic, 2)); } }