diff --git a/agents-audit/core/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java b/agents-audit/core/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java index 2fca6862732..ec42d89f153 100644 --- a/agents-audit/core/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java +++ b/agents-audit/core/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java @@ -433,6 +433,8 @@ private AuditHandler getProviderFromConfig(Properties props, String propPrefix, provider = createDestination("org.apache.ranger.audit.destination.SolrAuditDestination"); } else if (providerName.equalsIgnoreCase("elasticsearch")) { provider = createDestination("org.apache.ranger.audit.destination.ElasticSearchAuditDestination"); + } else if (providerName.equalsIgnoreCase("opensearch")) { + provider = createDestination("org.apache.ranger.audit.destination.OpenSearchAuditDestination"); } else if (providerName.equalsIgnoreCase("amazon_cloudwatch")) { provider = createDestination("org.apache.ranger.audit.destination.AmazonCloudWatchAuditDestination"); } else if (providerName.equalsIgnoreCase("kafka")) { diff --git a/agents-audit/dest-os/pom.xml b/agents-audit/dest-os/pom.xml new file mode 100644 index 00000000000..d8068437d8d --- /dev/null +++ b/agents-audit/dest-os/pom.xml @@ -0,0 +1,80 @@ + + + + 4.0.0 + + + org.apache.ranger + ranger + 3.0.0-SNAPSHOT + ../.. + + + ranger-audit-dest-os + jar + + Apache Ranger - Audit Destination OpenSearch + + + + com.fasterxml.jackson.core + jackson-databind + + + org.apache.httpcomponents + httpasyncclient + ${httpcomponents.httpasyncclient.version} + + + org.apache.httpcomponents + httpclient + ${httpcomponents.httpclient.version} + + + org.apache.httpcomponents + httpcore + ${httpcomponents.httpcore.version} + + + org.apache.httpcomponents + httpcore-nio + ${httpcomponents.httpcore.version} + + + org.apache.ranger + ranger-audit-core + ${project.version} + + + org.elasticsearch.client + elasticsearch-rest-client + ${elasticsearch.version} + + + org.slf4j + slf4j-api + ${slf4j-api.version} + + + org.junit.jupiter + junit-jupiter + ${junit.jupiter.version} + test + + + diff --git a/agents-audit/dest-os/src/main/java/org/apache/ranger/audit/destination/OpenSearchAuditDestination.java b/agents-audit/dest-os/src/main/java/org/apache/ranger/audit/destination/OpenSearchAuditDestination.java new file mode 100644 index 00000000000..4d1677b54b4 --- /dev/null +++ b/agents-audit/dest-os/src/main/java/org/apache/ranger/audit/destination/OpenSearchAuditDestination.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.audit.destination; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.entity.ContentType; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.nio.entity.NStringEntity; +import org.apache.http.util.EntityUtils; +import org.apache.ranger.audit.model.AuditEventBase; +import org.apache.ranger.audit.model.AuthzAuditEvent; +import org.apache.ranger.audit.provider.MiscUtil; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; +import java.text.SimpleDateFormat; +import java.util.Arrays; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.TimeZone; +import java.util.UUID; + +public class OpenSearchAuditDestination extends AuditDestination { + private static final Logger LOG = LoggerFactory.getLogger(OpenSearchAuditDestination.class); + + public static final String CONFIG_PREFIX = "ranger.audit.opensearch"; + public static final String CONFIG_URLS = "urls"; + public static final String CONFIG_PORT = "port"; + public static final String CONFIG_USER = "user"; + public static final String CONFIG_PASSWORD = "password"; + public static final String CONFIG_PROTOCOL = "protocol"; + public static final String CONFIG_INDEX = "index"; + public static final String DEFAULT_INDEX = "ranger_audits"; + + private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final ThreadLocal DATE_FORMAT = ThreadLocal.withInitial(() -> { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"); + sdf.setTimeZone(TimeZone.getTimeZone("UTC")); + return sdf; + }); + + private volatile RestClient client; + private String index; + private String user; + private String password; + private String protocol; + private String urls; + private int port; + + public OpenSearchAuditDestination() { + propPrefix = CONFIG_PREFIX; + } + + @Override + public void init(Properties props, String propPrefix) { + super.init(props, propPrefix); + + this.urls = MiscUtil.getStringProperty(props, propPrefix + "." + CONFIG_URLS, "localhost"); + this.port = MiscUtil.getIntProperty(props, propPrefix + "." + CONFIG_PORT, 9200); + this.protocol = MiscUtil.getStringProperty(props, propPrefix + "." + CONFIG_PROTOCOL, "http"); + this.user = MiscUtil.getStringProperty(props, propPrefix + "." + CONFIG_USER, ""); + this.password = MiscUtil.getStringProperty(props, propPrefix + "." + CONFIG_PASSWORD, ""); + this.index = MiscUtil.getStringProperty(props, propPrefix + "." + CONFIG_INDEX, DEFAULT_INDEX); + + LOG.info("OpenSearchAuditDestination.init(): urls={}, port={}, index={}", urls, port, index); + + getClient(); + } + + @Override + public void stop() { + logStatus(); + + if (client != null) { + try { + client.close(); + } catch (Exception e) { + LOG.error("Error closing OpenSearch client", e); + } + } + } + + @Override + public void flush() { + } + + @Override + public boolean log(Collection events) { + if (events == null || events.isEmpty()) { + return true; + } + + RestClient currentClient = getClient(); + + if (currentClient == null) { + LOG.error("OpenSearch client is null. Cannot write audit events."); + return false; + } + + try { + StringBuilder bulk = new StringBuilder(); + + for (AuditEventBase event : events) { + AuthzAuditEvent auditEvent = (AuthzAuditEvent) event; + Map doc = toDoc(auditEvent); + String id = (String) doc.get("id"); + + if (StringUtils.isBlank(id)) { + id = UUID.randomUUID().toString(); + doc.put("id", id); + } + + Map indexProps = new HashMap<>(); + indexProps.put("_index", index); + indexProps.put("_id", id); + + bulk.append(MAPPER.writeValueAsString(Map.of("index", indexProps))).append('\n'); + bulk.append(MAPPER.writeValueAsString(doc)).append('\n'); + } + + Request request = new Request("POST", "/_bulk"); + request.setEntity(new NStringEntity(bulk.toString(), ContentType.create("application/x-ndjson", StandardCharsets.UTF_8))); + + Response response = currentClient.performRequest(request); + + if (response.getStatusLine().getStatusCode() >= 400) { + LOG.error("OpenSearch bulk request failed: HTTP {}", response.getStatusLine().getStatusCode()); + return false; + } + + String responseBody = EntityUtils.toString(response.getEntity()); + @SuppressWarnings("unchecked") + Map responseMap = MAPPER.readValue(responseBody, Map.class); + + if (Boolean.TRUE.equals(responseMap.get("errors"))) { + LOG.error("OpenSearch bulk response contains item-level errors"); + return false; + } + + addSuccessCount(events.size()); + return true; + } catch (Exception e) { + addFailedCount(events.size()); + LOG.error("Failed to write audit events to OpenSearch", e); + return false; + } + } + + public boolean isAsync() { + return true; + } + + synchronized RestClient getClient() { + if (client == null) { + if (StringUtils.isBlank(urls) || "NONE".equalsIgnoreCase(urls)) { + LOG.error("OpenSearch URLs not configured"); + return null; + } + + HttpHost[] hosts = Arrays.stream(urls.split(",")).map(String::trim).filter(h -> !h.isEmpty()).map(h -> new HttpHost(h, port, protocol)).toArray(HttpHost[]::new); + RestClientBuilder builder = RestClient.builder(hosts); + + if (StringUtils.isNotBlank(user) && StringUtils.isNotBlank(password) && !"NONE".equalsIgnoreCase(user) && !"NONE".equalsIgnoreCase(password)) { + CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password)); + builder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); + } + + client = builder.build(); + } + + return client; + } + + Map toDoc(AuthzAuditEvent event) { + Map doc = new HashMap<>(); + + doc.put("id", event.getEventId()); + doc.put("access", event.getAccessType()); + doc.put("enforcer", event.getAclEnforcer()); + doc.put("agent", event.getAgentId()); + doc.put("repo", event.getRepositoryName()); + doc.put("sess", event.getSessionId()); + doc.put("reqUser", event.getUser()); + doc.put("reqData", event.getRequestData()); + doc.put("resource", event.getResourcePath()); + doc.put("cliIP", event.getClientIP()); + doc.put("logType", event.getLogType()); + doc.put("result", event.getAccessResult()); + doc.put("policy", event.getPolicyId()); + doc.put("repoType", event.getRepositoryType()); + doc.put("resType", event.getResourceType()); + doc.put("reason", event.getResultReason()); + doc.put("action", event.getAction()); + doc.put("evtTime", formatDate(event.getEventTime())); + doc.put("seq_num", event.getSeqNum()); + doc.put("event_count", event.getEventCount()); + doc.put("event_dur_ms", event.getEventDurationMS()); + doc.put("tags", event.getTags()); + doc.put("datasets", event.getDatasets()); + doc.put("projects", event.getProjects()); + doc.put("cluster", event.getClusterName()); + doc.put("zoneName", event.getZoneName()); + doc.put("agentHost", event.getAgentHostname()); + doc.put("policyVersion", event.getPolicyVersion()); + + return doc; + } + + private static String formatDate(Date date) { + return date != null ? DATE_FORMAT.get().format(date) : null; + } +} diff --git a/agents-audit/dest-os/src/test/java/org/apache/ranger/audit/destination/TestOpenSearchAuditDestination.java b/agents-audit/dest-os/src/test/java/org/apache/ranger/audit/destination/TestOpenSearchAuditDestination.java new file mode 100644 index 00000000000..e6b84ee67b6 --- /dev/null +++ b/agents-audit/dest-os/src/test/java/org/apache/ranger/audit/destination/TestOpenSearchAuditDestination.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.audit.destination; + +import org.apache.ranger.audit.model.AuthzAuditEvent; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Date; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class TestOpenSearchAuditDestination { + private OpenSearchAuditDestination destination; + + @BeforeEach + void setUp() { + destination = new OpenSearchAuditDestination(); + } + + @Test + void toDoc_mapsAllFieldsCorrectly() { + AuthzAuditEvent event = new AuthzAuditEvent(); + event.setEventId("test-event-001"); + event.setAccessType("read"); + event.setAclEnforcer("ranger-acl"); + event.setAgentId("hdfs-agent"); + event.setRepositoryName("dev_hdfs"); + event.setSessionId("sess-123"); + event.setUser("testuser"); + event.setRequestData("/tmp/testfile"); + event.setResourcePath("/data/warehouse"); + event.setClientIP("192.168.1.10"); + event.setLogType("RangerAudit"); + event.setAccessResult((short) 1); + event.setPolicyId(42L); + event.setRepositoryType(1); + event.setResourceType("path"); + event.setResultReason("Allowed by policy"); + event.setAction("read"); + event.setEventTime(new Date(1700000000000L)); + event.setSeqNum(10L); + event.setEventCount(1L); + event.setEventDurationMS(50L); + event.setClusterName("test-cluster"); + event.setZoneName("zone1"); + event.setAgentHostname("host1.example.com"); + event.setPolicyVersion(5L); + + Map doc = destination.toDoc(event); + + assertEquals("test-event-001", doc.get("id")); + assertEquals("read", doc.get("access")); + assertEquals("ranger-acl", doc.get("enforcer")); + assertEquals("hdfs-agent", doc.get("agent")); + assertEquals("dev_hdfs", doc.get("repo")); + assertEquals("sess-123", doc.get("sess")); + assertEquals("testuser", doc.get("reqUser")); + assertEquals("/tmp/testfile", doc.get("reqData")); + assertEquals("/data/warehouse", doc.get("resource")); + assertEquals("192.168.1.10", doc.get("cliIP")); + assertEquals("RangerAudit", doc.get("logType")); + assertEquals((short) 1, doc.get("result")); + assertEquals(42L, doc.get("policy")); + assertEquals(1, doc.get("repoType")); + assertEquals("path", doc.get("resType")); + assertEquals("Allowed by policy", doc.get("reason")); + assertEquals("read", doc.get("action")); + assertNotNull(doc.get("evtTime")); + assertTrue(doc.get("evtTime").toString().contains("2023-11-14")); + assertEquals(10L, doc.get("seq_num")); + assertEquals(1L, doc.get("event_count")); + assertEquals(50L, doc.get("event_dur_ms")); + assertEquals("test-cluster", doc.get("cluster")); + assertEquals("zone1", doc.get("zoneName")); + assertEquals("host1.example.com", doc.get("agentHost")); + assertEquals(5L, doc.get("policyVersion")); + } + + @Test + void toDoc_nullEventId_remainsNull() { + AuthzAuditEvent event = new AuthzAuditEvent(); + event.setUser("testuser"); + + Map doc = destination.toDoc(event); + + assertNull(doc.get("id")); + } + + @Test + void toDoc_nullEventTime_formatsAsNull() { + AuthzAuditEvent event = new AuthzAuditEvent(); + event.setEventId("evt-1"); + event.setEventTime(null); + + Map doc = destination.toDoc(event); + + assertNull(doc.get("evtTime")); + } + + @Test + void getClient_noneUrls_returnsNull() { + java.util.Properties props = new java.util.Properties(); + props.setProperty(OpenSearchAuditDestination.CONFIG_PREFIX + ".urls", "NONE"); + + destination.init(props, OpenSearchAuditDestination.CONFIG_PREFIX); + + assertNull(destination.getClient()); + } + + @Test + void log_nullClient_returnsFalse() { + java.util.Properties props = new java.util.Properties(); + props.setProperty(OpenSearchAuditDestination.CONFIG_PREFIX + ".urls", "NONE"); + destination.init(props, OpenSearchAuditDestination.CONFIG_PREFIX); + + AuthzAuditEvent event = new AuthzAuditEvent(); + event.setEventId("test-1"); + event.setUser("user1"); + + java.util.Collection events = java.util.List.of(event); + boolean result = destination.log(events); + + assertFalse(result); + } + + @Test + void log_emptyEvents_returnsTrue() { + java.util.Collection events = java.util.Collections.emptyList(); + boolean result = destination.log(events); + + assertTrue(result); + } + + @Test + void log_nullEvents_returnsTrue() { + boolean result = destination.log((java.util.Collection) null); + + assertTrue(result); + } + + @Test + void configConstants_matchExpectedValues() { + assertEquals("ranger.audit.opensearch", OpenSearchAuditDestination.CONFIG_PREFIX); + assertEquals("urls", OpenSearchAuditDestination.CONFIG_URLS); + assertEquals("port", OpenSearchAuditDestination.CONFIG_PORT); + assertEquals("user", OpenSearchAuditDestination.CONFIG_USER); + assertEquals("password", OpenSearchAuditDestination.CONFIG_PASSWORD); + assertEquals("protocol", OpenSearchAuditDestination.CONFIG_PROTOCOL); + assertEquals("index", OpenSearchAuditDestination.CONFIG_INDEX); + assertEquals("ranger_audits", OpenSearchAuditDestination.DEFAULT_INDEX); + } +} diff --git a/agents-audit/pom.xml b/agents-audit/pom.xml index a25edde9257..173f4f80b41 100644 --- a/agents-audit/pom.xml +++ b/agents-audit/pom.xml @@ -37,6 +37,7 @@ dest-hdfs dest-kafka dest-log4j + dest-os dest-solr orc-util diff --git a/audit-server/audit-dispatcher/dispatcher-app/pom.xml b/audit-server/audit-dispatcher/dispatcher-app/pom.xml index d18d487e388..161256d3461 100644 --- a/audit-server/audit-dispatcher/dispatcher-app/pom.xml +++ b/audit-server/audit-dispatcher/dispatcher-app/pom.xml @@ -85,6 +85,12 @@ ${project.version} provided + + org.apache.ranger + audit-dispatcher-opensearch + ${project.version} + provided + org.apache.ranger audit-dispatcher-solr diff --git a/audit-server/audit-dispatcher/dispatcher-opensearch/pom.xml b/audit-server/audit-dispatcher/dispatcher-opensearch/pom.xml new file mode 100644 index 00000000000..715c4dafb1c --- /dev/null +++ b/audit-server/audit-dispatcher/dispatcher-opensearch/pom.xml @@ -0,0 +1,210 @@ + + + + 4.0.0 + + + org.apache.ranger + ranger + 3.0.0-SNAPSHOT + ../../.. + + + audit-dispatcher-opensearch + jar + Ranger Audit Dispatcher OpenSearch + Kafka dispatcher service for indexing audits into OpenSearch/ElasticSearch + + + UTF-8 + + + + + + com.fasterxml.jackson.core + jackson-databind + ${fasterxml.jackson.version} + + + + org.apache.commons + commons-lang3 + ${commons.lang3.version} + + + + + org.apache.httpcomponents + httpasyncclient + ${httpcomponents.httpasyncclient.version} + + + commons-logging + * + + + + + org.apache.httpcomponents + httpclient + ${httpcomponents.httpclient.version} + + + commons-logging + * + + + + + org.apache.httpcomponents + httpcore + ${httpcomponents.httpcore.version} + + + org.apache.httpcomponents + httpcore-nio + ${httpcomponents.httpcore.version} + + + + + org.apache.ranger + ranger-audit-dest-es + ${project.version} + + + org.apache.hadoop + hadoop-client-api + + + + + + + org.apache.kafka + kafka-clients + ${kafka.version} + provided + + + log4j + * + + + org.slf4j + * + + + + + org.apache.ranger + ranger-audit-core + ${project.version} + provided + + + org.apache.hadoop + hadoop-client-api + + + + + org.apache.ranger + ranger-audit-dispatcher-common + ${project.version} + provided + + + org.apache.hadoop + hadoop-client-api + + + + + org.apache.ranger + ranger-audit-server-common + ${project.version} + provided + + + org.slf4j + slf4j-api + ${slf4j.version} + provided + + + + + org.junit.jupiter + junit-jupiter + ${junit.jupiter.version} + test + + + org.mockito + mockito-core + ${mockito.version} + test + + + org.mockito + mockito-junit-jupiter + ${mockito.version} + test + + + + + audit-dispatcher-opensearch-${project.version} + + + true + src/main/resources + + + + + org.apache.maven.plugins + maven-pmd-plugin + + + ${project.parent.basedir}/dev-support/ranger-pmd-ruleset.xml + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-dependencies + + copy-dependencies + + package + + ${project.build.directory}/lib + runtime + + + + + + + diff --git a/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/java/org/apache/ranger/audit/dispatcher/AuditEventOpenSearchDocMapper.java b/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/java/org/apache/ranger/audit/dispatcher/AuditEventOpenSearchDocMapper.java new file mode 100644 index 00000000000..1a3cc76e7cb --- /dev/null +++ b/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/java/org/apache/ranger/audit/dispatcher/AuditEventOpenSearchDocMapper.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.audit.dispatcher; + +import org.apache.ranger.audit.model.AuthzAuditEvent; + +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.TimeZone; + +public final class AuditEventOpenSearchDocMapper { + private static final ThreadLocal DATE_FORMAT = ThreadLocal.withInitial(() -> { + SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"); + format.setTimeZone(TimeZone.getTimeZone("UTC")); + return format; + }); + + private AuditEventOpenSearchDocMapper() { + } + + public static Map toDoc(final AuthzAuditEvent auditEvent) { + Map doc = new HashMap<>(); + + doc.put("id", auditEvent.getEventId()); + doc.put("access", auditEvent.getAccessType()); + doc.put("enforcer", auditEvent.getAclEnforcer()); + doc.put("agent", auditEvent.getAgentId()); + doc.put("repo", auditEvent.getRepositoryName()); + doc.put("sess", auditEvent.getSessionId()); + doc.put("reqUser", auditEvent.getUser()); + doc.put("reqData", auditEvent.getRequestData()); + doc.put("resource", auditEvent.getResourcePath()); + doc.put("cliIP", auditEvent.getClientIP()); + doc.put("cliType", auditEvent.getClientType()); + doc.put("logType", auditEvent.getLogType()); + doc.put("result", auditEvent.getAccessResult()); + doc.put("policy", auditEvent.getPolicyId()); + doc.put("repoType", auditEvent.getRepositoryType()); + doc.put("resType", auditEvent.getResourceType()); + doc.put("reason", auditEvent.getResultReason()); + doc.put("action", auditEvent.getAction()); + + Date eventTime = auditEvent.getEventTime(); + doc.put("evtTime", eventTime != null ? DATE_FORMAT.get().format(eventTime) : null); + + doc.put("seq_num", auditEvent.getSeqNum()); + doc.put("event_count", auditEvent.getEventCount()); + doc.put("event_dur_ms", auditEvent.getEventDurationMS()); + doc.put("tags", auditEvent.getTags()); + doc.put("datasets", auditEvent.getDatasets()); + doc.put("projects", auditEvent.getProjects()); + doc.put("datasetIds", auditEvent.getDatasetIds()); + doc.put("cluster", auditEvent.getClusterName()); + doc.put("zoneName", auditEvent.getZoneName()); + doc.put("agentHost", auditEvent.getAgentHostname()); + doc.put("policyVersion", auditEvent.getPolicyVersion()); + doc.put("additionalInfo", auditEvent.getAdditionalInfo()); + + return doc; + } +} diff --git a/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/java/org/apache/ranger/audit/dispatcher/OpenSearchDispatcherManager.java b/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/java/org/apache/ranger/audit/dispatcher/OpenSearchDispatcherManager.java new file mode 100644 index 00000000000..308e0959faf --- /dev/null +++ b/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/java/org/apache/ranger/audit/dispatcher/OpenSearchDispatcherManager.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.audit.dispatcher; + +import org.apache.ranger.audit.dispatcher.kafka.AuditDispatcher; +import org.apache.ranger.audit.dispatcher.kafka.AuditDispatcherTracker; +import org.apache.ranger.audit.dispatcher.kafka.AuditOpenSearchDispatcher; +import org.apache.ranger.audit.provider.MiscUtil; +import org.apache.ranger.audit.server.AuditServerConstants; +import org.apache.ranger.audit.utils.AuditServerLogFormatter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; + +public final class OpenSearchDispatcherManager { + private static final Logger LOG = LoggerFactory.getLogger(OpenSearchDispatcherManager.class); + private static final String CONFIG_DISPATCHER_TYPE = AuditServerConstants.PROP_DISPATCHER_TYPE; + private static final String TYPE_OPENSEARCH = "opensearch"; + private static final String ES_DEST_PROP = "xasecure.audit.destination.elasticsearch"; + private static final int MAX_INIT_ATTEMPTS = 5; + private static final long INIT_RETRY_MS = 5000L; + private static final long SHUTDOWN_WAIT_MS = 10000L; + + private final AuditDispatcherTracker tracker = AuditDispatcherTracker.getInstance(); + private AuditDispatcher dispatcher; + private Thread dispatcherThread; + + public void init(final Properties props) { + LOG.info("==> OpenSearchDispatcherManager.init()"); + + String dispatcherType = System.getProperty(CONFIG_DISPATCHER_TYPE); + if (dispatcherType != null && !dispatcherType.equalsIgnoreCase(TYPE_OPENSEARCH)) { + LOG.info("Skipping OpenSearchDispatcherManager initialization since dispatcher type is {}", dispatcherType); + return; + } + + try { + if (props == null) { + LOG.error("Configuration properties are null"); + throw new RuntimeException("Failed to load configuration"); + } + + boolean isEnabled = MiscUtil.getBooleanProperty(props, ES_DEST_PROP, false); + if (!isEnabled) { + String clsName = MiscUtil.getStringProperty(props, AuditServerConstants.PROP_DISPATCHER_CLASS); + if (clsName != null && clsName.contains("AuditOpenSearchDispatcher")) { + isEnabled = true; + } + } + + if (!isEnabled) { + LOG.warn("OpenSearch destination is disabled ({}=false). No dispatchers will be created.", ES_DEST_PROP); + return; + } + + initializeDispatcher(props, AuditServerConstants.PROP_DISPATCHER_PREFIX); + + if (dispatcher == null) { + throw new RuntimeException("No OpenSearch dispatcher was created. Verify that " + ES_DEST_PROP + "=true and classes are configured correctly."); + } else { + LOG.info("Created OpenSearch dispatcher"); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + LOG.info("JVM shutdown detected, stopping OpenSearchDispatcherManager"); + shutdown(); + }, "OpenSearchDispatcher-ShutdownHook")); + + startDispatcher(); + } + } catch (Exception e) { + LOG.error("Failed to initialize OpenSearchDispatcherManager", e); + throw new RuntimeException("Failed to initialize OpenSearchDispatcherManager", e); + } + + LOG.info("<== OpenSearchDispatcherManager.init()"); + } + + public void shutdown() { + LOG.info("==> OpenSearchDispatcherManager.shutdown()"); + + if (dispatcher != null) { + try { + LOG.info("Shutting down dispatcher: {}", dispatcher.getClass().getSimpleName()); + dispatcher.shutdown(); + LOG.info("Dispatcher shutdown completed: {}", dispatcher.getClass().getSimpleName()); + } catch (Exception e) { + LOG.error("Error shutting down dispatcher: {}", dispatcher.getClass().getSimpleName(), e); + } + } + + if (dispatcherThread != null && dispatcherThread.isAlive()) { + try { + LOG.info("Waiting for thread to terminate: {}", dispatcherThread.getName()); + dispatcherThread.join(SHUTDOWN_WAIT_MS); + if (dispatcherThread.isAlive()) { + LOG.warn("Thread did not terminate within {}ms: {}", SHUTDOWN_WAIT_MS, dispatcherThread.getName()); + } + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting for thread to terminate: {}", dispatcherThread.getName(), e); + Thread.currentThread().interrupt(); + } + } + + dispatcher = null; + dispatcherThread = null; + tracker.clearActiveDispatcher(TYPE_OPENSEARCH); + + LOG.info("<== OpenSearchDispatcherManager.shutdown() - OpenSearch dispatcher stopped"); + } + + private void initializeDispatcher(final Properties props, final String propPrefix) { + LOG.info("==> OpenSearchDispatcherManager.initializeDispatcher()"); + + String clsStr = MiscUtil.getStringProperty(props, AuditServerConstants.PROP_DISPATCHER_CLASS, AuditOpenSearchDispatcher.class.getName()); + String className = clsStr.split(",")[0].trim(); + + if (className.isEmpty()) { + LOG.error("Dispatcher class name is empty"); + return; + } + + long retryDelay = INIT_RETRY_MS; + + for (int attempt = 1; attempt <= MAX_INIT_ATTEMPTS; attempt++) { + try { + Class cls = Class.forName(className); + dispatcher = (AuditDispatcher) cls.getConstructor(Properties.class, String.class).newInstance(props, propPrefix); + tracker.addActiveDispatcher(TYPE_OPENSEARCH, dispatcher); + LOG.info("Successfully initialized dispatcher class: {}", cls.getName()); + break; + } catch (ClassNotFoundException e) { + LOG.error("Dispatcher class not found: {}. Ensure the class is on the classpath.", className, e); + break; + } catch (Exception e) { + if (attempt < MAX_INIT_ATTEMPTS) { + LOG.warn("Dispatcher init attempt {}/{} failed, retrying in {}ms...", attempt, MAX_INIT_ATTEMPTS, retryDelay, e); + try { + Thread.sleep(retryDelay); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } + retryDelay *= 2; + } else { + LOG.error("Error initializing dispatcher class after {} attempts: {}", MAX_INIT_ATTEMPTS, className, e); + } + } + } + + LOG.info("<== OpenSearchDispatcherManager.initializeDispatcher()"); + } + + private void startDispatcher() { + LOG.info("==> OpenSearchDispatcherManager.startDispatcher()"); + + logStartupBanner(); + + if (dispatcher != null) { + try { + String name = dispatcher.getClass().getSimpleName(); + dispatcherThread = new Thread(dispatcher, name); + dispatcherThread.setDaemon(true); + dispatcherThread.start(); + LOG.info("Started {} thread [Thread-ID: {}, Thread-Name: '{}']", name, dispatcherThread.getId(), dispatcherThread.getName()); + } catch (Exception e) { + LOG.error("Error starting dispatcher: {}", dispatcher.getClass().getSimpleName(), e); + } + } + + LOG.info("<== OpenSearchDispatcherManager.startDispatcher()"); + } + + private void logStartupBanner() { + LOG.info("########## OPENSEARCH DISPATCHER SERVICE STARTUP ##########"); + + if (dispatcher == null) { + LOG.warn("WARNING: No OpenSearch dispatchers are enabled!"); + LOG.warn("Verify: {}=true in configuration", ES_DEST_PROP); + } else { + AuditServerLogFormatter.LogBuilder builder = AuditServerLogFormatter.builder("OpenSearch Dispatcher Status"); + String type = dispatcher.getClass().getSimpleName(); + builder.add(type, "ENABLED"); + builder.add("Topic", dispatcher.getTopicName()); + builder.logInfo(LOG); + LOG.info("Starting OpenSearch dispatcher thread..."); + } + + LOG.info("##########################################################"); + } +} diff --git a/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/java/org/apache/ranger/audit/dispatcher/kafka/AuditOpenSearchDispatcher.java b/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/java/org/apache/ranger/audit/dispatcher/kafka/AuditOpenSearchDispatcher.java new file mode 100644 index 00000000000..9c2e3387520 --- /dev/null +++ b/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/java/org/apache/ranger/audit/dispatcher/kafka/AuditOpenSearchDispatcher.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.audit.dispatcher.kafka; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.HttpHost; +import org.apache.http.HttpStatus; +import org.apache.http.auth.AuthSchemeProvider; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.client.config.AuthSchemes; +import org.apache.http.config.Lookup; +import org.apache.http.config.RegistryBuilder; +import org.apache.http.entity.ContentType; +import org.apache.http.impl.auth.SPNegoSchemeFactory; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.nio.entity.NStringEntity; +import org.apache.http.util.EntityUtils; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.ranger.audit.dispatcher.AuditEventOpenSearchDocMapper; +import org.apache.ranger.audit.model.AuthzAuditEvent; +import org.apache.ranger.audit.provider.MiscUtil; +import org.apache.ranger.audit.server.AuditServerConstants; +import org.apache.ranger.audit.utils.AuditServerLogFormatter; +import org.apache.ranger.authorization.credutils.CredentialsProviderUtil; +import org.apache.ranger.authorization.credutils.kerberos.KerberosCredentialsProvider; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; +import java.util.stream.Collectors; + +public class AuditOpenSearchDispatcher extends AuditDispatcherBase { + private static final Logger LOG = LoggerFactory.getLogger(AuditOpenSearchDispatcher.class); + private static final String DEFAULT_GROUP = "ranger_audit_opensearch_dispatcher_group"; + private static final String DEFAULT_INDEX = "ranger_audits"; + private static final long RETRY_SLEEP_MS = 5000L; + private static final int DEFAULT_PORT = 9200; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final TypeReference> MAP_TYPE = new TypeReference>() { }; + + private RestClient openSearchClient; + private String openSearchIndex; + + public AuditOpenSearchDispatcher(final Properties props, final String propPrefix) throws Exception { + super(props, propPrefix, DEFAULT_GROUP); + init(props, propPrefix); + } + + @Override + protected final String getDispatcherName() { + return "OPENSEARCH"; + } + + @Override + protected final DispatcherWorker createDispatcherWorker(final String workerId, final List assignedPartitions) { + return new OpenSearchDispatcherWorker(workerId, assignedPartitions); + } + + @Override + protected final void shutdownDestination() { + if (openSearchClient != null) { + try { + openSearchClient.close(); + } catch (Exception e) { + LOG.error("Error shutting down OpenSearch REST client", e); + } + } + } + + private void init(final Properties props, final String propPrefix) throws Exception { + LOG.info("==> AuditOpenSearchDispatcher.init()"); + + String pfx = propPrefix + "."; + + this.openSearchIndex = MiscUtil.getStringProperty(props, pfx + "index", DEFAULT_INDEX); + this.openSearchClient = createOpenSearchClient(props, pfx); + + this.dispatcherThreadCount = MiscUtil.getIntProperty(props, pfx + AuditServerConstants.PROP_DISPATCHER_THREAD_COUNT, 1); + this.offsetCommitStrategy = MiscUtil.getStringProperty(props, pfx + AuditServerConstants.PROP_DISPATCHER_OFFSET_COMMIT_STRATEGY, AuditServerConstants.DEFAULT_OFFSET_COMMIT_STRATEGY); + this.offsetCommitInterval = MiscUtil.getLongProperty(props, pfx + AuditServerConstants.PROP_DISPATCHER_OFFSET_COMMIT_INTERVAL, AuditServerConstants.DEFAULT_OFFSET_COMMIT_INTERVAL_MS); + + AuditServerLogFormatter.builder("AuditOpenSearchDispatcher Configuration") + .add("Index", openSearchIndex) + .add("Thread Count", dispatcherThreadCount) + .add("Commit Strategy", offsetCommitStrategy) + .add("Commit Interval (ms)", offsetCommitInterval + " (manual mode only)") + .logInfo(LOG); + + LOG.info("<== AuditOpenSearchDispatcher.init()"); + } + + private void processMessageBatch(final Collection audits) throws Exception { + if (audits == null || audits.isEmpty()) { + throw new Exception("Failure in sending audits into OpenSearch"); + } + + StringBuilder bulkBody = new StringBuilder(); + + for (String audit : audits) { + AuthzAuditEvent auditEvent = MiscUtil.fromJson(audit, AuthzAuditEvent.class); + String id = auditEvent.getEventId(); + Map doc = AuditEventOpenSearchDocMapper.toDoc(auditEvent); + + if (id == null || id.trim().isEmpty()) { + id = UUID.randomUUID().toString(); + doc.put("id", id); + } + + Map indexProperties = new HashMap<>(); + indexProperties.put("_index", openSearchIndex); + indexProperties.put("_id", id); + + Map indexMeta = Collections.singletonMap("index", indexProperties); + bulkBody.append(OBJECT_MAPPER.writeValueAsString(indexMeta)).append('\n') + .append(OBJECT_MAPPER.writeValueAsString(doc)).append('\n'); + } + + Request request = new Request("POST", "/_bulk"); + request.setEntity(new NStringEntity(bulkBody.toString(), ContentType.create("application/x-ndjson", StandardCharsets.UTF_8))); + + Response response = openSearchClient.performRequest(request); + int status = response.getStatusLine().getStatusCode(); + + if (status >= HttpStatus.SC_BAD_REQUEST) { + throw new Exception("OpenSearch bulk request failed with HTTP " + status); + } + + String responseBody = response.getEntity() != null ? EntityUtils.toString(response.getEntity()) : "{}"; + Map responseMap = OBJECT_MAPPER.readValue(responseBody, MAP_TYPE); + Object hasErrors = responseMap.get("errors"); + + if (Boolean.TRUE.equals(hasErrors)) { + throw new Exception("OpenSearch bulk request returned item errors: " + responseBody); + } + } + + private RestClient createOpenSearchClient(final Properties props, final String pfx) { + String protocol = MiscUtil.getStringProperty(props, pfx + "protocol", "http"); + String urls = MiscUtil.getStringProperty(props, pfx + "urls", "localhost"); + int port = MiscUtil.getIntProperty(props, pfx + "port", DEFAULT_PORT); + String user = MiscUtil.getStringProperty(props, pfx + "user", ""); + String password = MiscUtil.getStringProperty(props, pfx + "password", ""); + HttpHost[] hosts = MiscUtil.toArray(urls, ",").stream().map(h -> new HttpHost(h, port, protocol)).toArray(HttpHost[]::new); + + LOG.info("Connecting to OpenSearch: {}://{}:{}/{}", protocol, urls, port, openSearchIndex); + + RestClientBuilder builder = RestClient.builder(hosts); + + if (isCredentialConfigured(user) && isCredentialConfigured(password)) { + if (password.contains("keytab") && new File(password).exists()) { + KerberosCredentialsProvider creds = CredentialsProviderUtil.getKerberosCredentials(user, password); + Lookup authRegistry = RegistryBuilder.create() + .register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory()) + .build(); + builder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder + .setDefaultCredentialsProvider(creds) + .setDefaultAuthSchemeRegistry(authRegistry)); + LOG.info("OpenSearch client configured with Kerberos credentials for user: {}", user); + } else { + CredentialsProvider creds = new BasicCredentialsProvider(); + creds.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password)); + builder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(creds)); + LOG.info("OpenSearch client configured with basic auth for user: {}", user); + } + } + + return builder.build(); + } + + private boolean isCredentialConfigured(final String value) { + return StringUtils.isNotBlank(value) && !"NONE".equalsIgnoreCase(value.trim()); + } + + private class OpenSearchDispatcherWorker extends DispatcherWorker { + OpenSearchDispatcherWorker(final String workerId, final List assignedPartitions) { + super(workerId, assignedPartitions); + } + + @Override + protected void processRecordBatch(final ConsumerRecords records) { + for (TopicPartition tp : records.partitions()) { + List> tpRecords = records.records(tp); + + if (tpRecords.isEmpty()) { + continue; + } + + try { + List auditBatch = tpRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList()); + processMessageBatch(auditBatch); + + ConsumerRecord last = tpRecords.get(tpRecords.size() - 1); + pendingOffsets.put(tp, new OffsetAndMetadata(last.offset() + 1)); + messagesProcessedSinceLastCommit.addAndGet(tpRecords.size()); + } catch (Exception e) { + LOG.error("Error processing batch in worker '{}', partition={}, batch size: {}", workerId, tp, tpRecords.size(), e); + + ConsumerRecord first = tpRecords.get(0); + pendingOffsets.put(tp, new OffsetAndMetadata(first.offset())); + + try { + workerDispatcher.seek(tp, first.offset()); + } catch (Exception seekEx) { + LOG.error("Failed to seek to offset {} for partition {} after OpenSearch batch error", first.offset(), tp, seekEx); + } + + try { + Thread.sleep(RETRY_SLEEP_MS); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } + } + } + } +} diff --git a/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/resources/conf/logback.xml b/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/resources/conf/logback.xml new file mode 100644 index 00000000000..29b33e86ad9 --- /dev/null +++ b/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/resources/conf/logback.xml @@ -0,0 +1,55 @@ + + + + + + + + + ${LOG_DIR}/${LOG_FILE} + + + ${LOG_DIR}/${LOG_FILE}.%d{yyyy-MM-dd}.%i.gz + + 100MB + 30 + 2GB + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/resources/conf/ranger-audit-dispatcher-opensearch-site.xml b/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/resources/conf/ranger-audit-dispatcher-opensearch-site.xml new file mode 100644 index 00000000000..cab2086b5ca --- /dev/null +++ b/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/resources/conf/ranger-audit-dispatcher-opensearch-site.xml @@ -0,0 +1,188 @@ + + + + log.dir + ${audit.dispatcher.opensearch.log.dir} + Log directory for OpenSearch dispatcher service + + + + + ranger.audit.dispatcher.host + ranger-audit-dispatcher-opensearch.rangernw + + - In Docker: Use full service name with domain (e.g., ranger-audit-server.rangernw) + - In VM: Use the actual FQDN (e.g., ranger.example.com) + + + + + ranger.audit.dispatcher.service.kerberos.principal + rangerauditserver/_HOST@EXAMPLE.COM + + rangerauditserver user kerberos principal for authentication into kafka + + + + + ranger.audit.dispatcher.service.kerberos.keytab + /etc/keytabs/rangerauditserver.keytab + + keytab of the rangerauditserver principal + + + + + + ranger.audit.dispatcher.kafka.bootstrap.servers + ranger-kafka.rangernw:9092 + + + + ranger.audit.dispatcher.kafka.topic.name + ranger_audits + + + + ranger.audit.dispatcher.kafka.security.protocol + SASL_PLAINTEXT + + + + ranger.audit.dispatcher.kafka.sasl.mechanism + GSSAPI + + + + + ranger.audit.dispatcher.thread.count + 5 + Number of OpenSearch dispatcher worker threads (higher for indexing throughput) + + + + + ranger.audit.dispatcher.offset.commit.strategy + batch + batch or manual + + + + ranger.audit.dispatcher.offset.commit.interval.ms + 30000 + Used only if strategy is 'manual' + + + + ranger.audit.dispatcher.max.poll.records + 500 + Maximum records per poll for batch processing + + + + + ranger.audit.dispatcher.class + org.apache.ranger.audit.dispatcher.kafka.AuditOpenSearchDispatcher + + + + + xasecure.audit.destination.elasticsearch + true + Enable the Elasticsearch-compatible audit destination used for OpenSearch writes + + + + ranger.audit.dispatcher.urls + ranger-opensearch + OpenSearch host(s), comma-separated + + + + ranger.audit.dispatcher.port + 9200 + + + + ranger.audit.dispatcher.protocol + http + + + + ranger.audit.dispatcher.index + ranger_audits + + + + ranger.audit.dispatcher.user + + + Username for OpenSearch Basic Auth, or Kerberos principal when password is a keytab path. + Use NONE or empty for unauthenticated OpenSearch (dev only). + Production: configure user/password or Kerberos keytab. + + + + + ranger.audit.dispatcher.password + + + Password for OpenSearch Basic Auth, or path to keytab for Kerberos. + Use NONE or empty for unauthenticated OpenSearch (dev only). + Production: configure user/password or Kerberos keytab. + + + + + + ranger.audit.dispatcher.type + opensearch + + + + ranger.audit.dispatcher.war.file + ranger-audit-dispatcher.war + + + + ranger.audit.dispatcher.launcher.class + org.apache.ranger.audit.dispatcher.AuditDispatcherLauncher + + + + ranger.audit.dispatcher.main.class + org.apache.ranger.audit.dispatcher.AuditDispatcherApplication + + + + ranger.audit.dispatcher.http.port + 7090 + + + + ranger.audit.dispatcher.contextName + / + + + + ranger.audit.dispatcher.kafka.group.id + ranger_audit_opensearch_dispatcher_group + + + + ranger.audit.dispatcher.opensearch.class + org.apache.ranger.audit.dispatcher.OpenSearchDispatcherManager + + \ No newline at end of file diff --git a/audit-server/audit-dispatcher/dispatcher-opensearch/src/test/java/org/apache/ranger/audit/dispatcher/TestAuditEventOpenSearchDocMapper.java b/audit-server/audit-dispatcher/dispatcher-opensearch/src/test/java/org/apache/ranger/audit/dispatcher/TestAuditEventOpenSearchDocMapper.java new file mode 100644 index 00000000000..ea6ffdb8086 --- /dev/null +++ b/audit-server/audit-dispatcher/dispatcher-opensearch/src/test/java/org/apache/ranger/audit/dispatcher/TestAuditEventOpenSearchDocMapper.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.audit.dispatcher; + +import org.apache.ranger.audit.model.AuthzAuditEvent; +import org.junit.jupiter.api.Test; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.TimeZone; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class TestAuditEventOpenSearchDocMapper { + @Test + void toDoc_mapsAllFields() { + Date eventTime = new Date(1700000000000L); + + AuthzAuditEvent event = new AuthzAuditEvent( + 1, "dev_hdfs", "testuser", eventTime, "read", + "/tmp/test", "path", "read", (short) 1, "hdfs-agent", + 42L, "allowed by policy", "ranger-acl", "sess-123", + "hive", "192.168.1.1", "/tmp/test", "cl1", "zone1", 5L); + event.setEventId("evt-001"); + event.setSeqNum(10L); + event.setEventCount(1L); + event.setEventDurationMS(50L); + event.setAgentHostname("host1.example.com"); + + Set tags = new HashSet<>(); + tags.add("PII"); + event.setTags(tags); + + Map doc = AuditEventOpenSearchDocMapper.toDoc(event); + + assertNotNull(doc); + assertEquals("evt-001", doc.get("id")); + assertEquals("read", doc.get("access")); + assertEquals("ranger-acl", doc.get("enforcer")); + assertEquals("hdfs-agent", doc.get("agent")); + assertEquals("dev_hdfs", doc.get("repo")); + assertEquals("sess-123", doc.get("sess")); + assertEquals("testuser", doc.get("reqUser")); + assertEquals("/tmp/test", doc.get("reqData")); + assertEquals("/tmp/test", doc.get("resource")); + assertEquals("192.168.1.1", doc.get("cliIP")); + assertEquals("hive", doc.get("cliType")); + assertEquals((short) 1, doc.get("result")); + assertEquals(42L, doc.get("policy")); + assertEquals(1, doc.get("repoType")); + assertEquals("path", doc.get("resType")); + assertEquals("allowed by policy", doc.get("reason")); + assertEquals("read", doc.get("action")); + assertEquals(10L, doc.get("seq_num")); + assertEquals(1L, doc.get("event_count")); + assertEquals(50L, doc.get("event_dur_ms")); + assertEquals("cl1", doc.get("cluster")); + assertEquals("zone1", doc.get("zoneName")); + assertEquals("host1.example.com", doc.get("agentHost")); + assertEquals(5L, doc.get("policyVersion")); + + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"); + sdf.setTimeZone(TimeZone.getTimeZone("UTC")); + assertEquals(sdf.format(eventTime), doc.get("evtTime")); + } + + @Test + void toDoc_handlesNullEventTime() { + AuthzAuditEvent event = new AuthzAuditEvent(); + event.setEventId("evt-002"); + event.setEventTime(null); + + Map doc = AuditEventOpenSearchDocMapper.toDoc(event); + + assertNotNull(doc); + assertEquals("evt-002", doc.get("id")); + assertNull(doc.get("evtTime")); + } + + @Test + void toDoc_handlesNullFields() { + AuthzAuditEvent event = new AuthzAuditEvent(); + + Map doc = AuditEventOpenSearchDocMapper.toDoc(event); + + assertNotNull(doc); + assertNull(doc.get("id")); + assertNull(doc.get("access")); + assertNull(doc.get("enforcer")); + assertNull(doc.get("agent")); + assertNull(doc.get("repo")); + assertNull(doc.get("reqUser")); + assertNull(doc.get("resource")); + assertNull(doc.get("cliIP")); + assertEquals(0, doc.get("repoType")); + assertEquals(0L, doc.get("policy")); + } + + @Test + void toDoc_mapsDatasets() { + AuthzAuditEvent event = new AuthzAuditEvent(); + event.setEventId("evt-003"); + + Set datasets = new HashSet<>(); + datasets.add("dataset1"); + datasets.add("dataset2"); + event.setDatasets(datasets); + + Set projects = new HashSet<>(); + projects.add("project1"); + event.setProjects(projects); + + Set datasetIds = new HashSet<>(); + datasetIds.add(100L); + datasetIds.add(200L); + event.setDatasetIds(datasetIds); + + Map doc = AuditEventOpenSearchDocMapper.toDoc(event); + + assertEquals(datasets, doc.get("datasets")); + assertEquals(projects, doc.get("projects")); + assertEquals(datasetIds, doc.get("datasetIds")); + } +} diff --git a/audit-server/audit-dispatcher/dispatcher-opensearch/src/test/java/org/apache/ranger/audit/dispatcher/TestOpenSearchDispatcherManager.java b/audit-server/audit-dispatcher/dispatcher-opensearch/src/test/java/org/apache/ranger/audit/dispatcher/TestOpenSearchDispatcherManager.java new file mode 100644 index 00000000000..612cc4c8cdd --- /dev/null +++ b/audit-server/audit-dispatcher/dispatcher-opensearch/src/test/java/org/apache/ranger/audit/dispatcher/TestOpenSearchDispatcherManager.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.audit.dispatcher; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import java.util.Properties; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class TestOpenSearchDispatcherManager { + @AfterEach + void clearSystemProperty() { + System.clearProperty("ranger.audit.dispatcher.type"); + } + + @Test + void init_skipsWhenDispatcherTypeIsNotOpenSearch() { + System.setProperty("ranger.audit.dispatcher.type", "solr"); + + OpenSearchDispatcherManager manager = new OpenSearchDispatcherManager(); + Properties props = new Properties(); + + assertDoesNotThrow(() -> manager.init(props)); + } + + @Test + void init_throwsWhenPropsAreNull() { + OpenSearchDispatcherManager manager = new OpenSearchDispatcherManager(); + + assertThrows(RuntimeException.class, () -> manager.init(null)); + } + + @Test + void init_skipsWhenOpenSearchDestinationDisabled() { + OpenSearchDispatcherManager manager = new OpenSearchDispatcherManager(); + Properties props = new Properties(); + props.setProperty("xasecure.audit.destination.elasticsearch", "false"); + + assertDoesNotThrow(() -> manager.init(props)); + } + + @Test + void shutdown_handlesNullDispatcherGracefully() { + OpenSearchDispatcherManager manager = new OpenSearchDispatcherManager(); + + assertDoesNotThrow(manager::shutdown); + } + + @Test + void init_throwsWhenEnabledDispatcherClassCannotBeCreated() { + OpenSearchDispatcherManager manager = new OpenSearchDispatcherManager(); + Properties props = new Properties(); + props.setProperty("xasecure.audit.destination.elasticsearch", "true"); + props.setProperty("ranger.audit.dispatcher.class", "com.nonexistent.FakeDispatcher"); + props.setProperty("ranger.audit.dispatcher.kafka.bootstrap.servers", "localhost:9092"); + props.setProperty("ranger.audit.dispatcher.kafka.topic", "ranger_audits"); + + assertThrows(RuntimeException.class, () -> manager.init(props)); + } +} diff --git a/audit-server/audit-dispatcher/dispatcher-opensearch/src/test/java/org/apache/ranger/audit/dispatcher/kafka/TestAuditOpenSearchDispatcher.java b/audit-server/audit-dispatcher/dispatcher-opensearch/src/test/java/org/apache/ranger/audit/dispatcher/kafka/TestAuditOpenSearchDispatcher.java new file mode 100644 index 00000000000..941bda36ed4 --- /dev/null +++ b/audit-server/audit-dispatcher/dispatcher-opensearch/src/test/java/org/apache/ranger/audit/dispatcher/kafka/TestAuditOpenSearchDispatcher.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.audit.dispatcher.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.http.HttpEntity; +import org.apache.http.StatusLine; +import org.apache.ranger.audit.model.AuthzAuditEvent; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class TestAuditOpenSearchDispatcher { + private static final ObjectMapper MAPPER = new ObjectMapper(); + + @Mock + private RestClient mockRestClient; + + @Mock + private Response mockResponse; + + @Mock + private StatusLine mockStatusLine; + + private AuditOpenSearchDispatcher dispatcher; + + @BeforeEach + void setUp() throws Exception { + Field unsafeField = sun.misc.Unsafe.class.getDeclaredField("theUnsafe"); + unsafeField.setAccessible(true); + sun.misc.Unsafe unsafe = (sun.misc.Unsafe) unsafeField.get(null); + dispatcher = (AuditOpenSearchDispatcher) unsafe.allocateInstance(AuditOpenSearchDispatcher.class); + + setField(dispatcher, "openSearchClient", mockRestClient); + setField(dispatcher, "openSearchIndex", "ranger_audits"); + } + + @Test + void processMessageBatch_sendsBulkRequestWithDocumentId() throws Exception { + AuthzAuditEvent event = new AuthzAuditEvent(); + event.setEventId("test-id-001"); + event.setAccessType("read"); + event.setUser("testuser"); + event.setRepositoryName("dev_hdfs"); + event.setEventTime(new Date(1700000000000L)); + + String auditJson = MAPPER.writeValueAsString(event); + setupSuccessResponse(); + + invokeProcessMessageBatch(Collections.singletonList(auditJson)); + + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(Request.class); + verify(mockRestClient).performRequest(requestCaptor.capture()); + + Request request = requestCaptor.getValue(); + assertEquals("POST", request.getMethod()); + assertEquals("/_bulk", request.getEndpoint()); + + String[] lines = requestBody(request).split("\n"); + Map indexMeta = indexMetadata(lines[0]); + Map doc = MAPPER.readValue(lines[1], Map.class); + + assertEquals("ranger_audits", indexMeta.get("_index")); + assertEquals("test-id-001", indexMeta.get("_id")); + assertEquals("test-id-001", doc.get("id")); + assertEquals("read", doc.get("access")); + assertEquals("testuser", doc.get("reqUser")); + assertEquals("dev_hdfs", doc.get("repo")); + } + + @Test + void processMessageBatch_generatesDocumentIdWhenAuditIdMissing() throws Exception { + AuthzAuditEvent event = new AuthzAuditEvent(); + event.setAccessType("read"); + event.setUser("testuser"); + + String auditJson = MAPPER.writeValueAsString(event); + setupSuccessResponse(); + + invokeProcessMessageBatch(Collections.singletonList(auditJson)); + + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(Request.class); + verify(mockRestClient).performRequest(requestCaptor.capture()); + + String[] lines = requestBody(requestCaptor.getValue()).split("\n"); + Map indexMeta = indexMetadata(lines[0]); + Map doc = MAPPER.readValue(lines[1], Map.class); + Object generatedId = indexMeta.get("_id"); + + assertNotNull(generatedId); + assertFalse(generatedId.toString().trim().isEmpty()); + assertEquals(generatedId, doc.get("id")); + } + + @Test + void processMessageBatch_throwsOnNullInput() { + assertThrows(Exception.class, () -> invokeProcessMessageBatch(null)); + } + + @Test + void processMessageBatch_throwsOnEmptyInput() { + assertThrows(Exception.class, () -> invokeProcessMessageBatch(Collections.emptyList())); + } + + @Test + void processMessageBatch_throwsOnHttpError() throws Exception { + AuthzAuditEvent event = new AuthzAuditEvent(); + event.setEventId("test-id-002"); + event.setUser("testuser"); + + String auditJson = MAPPER.writeValueAsString(event); + + when(mockRestClient.performRequest(any(Request.class))).thenReturn(mockResponse); + when(mockResponse.getStatusLine()).thenReturn(mockStatusLine); + when(mockStatusLine.getStatusCode()).thenReturn(500); + + assertThrows(Exception.class, () -> invokeProcessMessageBatch(Collections.singletonList(auditJson))); + } + + @Test + void processMessageBatch_throwsOnBulkItemErrors() throws Exception { + AuthzAuditEvent event = new AuthzAuditEvent(); + event.setEventId("test-id-003"); + event.setUser("testuser"); + + String auditJson = MAPPER.writeValueAsString(event); + String errorResponse = "{\"errors\":true,\"items\":[{\"index\":{\"status\":400,\"error\":\"mapping error\"}}]}"; + HttpEntity entity = mock(HttpEntity.class); + when(entity.getContent()).thenReturn(new ByteArrayInputStream(errorResponse.getBytes(StandardCharsets.UTF_8))); + when(entity.getContentLength()).thenReturn((long) errorResponse.length()); + + when(mockRestClient.performRequest(any(Request.class))).thenReturn(mockResponse); + when(mockResponse.getStatusLine()).thenReturn(mockStatusLine); + when(mockStatusLine.getStatusCode()).thenReturn(200); + when(mockResponse.getEntity()).thenReturn(entity); + + assertThrows(Exception.class, () -> invokeProcessMessageBatch(Collections.singletonList(auditJson))); + } + + private void setupSuccessResponse() throws Exception { + String successBody = "{\"errors\":false,\"items\":[]}"; + HttpEntity entity = mock(HttpEntity.class); + when(entity.getContent()).thenReturn(new ByteArrayInputStream(successBody.getBytes(StandardCharsets.UTF_8))); + when(entity.getContentLength()).thenReturn((long) successBody.length()); + + when(mockRestClient.performRequest(any(Request.class))).thenReturn(mockResponse); + when(mockResponse.getStatusLine()).thenReturn(mockStatusLine); + when(mockStatusLine.getStatusCode()).thenReturn(200); + when(mockResponse.getEntity()).thenReturn(entity); + } + + private String requestBody(Request request) throws Exception { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + request.getEntity().writeTo(out); + + return out.toString(StandardCharsets.UTF_8.name()); + } + + @SuppressWarnings("unchecked") + private Map indexMetadata(String line) throws Exception { + Map meta = MAPPER.readValue(line, Map.class); + + return (Map) meta.get("index"); + } + + private void invokeProcessMessageBatch(Collection audits) throws Exception { + Method method = AuditOpenSearchDispatcher.class.getDeclaredMethod("processMessageBatch", Collection.class); + method.setAccessible(true); + try { + method.invoke(dispatcher, audits); + } catch (java.lang.reflect.InvocationTargetException e) { + if (e.getCause() instanceof Exception) { + throw (Exception) e.getCause(); + } + throw e; + } + } + + private void setField(Object target, String fieldName, Object value) throws Exception { + Class clazz = target.getClass(); + while (clazz != null) { + try { + Field field = clazz.getDeclaredField(fieldName); + field.setAccessible(true); + field.set(target, value); + return; + } catch (NoSuchFieldException e) { + clazz = clazz.getSuperclass(); + } + } + throw new NoSuchFieldException(fieldName + " not found in class hierarchy"); + } +} diff --git a/audit-server/audit-dispatcher/pom.xml b/audit-server/audit-dispatcher/pom.xml index a306cc1674e..83a2f3d3b6c 100644 --- a/audit-server/audit-dispatcher/pom.xml +++ b/audit-server/audit-dispatcher/pom.xml @@ -34,6 +34,7 @@ dispatcher-app dispatcher-common dispatcher-hdfs + dispatcher-opensearch dispatcher-solr diff --git a/audit-server/audit-dispatcher/scripts/start-audit-dispatcher.sh b/audit-server/audit-dispatcher/scripts/start-audit-dispatcher.sh index ad72e71b73a..ae7100d40ed 100755 --- a/audit-server/audit-dispatcher/scripts/start-audit-dispatcher.sh +++ b/audit-server/audit-dispatcher/scripts/start-audit-dispatcher.sh @@ -36,6 +36,10 @@ if [ -z "${DISPATCHER_TYPE}" ]; then fi CONF_FILE="${AUDIT_DISPATCHER_CONF_DIR}/ranger-audit-dispatcher-${DISPATCHER_TYPE}-site.xml" +LOGBACK_CONFIG_FILE="${AUDIT_DISPATCHER_CONF_DIR}/logback.xml" +if [ -n "${DISPATCHER_TYPE}" ] && [ -f "${AUDIT_DISPATCHER_CONF_DIR}/logback-${DISPATCHER_TYPE}.xml" ]; then + LOGBACK_CONFIG_FILE="${AUDIT_DISPATCHER_CONF_DIR}/logback-${DISPATCHER_TYPE}.xml" +fi # Set default heap size if not set if [ -z "${AUDIT_DISPATCHER_HEAP}" ]; then @@ -44,7 +48,7 @@ fi # Set default Java options if not set if [ -z "${AUDIT_DISPATCHER_OPTS}" ]; then - AUDIT_DISPATCHER_OPTS="-Dlogback.configurationFile=${AUDIT_DISPATCHER_CONF_DIR}/logback.xml \ + AUDIT_DISPATCHER_OPTS="-Dlogback.configurationFile=${LOGBACK_CONFIG_FILE} \ -Daudit.dispatcher.log.dir=${AUDIT_DISPATCHER_LOG_DIR} \ -Daudit.dispatcher.log.file=ranger-audit-dispatcher.log \ -Djava.net.preferIPv4Stack=true \ diff --git a/dev-support/ranger-docker/Dockerfile.ranger-kdc b/dev-support/ranger-docker/Dockerfile.ranger-kdc index f3ce89d182f..a81119bfffd 100644 --- a/dev-support/ranger-docker/Dockerfile.ranger-kdc +++ b/dev-support/ranger-docker/Dockerfile.ranger-kdc @@ -36,10 +36,13 @@ COPY ./scripts/kdc/kdc.conf /etc/krb5kdc/kdc.conf COPY ./scripts/kdc/kadm5.acl /etc/krb5kdc/kadm5.acl COPY ./scripts/kdc/entrypoint.sh /entrypoint.sh -RUN chmod +x /entrypoint.sh +RUN chmod +x /entrypoint.sh && apt-get update && apt-get install -y netcat-openbsd && rm -rf /var/lib/apt/lists/* VOLUME /etc/keytabs EXPOSE 88/tcp 88/udp 749/tcp +HEALTHCHECK --interval=5s --timeout=3s --start-period=120s --retries=30 \ + CMD test -f /etc/keytabs/.provisioned && echo | nc -w 1 localhost 88 || exit 1 + ENTRYPOINT ["/entrypoint.sh"] diff --git a/dev-support/ranger-docker/README.md b/dev-support/ranger-docker/README.md index 3f8090d6eb6..27c96dd741b 100644 --- a/dev-support/ranger-docker/README.md +++ b/dev-support/ranger-docker/README.md @@ -106,6 +106,72 @@ docker compose -f docker-compose.ranger.yml -f docker-compose.ranger-trino.yml u ~~~ docker compose -f docker-compose.ranger.yml -f docker-compose.ranger-opensearch.yml up -d ~~~ + +#### OpenSearch audit flow (replace Solr for access audits) + +OpenSearch can replace Solr for **audit storage and UI queries**. Ranger Admin reads audits via +`audit_store=opensearch` using a native low-level REST client (compatible with any OpenSearch version). + +**Write path:** access audits flow through audit-server ingestor, Kafka, and the Java +`ranger-audit-dispatcher-opensearch` service into the OpenSearch `ranger_audits` index. +Ranger Admin policy/admin transaction audits remain DB-backed; this is the same boundary +used by the Solr audit path. + +##### Quick start + +~~~ +# Prerequisites: build the audit-dispatcher tarball and download archives +mvn clean package -DskipTests -pl distro -am +cp target/ranger-*-audit-dispatcher.tar.gz dev-support/ranger-docker/dist/ +cd dev-support/ranger-docker +./download-archives.sh kafka opensearch hadoop + +# Run the E2E test (starts stack, tests, tears down automatically) +./scripts/audit/e2e-audit-opensearch.sh + +# Or keep the stack running after the test for debugging +./scripts/audit/e2e-audit-opensearch.sh --no-teardown + +# Re-run just the test against an already-running stack +./scripts/audit/e2e-audit-opensearch.sh --skip-start +~~~ + +##### Manual setup (advanced) + +For finer control, the individual steps can be run manually: + +~~~ +export RANGER_DB_TYPE=postgres + +# 1. Start OpenSearch first (Ranger Admin's bootstrapper needs it on startup) +docker compose -f docker-compose.ranger.yml -f docker-compose.ranger-opensearch.yml \ + -f docker-compose.ranger-kafka.yml -f docker-compose.ranger-hadoop.yml \ + -f docker-compose.ranger-audit-server.yml \ + -f docker-compose.ranger-audit-dispatcher-opensearch.yml up -d ranger-opensearch + +# 2. Start core stack (Ranger Admin, Kafka, Hadoop) +# Kafka auto-creates the ranger_audits topic on startup. +# Ranger Admin auto-creates the OpenSearch index via OpenSearchIndexBootStrapper. +docker compose -f docker-compose.ranger.yml -f docker-compose.ranger-opensearch.yml \ + -f docker-compose.ranger-kafka.yml -f docker-compose.ranger-hadoop.yml \ + -f docker-compose.ranger-audit-server.yml \ + -f docker-compose.ranger-audit-dispatcher-opensearch.yml up -d ranger ranger-kafka ranger-hadoop + +# 3. Start audit ingestor and OpenSearch dispatcher +docker compose -f docker-compose.ranger.yml -f docker-compose.ranger-opensearch.yml \ + -f docker-compose.ranger-kafka.yml -f docker-compose.ranger-hadoop.yml \ + -f docker-compose.ranger-audit-server.yml \ + -f docker-compose.ranger-audit-dispatcher-opensearch.yml up -d ranger-audit-ingestor ranger-audit-dispatcher-opensearch + +# 4. Run end-to-end audit test (--skip-start since stack is already up) +./scripts/audit/e2e-audit-opensearch.sh --skip-start +~~~ + +For **fresh Ranger installs** using OpenSearch for audits, set `audit_store=opensearch` in +`scripts/admin/ranger-admin-install-postgres.properties` and configure the `audit_opensearch_*` properties. + +For **existing Solr-based installs**, switching stores requires updating `audit_store=opensearch` +in install.properties, configuring the `audit_opensearch_*` properties, and restarting Ranger Admin. Similarly, check the `depends` section of the `docker-compose.ranger-service.yaml` file and add docker-compose files for these services when trying to bring up the `service` container. #### Bring up all containers diff --git a/dev-support/ranger-docker/docker-compose.ranger-audit-dispatcher-opensearch.yml b/dev-support/ranger-docker/docker-compose.ranger-audit-dispatcher-opensearch.yml new file mode 100644 index 00000000000..b99a03c6d93 --- /dev/null +++ b/dev-support/ranger-docker/docker-compose.ranger-audit-dispatcher-opensearch.yml @@ -0,0 +1,70 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file to You +# under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +services: + ranger-audit-dispatcher-opensearch: + build: + context: . + dockerfile: Dockerfile.ranger-audit-dispatcher + args: + - RANGER_BASE_IMAGE=${RANGER_BASE_IMAGE} + - RANGER_BASE_VERSION=${RANGER_BASE_VERSION} + - RANGER_VERSION=${RANGER_VERSION} + - KERBEROS_ENABLED=${KERBEROS_ENABLED} + image: ranger-audit-dispatcher-opensearch:latest + container_name: ranger-audit-dispatcher-opensearch + hostname: ranger-audit-dispatcher-opensearch.rangernw + command: ["opensearch"] + stdin_open: true + tty: true + depends_on: + ranger-kdc: + condition: service_healthy + ranger-kafka: + condition: service_started + ranger-opensearch: + condition: service_started + ports: + - "7093:7090" + environment: + JAVA_HOME: /opt/java/openjdk + AUDIT_DISPATCHER_HOME_DIR: /opt/ranger/audit-dispatcher + AUDIT_DISPATCHER_CONF_DIR: /opt/ranger/audit-dispatcher/conf + AUDIT_DISPATCHER_LOG_DIR: /var/log/ranger/audit-dispatcher/opensearch + AUDIT_DISPATCHER_HEAP: "-Xms512m -Xmx2g" + RANGER_VERSION: ${RANGER_VERSION} + KERBEROS_ENABLED: ${KERBEROS_ENABLED} + KAFKA_BOOTSTRAP_SERVERS: ranger-kafka.rangernw:9092 + OPENSEARCH_URL: http://ranger-opensearch.rangernw:9200 + networks: + - ranger + volumes: + - ./dist/keytabs/ranger-audit-dispatcher-opensearch:/etc/keytabs + - ranger-audit-dispatcher-opensearch-logs:/var/log/ranger/audit-dispatcher/opensearch + - ./scripts/audit-dispatcher/ranger-audit-dispatcher-opensearch-site.xml:/opt/ranger/audit-dispatcher/conf/ranger-audit-dispatcher-opensearch-site.xml + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:7090/api/health/ping"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 60s + +volumes: + ranger-audit-dispatcher-opensearch-logs: + +networks: + ranger: + name: rangernw diff --git a/dev-support/ranger-docker/docker-compose.ranger.yml b/dev-support/ranger-docker/docker-compose.ranger.yml index 3e263fecc1b..a75de142470 100644 --- a/dev-support/ranger-docker/docker-compose.ranger.yml +++ b/dev-support/ranger-docker/docker-compose.ranger.yml @@ -81,6 +81,9 @@ services: image: ranger-zk container_name: ranger-zk hostname: ranger-zk.rangernw + depends_on: + ranger-kdc: + condition: service_healthy volumes: - ./dist/keytabs/ranger-zk:/etc/keytabs - ./scripts/kdc/krb5.conf:/etc/krb5.conf:ro diff --git a/dev-support/ranger-docker/download-archives.sh b/dev-support/ranger-docker/download-archives.sh index 1b35be61bac..b107cc50d29 100755 --- a/dev-support/ranger-docker/download-archives.sh +++ b/dev-support/ranger-docker/download-archives.sh @@ -22,9 +22,15 @@ # # -# source .env file to get versions to download +# Load .env file to get versions to download. Do not source it directly: +# values like JAVA_OPTS contain spaces and are valid for docker compose .env +# files, but not valid shell assignments unless quoted. # -source .env +while IFS='=' read -r key value; do + if [[ "${key}" =~ ^[A-Za-z_][A-Za-z0-9_]*$ ]]; then + export "${key}=${value}" + fi +done < .env downloadIfNotPresent() { diff --git a/dev-support/ranger-docker/scripts/admin/ranger-admin-install-postgres.properties b/dev-support/ranger-docker/scripts/admin/ranger-admin-install-postgres.properties index e8d28825f49..fc8e8682aaf 100644 --- a/dev-support/ranger-docker/scripts/admin/ranger-admin-install-postgres.properties +++ b/dev-support/ranger-docker/scripts/admin/ranger-admin-install-postgres.properties @@ -43,19 +43,33 @@ rangerUsersync_password=rangerR0cks! keyadmin_password=rangerR0cks! -audit_store=solr +## --- Audit store: enable ONE of the following blocks --- + +## Option 1: Solr (default) +# audit_store=solr # FQDN required for SPNEGO to Solr (HTTP/ranger-solr.rangernw@REALM) -audit_solr_urls=http://ranger-solr.rangernw:8983/solr/ranger_audits -audit_solr_collection_name=ranger_audits +# audit_solr_urls=http://ranger-solr.rangernw:8983/solr/ranger_audits +# audit_solr_collection_name=ranger_audits +## Option 2: OpenSearch / Elasticsearch (legacy - uses ES HighLevel REST Client) # audit_store=elasticsearch -audit_elasticsearch_urls= -audit_elasticsearch_port=9200 -audit_elasticsearch_protocol=http -audit_elasticsearch_user=elastic -audit_elasticsearch_password=elasticsearch -audit_elasticsearch_index=ranger_audits -audit_elasticsearch_bootstrap_enabled=true +# audit_elasticsearch_urls=ranger-opensearch +# audit_elasticsearch_port=9200 +# audit_elasticsearch_protocol=http +# audit_elasticsearch_user=NONE +# audit_elasticsearch_password=NONE +# audit_elasticsearch_index=ranger_audits +# audit_elasticsearch_bootstrap_enabled=true + +## Option 3: OpenSearch (native - uses low-level REST client, works with any OpenSearch version) +audit_store=opensearch +audit_opensearch_urls=ranger-opensearch +audit_opensearch_port=9200 +audit_opensearch_protocol=http +audit_opensearch_user=admin +audit_opensearch_password=admin +audit_opensearch_index=ranger_audits +audit_opensearch_bootstrap_enabled=true policymgr_external_url=http://ranger-admin:6080 policymgr_http_enabled=true diff --git a/dev-support/ranger-docker/scripts/audit-dispatcher/ranger-audit-dispatcher-opensearch-site.xml b/dev-support/ranger-docker/scripts/audit-dispatcher/ranger-audit-dispatcher-opensearch-site.xml new file mode 100644 index 00000000000..741c9f55907 --- /dev/null +++ b/dev-support/ranger-docker/scripts/audit-dispatcher/ranger-audit-dispatcher-opensearch-site.xml @@ -0,0 +1,168 @@ + + + + ranger.audit.dispatcher.opensearch.class + org.apache.ranger.audit.dispatcher.OpenSearchDispatcherManager + + + + log.dir + ${audit.dispatcher.opensearch.log.dir} + Log directory for OpenSearch dispatcher service + + + + ranger.audit.dispatcher.host + ranger-audit-dispatcher-opensearch.rangernw + + + + ranger.audit.dispatcher.service.kerberos.principal + rangerauditserver/_HOST@EXAMPLE.COM + + + + ranger.audit.dispatcher.service.kerberos.keytab + /etc/keytabs/rangerauditserver.keytab + + + + + ranger.audit.dispatcher.kafka.bootstrap.servers + ranger-kafka.rangernw:9092 + + + + ranger.audit.dispatcher.kafka.topic.name + ranger_audits + + + + ranger.audit.dispatcher.kafka.security.protocol + SASL_PLAINTEXT + + + + ranger.audit.dispatcher.kafka.sasl.mechanism + GSSAPI + + + + ranger.audit.dispatcher.thread.count + 1 + + + + ranger.audit.dispatcher.offset.commit.strategy + batch + + + + ranger.audit.dispatcher.offset.commit.interval.ms + 30000 + + + + ranger.audit.dispatcher.max.poll.records + 500 + + + + ranger.audit.dispatcher.class + org.apache.ranger.audit.dispatcher.kafka.AuditOpenSearchDispatcher + + + + + xasecure.audit.destination.elasticsearch + true + + + + ranger.audit.dispatcher.urls + ranger-opensearch.rangernw + + + + ranger.audit.dispatcher.port + 9200 + + + + ranger.audit.dispatcher.protocol + http + + + + ranger.audit.dispatcher.index + ranger_audits + + + + ranger.audit.dispatcher.user + + + Username for OpenSearch Basic Auth, or Kerberos principal when password is a keytab path. + Use NONE or empty for unauthenticated OpenSearch (dev only). + Production: configure user/password or Kerberos keytab. + + + + + ranger.audit.dispatcher.password + + + Password for OpenSearch Basic Auth, or path to keytab for Kerberos. + Use NONE or empty for unauthenticated OpenSearch (dev only). + Production: configure user/password or Kerberos keytab. + + + + + + ranger.audit.dispatcher.type + opensearch + + + + ranger.audit.dispatcher.war.file + ranger-audit-dispatcher.war + + + + ranger.audit.dispatcher.launcher.class + org.apache.ranger.audit.dispatcher.AuditDispatcherLauncher + + + + ranger.audit.dispatcher.main.class + org.apache.ranger.audit.dispatcher.AuditDispatcherApplication + + + + ranger.audit.dispatcher.http.port + 7090 + + + + ranger.audit.dispatcher.contextName + / + + + + ranger.audit.dispatcher.kafka.group.id + ranger_audit_opensearch_dispatcher_group + + diff --git a/dev-support/ranger-docker/scripts/kdc/entrypoint.sh b/dev-support/ranger-docker/scripts/kdc/entrypoint.sh index 4e35f545724..d74a6135c5f 100644 --- a/dev-support/ranger-docker/scripts/kdc/entrypoint.sh +++ b/dev-support/ranger-docker/scripts/kdc/entrypoint.sh @@ -84,6 +84,9 @@ function create_keytabs() { create_principal_and_keytab HTTP ranger-audit-dispatcher-hdfs create_principal_and_keytab rangerauditserver ranger-audit-dispatcher-hdfs + create_principal_and_keytab HTTP ranger-audit-dispatcher-opensearch + create_principal_and_keytab rangerauditserver ranger-audit-dispatcher-opensearch + create_principal_and_keytab rangertagsync ranger-tagsync create_principal_and_keytab rangerusersync ranger-usersync @@ -145,7 +148,10 @@ if [ ! -f $DB_DIR/principal ]; then echo "Database initialized" create_keytabs - create_testusers ranger ranger-usersync ranger-tagsync ranger-pdp ranger-audit-ingestor ranger-audit-dispatcher-solr ranger-audit-dispatcher-hdfs ranger-hadoop ranger-hive ranger-hbase ranger-kafka ranger-solr ranger-knox ranger-kms ranger-ozone ranger-trino ranger-opensearch + create_testusers ranger ranger-usersync ranger-tagsync ranger-pdp ranger-audit-ingestor ranger-audit-dispatcher-solr ranger-audit-dispatcher-hdfs ranger-audit-dispatcher-opensearch ranger-hadoop ranger-hive ranger-hbase ranger-kafka ranger-solr ranger-knox ranger-kms ranger-ozone ranger-trino ranger-opensearch + + touch /etc/keytabs/.provisioned + echo "All keytabs provisioned" else echo "KDC DB already exists; skipping create" fi diff --git a/dev-support/ranger-docker/scripts/opensearch/opensearch.yml b/dev-support/ranger-docker/scripts/opensearch/opensearch.yml index d63405043b7..42b0e8898fa 100644 --- a/dev-support/ranger-docker/scripts/opensearch/opensearch.yml +++ b/dev-support/ranger-docker/scripts/opensearch/opensearch.yml @@ -48,3 +48,6 @@ http.cors.allow-headers: "X-Requested-With, Content-Type, Content-Length, Author # See opensearch-jaas.conf for Kerberos principal and keytab settings # JVM is configured with: -Djava.security.auth.login.config and -Djava.security.krb5.conf +# Wire-compatible with ES 7 REST client used by Ranger Admin +compatibility.override_main_response_version: true + diff --git a/distro/pom.xml b/distro/pom.xml index 3a1b38004a1..fa97c3fbd12 100644 --- a/distro/pom.xml +++ b/distro/pom.xml @@ -35,6 +35,12 @@ ${project.version} provided + + org.apache.ranger + audit-dispatcher-opensearch + ${project.version} + provided + org.apache.ranger audit-dispatcher-solr @@ -113,6 +119,12 @@ ${project.version} provided + + org.apache.ranger + ranger-audit-dest-os + ${project.version} + provided + org.apache.ranger ranger-audit-dest-solr diff --git a/distro/src/main/assembly/audit-dispatcher.xml b/distro/src/main/assembly/audit-dispatcher.xml index 34af9d79c53..c8fe1bbbdee 100644 --- a/distro/src/main/assembly/audit-dispatcher.xml +++ b/distro/src/main/assembly/audit-dispatcher.xml @@ -56,6 +56,15 @@ 644 + + conf + ${project.parent.basedir}/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/resources/conf + + ranger-audit-dispatcher-opensearch-site.xml + + 644 + + @@ -98,6 +107,22 @@ + + + lib/dispatchers/opensearch + ${project.parent.basedir}/audit-server/audit-dispatcher/dispatcher-opensearch/target/lib + + *.jar + + + + lib/dispatchers/opensearch + ${project.parent.basedir}/audit-server/audit-dispatcher/dispatcher-opensearch/target + + audit-dispatcher-opensearch-${project.version}.jar + + + lib/dispatchers/solr @@ -116,6 +141,13 @@ + + conf + ${project.parent.basedir}/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/resources/conf/logback.xml + logback-opensearch.xml + 644 + + webapp diff --git a/embeddedwebserver/src/main/java/org/apache/ranger/server/tomcat/EmbeddedServer.java b/embeddedwebserver/src/main/java/org/apache/ranger/server/tomcat/EmbeddedServer.java index 93394cf6ffe..296da342a07 100644 --- a/embeddedwebserver/src/main/java/org/apache/ranger/server/tomcat/EmbeddedServer.java +++ b/embeddedwebserver/src/main/java/org/apache/ranger/server/tomcat/EmbeddedServer.java @@ -72,8 +72,10 @@ public class EmbeddedServer { private static final String AUDIT_SOURCE_TYPE = "ranger.audit.source.type"; private static final String AUDIT_SOURCE_SOLR = "solr"; private static final String AUDIT_SOURCE_ES = "elasticsearch"; + private static final String AUDIT_SOURCE_OPENSEARCH = "opensearch"; private static final String SOLR_BOOTSTRAP_ENABLED = "ranger.audit.solr.bootstrap.enabled"; private static final String ES_BOOTSTRAP_ENABLED = "ranger.audit.elasticsearch.bootstrap.enabled"; + private static final String OS_BOOTSTRAP_ENABLED = "ranger.audit.opensearch.bootstrap.enabled"; private static final String ADMIN_USER_KEYTAB = "ranger.admin.kerberos.keytab"; private static final String ADMIN_NAME_RULES = "hadoop.security.auth_to_local"; private static final String ADMIN_SERVER_NAME = "rangeradmin"; @@ -451,6 +453,18 @@ private void startServer(final Tomcat server) { LOG.severe("Error while setting elasticsearch " + e); } } + } else if (AUDIT_SOURCE_OPENSEARCH.equalsIgnoreCase(auditSourceType)) { + boolean osBootstrapEnabled = Boolean.parseBoolean(EmbeddedServerUtil.getConfig(OS_BOOTSTRAP_ENABLED, "true")); + + if (osBootstrapEnabled) { + try { + OpenSearchIndexBootStrapper osSchemaSetup = new OpenSearchIndexBootStrapper(); + + osSchemaSetup.start(); + } catch (Exception e) { + LOG.severe("Error while setting opensearch " + e); + } + } } } diff --git a/embeddedwebserver/src/main/java/org/apache/ranger/server/tomcat/OpenSearchIndexBootStrapper.java b/embeddedwebserver/src/main/java/org/apache/ranger/server/tomcat/OpenSearchIndexBootStrapper.java new file mode 100644 index 00000000000..9e8cd100104 --- /dev/null +++ b/embeddedwebserver/src/main/java/org/apache/ranger/server/tomcat/OpenSearchIndexBootStrapper.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.server.tomcat; + +import org.apache.commons.lang3.StringUtils; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.entity.ContentType; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.nio.entity.NStringEntity; +import org.apache.http.util.EntityUtils; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.logging.Logger; + +public class OpenSearchIndexBootStrapper extends Thread { + private static final Logger LOG = Logger.getLogger(OpenSearchIndexBootStrapper.class.getName()); + + private static final String CONFIG_PREFIX = "ranger.audit.opensearch"; + private static final String CONFIG_URLS = CONFIG_PREFIX + ".urls"; + private static final String CONFIG_PORT = CONFIG_PREFIX + ".port"; + private static final String CONFIG_PROTOCOL = CONFIG_PREFIX + ".protocol"; + private static final String CONFIG_USER = CONFIG_PREFIX + ".user"; + private static final String CONFIG_PASSWORD = CONFIG_PREFIX + ".password"; + private static final String CONFIG_INDEX = CONFIG_PREFIX + ".index"; + private static final String CONFIG_INTERVAL = CONFIG_PREFIX + ".time.interval"; + private static final String CONFIG_SHARDS = CONFIG_PREFIX + ".no.shards"; + private static final String CONFIG_REPLICAS = CONFIG_PREFIX + ".no.replica"; + private static final String CONFIG_MAX_RETRY = CONFIG_PREFIX + ".max.retry"; + private static final String SCHEMA_FILE_NAME = "ranger_opensearch_schema.json"; + + private volatile RestClient client; + private String index; + private String urls; + private String protocol; + private String user; + private String password; + private int port; + private int noOfShards; + private int noOfReplicas; + private int maxRetry; + private long timeInterval; + + @Override + public void run() { + LOG.info("OpenSearchIndexBootStrapper: starting..."); + + readConfig(); + + if (StringUtils.isBlank(urls) || "NONE".equalsIgnoreCase(urls)) { + LOG.severe("OpenSearch URL is not configured. Aborting bootstrap."); + + return; + } + + int retryCounter = 0; + + while (maxRetry == -1 || retryCounter < maxRetry) { + retryCounter++; + + try { + LOG.info("Trying to acquire OpenSearch connection"); + + connect(); + + if (client == null) { + logErrorAndWait(retryCounter, "Failed to create OpenSearch client"); + + continue; + } + + LOG.info("Connection to OpenSearch established successfully"); + + if (indexExists()) { + LOG.info("Index '" + index + "' already exists. Bootstrap complete."); + + return; + } + + createIndex(); + + LOG.info("Index '" + index + "' created successfully."); + + return; + } catch (Exception e) { + logErrorAndWait(retryCounter, e.getMessage()); + } + } + + LOG.severe("OpenSearch index bootstrap failed after " + retryCounter + " attempts."); + } + + private void readConfig() { + urls = EmbeddedServerUtil.getConfig(CONFIG_URLS, ""); + protocol = EmbeddedServerUtil.getConfig(CONFIG_PROTOCOL, "http"); + user = EmbeddedServerUtil.getConfig(CONFIG_USER, ""); + password = EmbeddedServerUtil.getConfig(CONFIG_PASSWORD, ""); + port = Integer.parseInt(EmbeddedServerUtil.getConfig(CONFIG_PORT, "9200")); + index = EmbeddedServerUtil.getConfig(CONFIG_INDEX, "ranger_audits"); + noOfShards = Integer.parseInt(EmbeddedServerUtil.getConfig(CONFIG_SHARDS, "1")); + noOfReplicas = Integer.parseInt(EmbeddedServerUtil.getConfig(CONFIG_REPLICAS, "1")); + maxRetry = Integer.parseInt(EmbeddedServerUtil.getConfig(CONFIG_MAX_RETRY, "30")); + timeInterval = Long.parseLong(EmbeddedServerUtil.getConfig(CONFIG_INTERVAL, "60000")); + } + + private void connect() { + if (client != null) { + return; + } + + HttpHost[] hosts = Arrays.stream(urls.split(",")).map(String::trim).filter(h -> !h.isEmpty()).map(h -> new HttpHost(h, port, protocol)).toArray(HttpHost[]::new); + + RestClientBuilder builder = RestClient.builder(hosts); + + if (StringUtils.isNotBlank(user) && StringUtils.isNotBlank(password) && !"NONE".equalsIgnoreCase(user) && !"NONE".equalsIgnoreCase(password)) { + CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password)); + builder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); + } + + client = builder.build(); + } + + private boolean indexExists() throws IOException { + Request request = new Request("HEAD", "/" + index); + request.addParameter("ignore", "404"); + + Response response = client.performRequest(request); + + return response.getStatusLine().getStatusCode() == 200; + } + + private void createIndex() throws IOException { + String schemaJson = loadSchemaFile(); + String body = buildCreateIndexBody(schemaJson); + + Request request = new Request("PUT", "/" + index); + + request.setEntity(new NStringEntity(body, ContentType.APPLICATION_JSON)); + + Response response = client.performRequest(request); + String result = EntityUtils.toString(response.getEntity()); + + if (response.getStatusLine().getStatusCode() >= 400) { + throw new IOException("Failed to create index '" + index + "': " + result); + } + + LOG.info("Create index response: " + result); + } + + private String buildCreateIndexBody(String mappingsJson) { + return "{\"settings\":{\"number_of_shards\":" + noOfShards + + ",\"number_of_replicas\":" + noOfReplicas + + "},\"mappings\":" + mappingsJson + "}"; + } + + private String loadSchemaFile() throws IOException { + String jarLocation; + + try { + jarLocation = this.getClass().getProtectionDomain().getCodeSource().getLocation().toURI().getPath(); + } catch (Exception e) { + throw new IOException("Cannot determine Ranger home directory", e); + } + + String rangerHomeDir = new File(jarLocation).getParentFile().getParentFile().getParentFile().getPath(); + String schemaPath = Paths.get(rangerHomeDir, "contrib", "opensearch_for_audit_setup", "conf", SCHEMA_FILE_NAME).toString(); + + File schemaFile = new File(schemaPath); + + if (!schemaFile.exists()) { + throw new IOException("OpenSearch schema file not found: " + schemaPath); + } + + return new String(Files.readAllBytes(Paths.get(schemaPath))); + } + + private void logErrorAndWait(int retryCounter, String message) { + int attemptsLeft = maxRetry == -1 ? -1 : maxRetry - retryCounter; + + LOG.severe("Error during OpenSearch bootstrap. [retrying after " + timeInterval + + " ms]. Attempts left: " + attemptsLeft + ". Error: " + message); + + try { + Thread.sleep(timeInterval); + } catch (InterruptedException e) { + LOG.warning("OpenSearch bootstrap thread interrupted"); + + Thread.currentThread().interrupt(); + } + } +} diff --git a/security-admin/contrib/opensearch_for_audit_setup/conf/ranger_opensearch_schema.json b/security-admin/contrib/opensearch_for_audit_setup/conf/ranger_opensearch_schema.json new file mode 100644 index 00000000000..f8f3bbf2d70 --- /dev/null +++ b/security-admin/contrib/opensearch_for_audit_setup/conf/ranger_opensearch_schema.json @@ -0,0 +1,136 @@ +{ + "properties": { + "_expire_at_": { + "type": "date", + "store": true, + "doc_values": true + }, + "_ttl_": { + "type": "text", + "store": true + }, + "_version_": { + "type": "long", + "store": true, + "index": false + }, + "access": { + "type": "keyword" + }, + "action": { + "type": "keyword" + }, + "agent": { + "type": "keyword" + }, + "agentHost": { + "type": "keyword" + }, + "cliIP": { + "type": "keyword" + }, + "cliType": { + "type": "keyword" + }, + "cluster": { + "type": "keyword" + }, + "reqContext": { + "type": "keyword" + }, + "enforcer": { + "type": "keyword" + }, + "event_count": { + "type": "long", + "doc_values": true + }, + "event_dur_ms": { + "type": "long", + "doc_values": true + }, + "evtTime": { + "type": "date", + "doc_values": true + }, + "id": { + "type": "keyword", + "store": true + }, + "logType": { + "type": "keyword" + }, + "policy": { + "type": "long", + "doc_values": true + }, + "proxyUsers": { + "type": "keyword" + }, + "reason": { + "type": "text" + }, + "repo": { + "type": "keyword" + }, + "repoType": { + "type": "integer", + "doc_values": true + }, + "req_caller_id": { + "type": "keyword" + }, + "req_self_id": { + "type": "keyword" + }, + "reqData": { + "type": "text" + }, + "reqUser": { + "type": "keyword" + }, + "resType": { + "type": "keyword" + }, + "resource": { + "type": "keyword" + }, + "result": { + "type": "integer" + }, + "seq_num": { + "type": "long", + "doc_values": true + }, + "sess": { + "type": "keyword" + }, + "tags": { + "type": "keyword" + }, + "tags_str": { + "type": "text" + }, + "datasets": { + "type": "keyword" + }, + "projects": { + "type": "keyword" + }, + "datasetIds": { + "type": "long" + }, + "text": { + "type": "text" + }, + "zoneName": { + "type": "keyword" + }, + "policyVersion": { + "type": "long" + }, + "additionalInfo": { + "type": "text" + } + } +} diff --git a/security-admin/pom.xml b/security-admin/pom.xml index 491baf80fcb..4df6c6a8228 100644 --- a/security-admin/pom.xml +++ b/security-admin/pom.xml @@ -498,6 +498,11 @@ ranger-audit-dest-es ${project.version} + + org.apache.ranger + ranger-audit-dest-os + ${project.version} + org.apache.ranger ranger-authn diff --git a/security-admin/scripts/install.properties b/security-admin/scripts/install.properties index 0f5a093f6a0..1904e435cf8 100644 --- a/security-admin/scripts/install.properties +++ b/security-admin/scripts/install.properties @@ -101,6 +101,15 @@ audit_elasticsearch_password= audit_elasticsearch_index= audit_elasticsearch_bootstrap_enabled=true +# * OpenSearch audit store properties (when audit_store=opensearch) +audit_opensearch_urls= +audit_opensearch_port=9200 +audit_opensearch_protocol=http +audit_opensearch_user= +audit_opensearch_password= +audit_opensearch_index=ranger_audits +audit_opensearch_bootstrap_enabled=true + # * audit_solr_url URL to Solr. E.g. http://:6083/solr/ranger_audits audit_solr_urls= diff --git a/security-admin/scripts/setup.sh b/security-admin/scripts/setup.sh index 43ef4b704a4..3b5b58b29d0 100755 --- a/security-admin/scripts/setup.sh +++ b/security-admin/scripts/setup.sh @@ -102,6 +102,13 @@ audit_elasticsearch_user=$(get_prop 'audit_elasticsearch_user' $PROPFILE) audit_elasticsearch_password=$(get_prop 'audit_elasticsearch_password' $PROPFILE) audit_elasticsearch_index=$(get_prop 'audit_elasticsearch_index' $PROPFILE) audit_elasticsearch_bootstrap_enabled=$(get_prop 'audit_elasticsearch_bootstrap_enabled' $PROPFILE) +audit_opensearch_urls=$(get_prop 'audit_opensearch_urls' $PROPFILE) +audit_opensearch_protocol=$(get_prop 'audit_opensearch_protocol' $PROPFILE) +audit_opensearch_port=$(get_prop 'audit_opensearch_port' $PROPFILE) +audit_opensearch_user=$(get_prop 'audit_opensearch_user' $PROPFILE) +audit_opensearch_password=$(get_prop 'audit_opensearch_password' $PROPFILE) +audit_opensearch_index=$(get_prop 'audit_opensearch_index' $PROPFILE) +audit_opensearch_bootstrap_enabled=$(get_prop 'audit_opensearch_bootstrap_enabled' $PROPFILE) audit_solr_urls=$(get_prop 'audit_solr_urls' $PROPFILE) audit_solr_user=$(get_prop_or_default 'audit_solr_user' $PROPFILE '') audit_solr_password=$(get_prop_or_default 'audit_solr_password' $PROPFILE '') @@ -305,6 +312,16 @@ init_variables(){ exit 1 fi fi + if [ "${audit_store}" == "opensearch" ] ;then + if [ "${audit_opensearch_urls}" == "" ] ;then + log "[I] Please provide valid URL for 'opensearch' audit store!" + exit 1 + fi + if [ "${audit_opensearch_port}" == "" ] ;then + log "[I] Please provide valid port for 'opensearch' audit store!" + exit 1 + fi + fi if [ "${audit_store}" == "cloudwatch" ] ;then if [ "${audit_cloudwatch_region}" == "" ] ;then @@ -857,6 +874,38 @@ update_properties() { fi + if [ "${audit_store}" == "opensearch" ] + then + propertyName=ranger.audit.opensearch.urls + newPropertyValue=${audit_opensearch_urls} + updatePropertyToFilePy $propertyName "${newPropertyValue}" $to_file_ranger + + propertyName=ranger.audit.opensearch.protocol + newPropertyValue=${audit_opensearch_protocol} + updatePropertyToFilePy $propertyName "${newPropertyValue}" $to_file_ranger + + propertyName=ranger.audit.opensearch.port + newPropertyValue=${audit_opensearch_port} + updatePropertyToFilePy $propertyName "${newPropertyValue}" $to_file_ranger + + propertyName=ranger.audit.opensearch.user + newPropertyValue=${audit_opensearch_user} + updatePropertyToFilePy $propertyName "${newPropertyValue}" $to_file_ranger + + propertyName=ranger.audit.opensearch.password + newPropertyValue=${audit_opensearch_password} + updatePropertyToFilePy $propertyName "${newPropertyValue}" $to_file_ranger + + propertyName=ranger.audit.opensearch.index + newPropertyValue=${audit_opensearch_index} + updatePropertyToFilePy $propertyName "${newPropertyValue}" $to_file_ranger + + propertyName=ranger.audit.opensearch.bootstrap.enabled + newPropertyValue=${audit_opensearch_bootstrap_enabled} + updatePropertyToFilePy $propertyName "${newPropertyValue}" $to_file_ranger + + fi + if [ "${audit_store}" == "cloudwatch" ] then propertyName=ranger.audit.amazon_cloudwatch.region diff --git a/security-admin/src/main/java/org/apache/ranger/biz/AssetMgr.java b/security-admin/src/main/java/org/apache/ranger/biz/AssetMgr.java index f995249c1c1..f1fbeed3d2a 100644 --- a/security-admin/src/main/java/org/apache/ranger/biz/AssetMgr.java +++ b/security-admin/src/main/java/org/apache/ranger/biz/AssetMgr.java @@ -145,6 +145,9 @@ public class AssetMgr extends AssetMgrBase { @Autowired ElasticSearchAccessAuditsService elasticSearchAccessAuditsService; + @Autowired + org.apache.ranger.opensearch.OpenSearchAccessAuditsService openSearchAccessAuditsService; + @Autowired CloudWatchAccessAuditsService cloudWatchAccessAuditsService; @@ -761,6 +764,8 @@ public VXAccessAuditList getAccessLogs(SearchCriteria searchCriteria) { return solrAccessAuditsService.searchXAccessAudits(searchCriteria); } else if (RangerBizUtil.AUDIT_STORE_ELASTIC_SEARCH.equalsIgnoreCase(xaBizUtil.getAuditDBType())) { return elasticSearchAccessAuditsService.searchXAccessAudits(searchCriteria); + } else if (RangerBizUtil.AUDIT_STORE_OPENSEARCH.equalsIgnoreCase(xaBizUtil.getAuditDBType())) { + return openSearchAccessAuditsService.searchXAccessAudits(searchCriteria); } else if (RangerBizUtil.AUDIT_STORE_CLOUD_WATCH.equalsIgnoreCase(xaBizUtil.getAuditDBType())) { return cloudWatchAccessAuditsService.searchXAccessAudits(searchCriteria); } else { diff --git a/security-admin/src/main/java/org/apache/ranger/biz/RangerBizUtil.java b/security-admin/src/main/java/org/apache/ranger/biz/RangerBizUtil.java index b54e96bb174..0fa8910d63e 100644 --- a/security-admin/src/main/java/org/apache/ranger/biz/RangerBizUtil.java +++ b/security-admin/src/main/java/org/apache/ranger/biz/RangerBizUtil.java @@ -86,6 +86,7 @@ public class RangerBizUtil { public static final String AUDIT_STORE_RDBMS = "DB"; public static final String AUDIT_STORE_SOLR = "solr"; public static final String AUDIT_STORE_ELASTIC_SEARCH = "elasticSearch"; + public static final String AUDIT_STORE_OPENSEARCH = "opensearch"; public static final String AUDIT_STORE_CLOUD_WATCH = "cloudwatch"; public static final boolean BATCH_CLEAR_ENABLED = PropertiesUtil.getBooleanProperty("ranger.jpa.jdbc.batch-clear.enable", true); public static final int POLICY_BATCH_SIZE = PropertiesUtil.getIntProperty("ranger.jpa.jdbc.batch-clear.size", 10); diff --git a/security-admin/src/main/java/org/apache/ranger/biz/XAuditMgr.java b/security-admin/src/main/java/org/apache/ranger/biz/XAuditMgr.java index 67b10246c6f..0934f67eb48 100644 --- a/security-admin/src/main/java/org/apache/ranger/biz/XAuditMgr.java +++ b/security-admin/src/main/java/org/apache/ranger/biz/XAuditMgr.java @@ -44,6 +44,9 @@ public class XAuditMgr extends XAuditMgrBase { @Autowired ElasticSearchAccessAuditsService elasticSearchAccessAuditsService; + @Autowired + org.apache.ranger.opensearch.OpenSearchAccessAuditsService openSearchAccessAuditsService; + @Autowired CloudWatchAccessAuditsService cloudWatchAccessAuditsService; @@ -118,6 +121,8 @@ public VXAccessAuditList searchXAccessAudits(SearchCriteria searchCriteria) { return solrAccessAuditsService.searchXAccessAudits(searchCriteria); } else if (RangerBizUtil.AUDIT_STORE_ELASTIC_SEARCH.equalsIgnoreCase(auditDBType)) { return elasticSearchAccessAuditsService.searchXAccessAudits(searchCriteria); + } else if (RangerBizUtil.AUDIT_STORE_OPENSEARCH.equalsIgnoreCase(auditDBType)) { + return openSearchAccessAuditsService.searchXAccessAudits(searchCriteria); } else if (RangerBizUtil.AUDIT_STORE_CLOUD_WATCH.equalsIgnoreCase(auditDBType)) { return cloudWatchAccessAuditsService.searchXAccessAudits(searchCriteria); } else { @@ -133,6 +138,8 @@ public VXLong getXAccessAuditSearchCount(SearchCriteria searchCriteria) { return solrAccessAuditsService.getXAccessAuditSearchCount(searchCriteria); } else if (RangerBizUtil.AUDIT_STORE_ELASTIC_SEARCH.equalsIgnoreCase(auditDBType)) { return elasticSearchAccessAuditsService.getXAccessAuditSearchCount(searchCriteria); + } else if (RangerBizUtil.AUDIT_STORE_OPENSEARCH.equalsIgnoreCase(auditDBType)) { + return openSearchAccessAuditsService.getXAccessAuditSearchCount(searchCriteria); } else if (RangerBizUtil.AUDIT_STORE_CLOUD_WATCH.equalsIgnoreCase(auditDBType)) { return cloudWatchAccessAuditsService.getXAccessAuditSearchCount(searchCriteria); } else { diff --git a/security-admin/src/main/java/org/apache/ranger/elasticsearch/ElasticSearchMgr.java b/security-admin/src/main/java/org/apache/ranger/elasticsearch/ElasticSearchMgr.java index 4b264daf639..2a2db5a48f0 100644 --- a/security-admin/src/main/java/org/apache/ranger/elasticsearch/ElasticSearchMgr.java +++ b/security-admin/src/main/java/org/apache/ranger/elasticsearch/ElasticSearchMgr.java @@ -168,6 +168,7 @@ synchronized RestHighLevelClient connect() { RestClientBuilder restClientBuilder = getRestClientBuilder(urls, protocol, user, password, port); client = new RestHighLevelClient(restClientBuilder); + me = client; } catch (Throwable t) { logger.error("Can't connect to ElasticSearch: {}", parameterString, t); } diff --git a/security-admin/src/main/java/org/apache/ranger/opensearch/OpenSearchAccessAuditsService.java b/security-admin/src/main/java/org/apache/ranger/opensearch/OpenSearchAccessAuditsService.java new file mode 100644 index 00000000000..48098705d33 --- /dev/null +++ b/security-admin/src/main/java/org/apache/ranger/opensearch/OpenSearchAccessAuditsService.java @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.opensearch; + +import org.apache.ranger.audit.provider.MiscUtil; +import org.apache.ranger.common.MessageEnums; +import org.apache.ranger.common.PropertiesUtil; +import org.apache.ranger.common.RESTErrorUtil; +import org.apache.ranger.common.SearchCriteria; +import org.apache.ranger.db.XXServiceDefDao; +import org.apache.ranger.entity.XXService; +import org.apache.ranger.entity.XXServiceDef; +import org.apache.ranger.opensearch.OpenSearchUtil.OpenSearchSearchResult; +import org.apache.ranger.plugin.util.JsonUtilsV2; +import org.apache.ranger.view.VXAccessAudit; +import org.apache.ranger.view.VXAccessAuditList; +import org.apache.ranger.view.VXLong; +import org.elasticsearch.client.RestClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Service; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +@Service +@Scope("singleton") +public class OpenSearchAccessAuditsService extends org.apache.ranger.AccessAuditsService { + private static final Logger LOG = LoggerFactory.getLogger(OpenSearchAccessAuditsService.class); + + @Autowired + OpenSearchMgr openSearchMgr; + + @Autowired + OpenSearchUtil openSearchUtil; + + public VXAccessAuditList searchXAccessAudits(SearchCriteria searchCriteria) { + RestClient client = openSearchMgr.getClient(); + final boolean hiveQueryVisibility = PropertiesUtil.getBooleanProperty("ranger.audit.hive.query.visibility", true); + + if (client == null) { + LOG.warn("OpenSearch client is null, so not running the query."); + + throw restErrorUtil.createRESTException("Error connecting to OpenSearch", MessageEnums.ERROR_SYSTEM); + } + + Map paramList = searchCriteria.getParamList(); + + updateUserExclusion(paramList); + + OpenSearchSearchResult result; + + try { + result = openSearchUtil.searchResources(searchCriteria, searchFields, sortFields, client, openSearchMgr.index); + } catch (IOException e) { + LOG.warn("OpenSearch query failed: {}", e.getMessage()); + + throw restErrorUtil.createRESTException("Error querying OpenSearch", MessageEnums.ERROR_SYSTEM); + } + + List xAccessAuditList = new ArrayList<>(); + + for (Map source : result.getSources()) { + VXAccessAudit vXAccessAudit = populateViewBean(source); + String serviceType = vXAccessAudit.getServiceType(); + boolean isHive = "hive".equalsIgnoreCase(serviceType); + + if (!hiveQueryVisibility && isHive) { + vXAccessAudit.setRequestData(null); + } else if (isHive) { + String accessType = vXAccessAudit.getAccessType(); + + if ("grant".equalsIgnoreCase(accessType) || "revoke".equalsIgnoreCase(accessType)) { + String requestData = vXAccessAudit.getRequestData(); + + if (requestData != null) { + try { + vXAccessAudit.setRequestData(java.net.URLDecoder.decode(requestData, "UTF-8")); + } catch (UnsupportedEncodingException e) { + LOG.warn("Error while encoding request data: {}", requestData, e); + } + } + } + } + + xAccessAuditList.add(vXAccessAudit); + } + + VXAccessAuditList returnList = new VXAccessAuditList(); + + returnList.setPageSize(searchCriteria.getMaxRows()); + returnList.setResultSize(result.getSources().size()); + returnList.setTotalCount(result.getTotalHits()); + returnList.setStartIndex(searchCriteria.getStartIndex()); + returnList.setVXAccessAudits(xAccessAuditList); + + return returnList; + } + + public void setRestErrorUtil(RESTErrorUtil restErrorUtil) { + this.restErrorUtil = restErrorUtil; + } + + public VXLong getXAccessAuditSearchCount(SearchCriteria searchCriteria) { + long count = 100; + VXLong vXLong = new VXLong(); + + vXLong.setValue(count); + + return vXLong; + } + + private VXAccessAudit populateViewBean(Map source) { + VXAccessAudit accessAudit = new VXAccessAudit(); + Object value; + + value = source.get("id"); + if (value != null) { + accessAudit.setId((long) value.hashCode()); + } + + value = source.get("cluster"); + if (value != null) { + accessAudit.setClusterName(value.toString()); + } + + value = source.get("zoneName"); + if (value != null) { + accessAudit.setZoneName(value.toString()); + } + + value = source.get("agentHost"); + if (value != null) { + accessAudit.setAgentHost(value.toString()); + } + + value = source.get("policyVersion"); + if (value != null) { + accessAudit.setPolicyVersion(MiscUtil.toLong(value)); + } + + value = source.get("access"); + if (value != null) { + accessAudit.setAccessType(value.toString()); + } + + value = source.get("enforcer"); + if (value != null) { + accessAudit.setAclEnforcer(value.toString()); + } + + value = source.get("agent"); + if (value != null) { + accessAudit.setAgentId(value.toString()); + } + + value = source.get("repo"); + if (value != null) { + accessAudit.setRepoName(value.toString()); + + XXService xxService = daoManager.getXXService().findByName(accessAudit.getRepoName()); + + if (xxService != null) { + accessAudit.setRepoDisplayName(xxService.getDisplayName()); + } + } + + value = source.get("sess"); + if (value != null) { + accessAudit.setSessionId(value.toString()); + } + + value = source.get("reqUser"); + if (value != null) { + accessAudit.setRequestUser(value.toString()); + } + + value = source.get("reqData"); + if (value != null) { + accessAudit.setRequestData(value.toString()); + } + + value = source.get("resource"); + if (value != null) { + accessAudit.setResourcePath(value.toString()); + } + + value = source.get("cliIP"); + if (value != null) { + accessAudit.setClientIP(value.toString()); + } + + value = source.get("result"); + if (value != null) { + accessAudit.setAccessResult(MiscUtil.toInt(value)); + } + + value = source.get("policy"); + if (value != null) { + accessAudit.setPolicyId(MiscUtil.toLong(value)); + } + + value = source.get("repoType"); + if (value != null) { + accessAudit.setRepoType(MiscUtil.toInt(value)); + + if (null != daoManager) { + XXServiceDefDao xxServiceDef = daoManager.getXXServiceDef(); + + if (xxServiceDef != null) { + XXServiceDef xServiceDef = xxServiceDef.getById((long) accessAudit.getRepoType()); + + if (xServiceDef != null) { + accessAudit.setServiceType(xServiceDef.getName()); + accessAudit.setServiceTypeDisplayName(xServiceDef.getDisplayName()); + } + } + } + } + + value = source.get("resType"); + if (value != null) { + accessAudit.setResourceType(value.toString()); + } + + value = source.get("reason"); + if (value != null) { + accessAudit.setResultReason(value.toString()); + } + + value = source.get("action"); + if (value != null) { + accessAudit.setAction(value.toString()); + } + + value = source.get("evtTime"); + if (value != null) { + accessAudit.setEventTime(MiscUtil.toLocalDate(value)); + } + + value = source.get("seq_num"); + if (value != null) { + accessAudit.setSequenceNumber(MiscUtil.toLong(value)); + } + + value = source.get("event_count"); + if (value != null) { + accessAudit.setEventCount(MiscUtil.toLong(value)); + } + + value = source.get("event_dur_ms"); + if (value != null) { + accessAudit.setEventDuration(MiscUtil.toLong(value)); + } + + value = source.get("tags"); + if (value != null) { + accessAudit.setTags(value.toString()); + } + + value = source.get("datasets"); + if (value != null) { + try { + accessAudit.setDatasets(JsonUtilsV2.nonSerializableObjToJson(value)); + } catch (Exception e) { + LOG.warn("Failed to convert datasets to json", e); + } + } + + value = source.get("projects"); + if (value != null) { + try { + accessAudit.setProjects(JsonUtilsV2.nonSerializableObjToJson(value)); + } catch (Exception e) { + LOG.warn("Failed to convert projects to json", e); + } + } + + value = source.get("datasetIds"); + if (value != null) { + try { + accessAudit.setDatasetIds(JsonUtilsV2.nonSerializableObjToJson(value)); + } catch (Exception e) { + LOG.warn("Failed to convert datasetIds to json", e); + } + } + + return accessAudit; + } +} diff --git a/security-admin/src/main/java/org/apache/ranger/opensearch/OpenSearchMgr.java b/security-admin/src/main/java/org/apache/ranger/opensearch/OpenSearchMgr.java new file mode 100644 index 00000000000..06e66c74112 --- /dev/null +++ b/security-admin/src/main/java/org/apache/ranger/opensearch/OpenSearchMgr.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.opensearch; + +import org.apache.commons.lang3.StringUtils; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthSchemeProvider; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.client.config.AuthSchemes; +import org.apache.http.config.Lookup; +import org.apache.http.config.RegistryBuilder; +import org.apache.http.impl.auth.SPNegoSchemeFactory; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.ranger.audit.destination.OpenSearchAuditDestination; +import org.apache.ranger.authorization.credutils.CredentialsProviderUtil; +import org.apache.ranger.authorization.credutils.kerberos.KerberosCredentialsProvider; +import org.apache.ranger.common.PropertiesUtil; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import javax.security.auth.Subject; +import javax.security.auth.kerberos.KerberosTicket; + +import java.io.File; +import java.security.PrivilegedActionException; +import java.util.Arrays; +import java.util.Date; +import java.util.Locale; + +@Component +public class OpenSearchMgr { + private static final Logger LOG = LoggerFactory.getLogger(OpenSearchMgr.class); + + private static final String CONFIG_PREFIX = OpenSearchAuditDestination.CONFIG_PREFIX; + private static final String CONFIG_URLS = OpenSearchAuditDestination.CONFIG_URLS; + private static final String CONFIG_PORT = OpenSearchAuditDestination.CONFIG_PORT; + private static final String CONFIG_PROTOCOL = OpenSearchAuditDestination.CONFIG_PROTOCOL; + private static final String CONFIG_USER = OpenSearchAuditDestination.CONFIG_USER; + private static final String CONFIG_PASSWORD = OpenSearchAuditDestination.CONFIG_PASSWORD; + private static final String CONFIG_INDEX = OpenSearchAuditDestination.CONFIG_INDEX; + + public String index; + + private volatile RestClient client; + private Subject subject; + private String user; + private String password; + + public RestClient getClient() { + RestClient me = client; + + if (me != null && subject != null) { + KerberosTicket ticket = CredentialsProviderUtil.getTGT(subject); + + try { + if (new Date().getTime() > ticket.getEndTime().getTime()) { + client = null; + CredentialsProviderUtil.ticketExpireTime80 = 0; + + me = connect(); + } else if (CredentialsProviderUtil.ticketWillExpire(ticket)) { + subject = CredentialsProviderUtil.login(user, password); + } + } catch (PrivilegedActionException e) { + LOG.error("PrivilegedActionException:", e); + + throw new RuntimeException(e); + } + + return me; + } else { + me = connect(); + } + + return me; + } + + synchronized RestClient connect() { + RestClient me = client; + + if (me == null) { + synchronized (OpenSearchMgr.class) { + me = client; + + if (me == null) { + String urls = PropertiesUtil.getProperty(CONFIG_PREFIX + "." + CONFIG_URLS); + String protocol = PropertiesUtil.getProperty(CONFIG_PREFIX + "." + CONFIG_PROTOCOL, "http"); + + user = PropertiesUtil.getProperty(CONFIG_PREFIX + "." + CONFIG_USER, ""); + password = PropertiesUtil.getProperty(CONFIG_PREFIX + "." + CONFIG_PASSWORD, ""); + + int port = Integer.parseInt(PropertiesUtil.getProperty(CONFIG_PREFIX + "." + CONFIG_PORT, "9200")); + + this.index = PropertiesUtil.getProperty(CONFIG_PREFIX + "." + CONFIG_INDEX, "ranger_audits"); + + String parameterString = String.format(Locale.ROOT, "User:%s, %s://%s:%s/%s", user, protocol, urls, port, index); + + LOG.info("Initializing OpenSearch connection: {}", parameterString); + + if (urls != null) { + urls = urls.trim(); + } + + if (StringUtils.isBlank(urls) || "NONE".equalsIgnoreCase(urls)) { + LOG.warn("OpenSearch URLs not configured or set to NONE"); + + return null; + } + + try { + if (StringUtils.isNotBlank(user) && StringUtils.isNotBlank(password) && password.contains("keytab") && new File(password).exists()) { + subject = CredentialsProviderUtil.login(user, password); + } + + RestClientBuilder builder = buildRestClientBuilder(urls, protocol, user, password, port); + + client = builder.build(); + me = client; + } catch (Throwable t) { + LOG.error("Cannot connect to OpenSearch: {}", parameterString, t); + } + } + } + } + + return me; + } + + public static RestClientBuilder buildRestClientBuilder(String urls, String protocol, String user, String password, int port) { + HttpHost[] hosts = Arrays.stream(urls.split(",")).map(String::trim).filter(h -> !h.isEmpty()).map(h -> new HttpHost(h, port, protocol)).toArray(HttpHost[]::new); + + RestClientBuilder builder = RestClient.builder(hosts); + + if (StringUtils.isNotBlank(user) && StringUtils.isNotBlank(password) && !"NONE".equalsIgnoreCase(user) && !"NONE".equalsIgnoreCase(password)) { + if (password.contains("keytab") && new File(password).exists()) { + KerberosCredentialsProvider credentialsProvider = CredentialsProviderUtil.getKerberosCredentials(user, password); + Lookup authRegistry = RegistryBuilder.create().register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory()).build(); + + builder.setHttpClientConfigCallback(httpClientBuilder -> { + httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); + httpClientBuilder.setDefaultAuthSchemeRegistry(authRegistry); + return httpClientBuilder; + }); + } else { + CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password)); + builder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); + } + } + + return builder; + } +} diff --git a/security-admin/src/main/java/org/apache/ranger/opensearch/OpenSearchUtil.java b/security-admin/src/main/java/org/apache/ranger/opensearch/OpenSearchUtil.java new file mode 100644 index 00000000000..7682c525d5a --- /dev/null +++ b/security-admin/src/main/java/org/apache/ranger/opensearch/OpenSearchUtil.java @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.opensearch; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.http.entity.ContentType; +import org.apache.http.nio.entity.NStringEntity; +import org.apache.http.util.EntityUtils; +import org.apache.ranger.common.PropertiesUtil; +import org.apache.ranger.common.SearchCriteria; +import org.apache.ranger.common.SearchField; +import org.apache.ranger.common.SortField; +import org.apache.ranger.common.StringUtil; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.TimeZone; +import java.util.stream.Collectors; + +@Component +public class OpenSearchUtil { + private static final Logger LOG = LoggerFactory.getLogger(OpenSearchUtil.class); + private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final String LUCENE_SPECIAL_CHARS = "+-=&|> searchFields, List sortFields, RestClient client, String index) throws IOException { + String body = buildSearchBody(searchCriteria, searchFields, sortFields); + + LOG.debug("OpenSearch query on index [{}]: {}", index, body); + + Request request = new Request("POST", "/" + index + "/_search"); + + request.setEntity(new NStringEntity(body, ContentType.APPLICATION_JSON)); + + Response response = client.performRequest(request); + String json = EntityUtils.toString(response.getEntity()); + JsonNode root = MAPPER.readTree(json); + + long totalHits = root.at("/hits/total/value").asLong(0); + JsonNode hitsArray = root.at("/hits/hits"); + + List> sources = new ArrayList<>(); + + if (hitsArray.isArray()) { + for (JsonNode hit : hitsArray) { + JsonNode sourceNode = hit.get("_source"); + + if (sourceNode != null) { + @SuppressWarnings("unchecked") + Map source = MAPPER.convertValue(sourceNode, Map.class); + + sources.add(source); + } + } + } + + return new OpenSearchSearchResult(totalHits, sources); + } + + public List> fetchByIds(RestClient client, String index, List ids) throws IOException { + if (ids == null || ids.isEmpty()) { + return Collections.emptyList(); + } + + Map mgetBody = new HashMap<>(); + + mgetBody.put("ids", ids); + + String body = MAPPER.writeValueAsString(mgetBody); + + Request request = new Request("POST", "/" + index + "/_mget"); + + request.setEntity(new NStringEntity(body, ContentType.APPLICATION_JSON)); + + Response response = client.performRequest(request); + String json = EntityUtils.toString(response.getEntity()); + JsonNode root = MAPPER.readTree(json); + JsonNode docsNode = root.get("docs"); + + List> results = new ArrayList<>(); + + if (docsNode != null && docsNode.isArray()) { + for (JsonNode doc : docsNode) { + if (doc.has("found") && doc.get("found").asBoolean()) { + JsonNode sourceNode = doc.get("_source"); + + if (sourceNode != null) { + @SuppressWarnings("unchecked") + Map source = MAPPER.convertValue(sourceNode, Map.class); + + results.add(source); + } + } + } + } + + return results; + } + + String buildSearchBody(SearchCriteria searchCriteria, List searchFields, List sortFields) { + List> mustClauses = new ArrayList<>(); + Date fromDate = null; + Date toDate = null; + String dateFieldName = null; + + if (searchCriteria.getParamList() != null) { + for (SearchField field : searchFields) { + String clientFieldName = field.getClientFieldName(); + String fieldName = field.getFieldName(); + SearchField.DATA_TYPE dataType = field.getDataType(); + SearchField.SEARCH_TYPE searchType = field.getSearchType(); + Object paramValue = searchCriteria.getParamValue(clientFieldName); + + if (paramValue == null || paramValue.toString().isEmpty()) { + continue; + } + + if (dataType == SearchField.DATA_TYPE.DATE) { + if (paramValue instanceof Date) { + if (searchType == SearchField.SEARCH_TYPE.GREATER_EQUAL_THAN || searchType == SearchField.SEARCH_TYPE.GREATER_THAN) { + fromDate = (Date) paramValue; + dateFieldName = fieldName; + } else if (searchType == SearchField.SEARCH_TYPE.LESS_EQUAL_THAN || searchType == SearchField.SEARCH_TYPE.LESS_THAN) { + toDate = (Date) paramValue; + dateFieldName = fieldName; + } + } + + continue; + } + + Map clause = buildClause(fieldName, dataType, searchType, paramValue); + + if (clause != null) { + mustClauses.add(clause); + } + } + + if (fromDate != null || toDate != null) { + mustClauses.add(buildDateRange(dateFieldName, fromDate, toDate)); + } + } + + Map query = new LinkedHashMap<>(); + Map bool = new HashMap<>(); + + bool.put("must", mustClauses.isEmpty() ? List.of(Map.of("match_all", Map.of())) : mustClauses); + query.put("query", Map.of("bool", bool)); + query.put("from", searchCriteria.getStartIndex()); + query.put("size", searchCriteria.getMaxRows()); + + String[] sortResolved = resolveSortField(searchCriteria, sortFields); + + if (sortResolved != null) { + query.put("sort", List.of(Map.of(sortResolved[0], Map.of("order", sortResolved[1])))); + } + + try { + return MAPPER.writeValueAsString(query); + } catch (IOException e) { + throw new RuntimeException("Failed to serialize OpenSearch query", e); + } + } + + private Map buildClause(String fieldName, SearchField.DATA_TYPE dataType, SearchField.SEARCH_TYPE searchType, Object paramValue) { + if (fieldName.startsWith("-")) { + Map inner = buildClause(fieldName.substring(1), dataType, searchType, paramValue); + + if (inner == null) { + return null; + } + + return Map.of("bool", Map.of("must_not", List.of(inner))); + } + + if (paramValue instanceof Collection) { + Collection valueList = (Collection) paramValue; + + if (valueList.isEmpty()) { + return null; + } + + String queryString = valueList.stream().map(v -> "(" + escapeLucene(v.toString().trim().toLowerCase()) + ")").collect(Collectors.joining(" OR ")); + + return Map.of("query_string", Map.of("query", queryString, "default_field", fieldName)); + } + + if (searchType == SearchField.SEARCH_TYPE.PARTIAL) { + String value = paramValue.toString().trim(); + + if (value.isEmpty()) { + return null; + } + + return Map.of("query_string", Map.of("query", "*" + escapeLucene(value.toLowerCase()) + "*", "default_field", fieldName)); + } else { + String value = paramValue.toString().trim(); + + if (value.isEmpty()) { + return null; + } + + return Map.of("match_phrase", Map.of(fieldName, escapeLucene(value.toLowerCase()))); + } + } + + private Map buildDateRange(String fieldName, Date fromDate, Date toDate) { + Map rangeParams = new LinkedHashMap<>(); + + rangeParams.put("format", dateFormatStr); + + if (fromDate != null) { + rangeParams.put("gte", dateFormat.format(fromDate)); + } + + if (toDate != null) { + rangeParams.put("lte", dateFormat.format(toDate)); + } + + return Map.of("range", Map.of(fieldName, rangeParams)); + } + + private String[] resolveSortField(SearchCriteria searchCriteria, List sortFields) { + String sortBy = searchCriteria.getSortBy(); + String querySortBy = null; + + if (sortBy != null && !sortBy.trim().isEmpty()) { + sortBy = sortBy.trim(); + + for (SortField sortField : sortFields) { + if (sortBy.equalsIgnoreCase(sortField.getParamName())) { + querySortBy = sortField.getFieldName(); + searchCriteria.setSortBy(sortField.getParamName()); + break; + } + } + } + + if (querySortBy == null) { + for (SortField sortField : sortFields) { + if (sortField.isDefault()) { + querySortBy = sortField.getFieldName(); + searchCriteria.setSortBy(sortField.getParamName()); + searchCriteria.setSortType(sortField.getDefaultOrder().name()); + break; + } + } + } + + if (querySortBy != null) { + String order = "desc".equalsIgnoreCase(searchCriteria.getSortType()) ? "desc" : "asc"; + + return new String[] {querySortBy, order}; + } + + return null; + } + + static String escapeLucene(String value) { + StringBuilder sb = new StringBuilder(); + + for (int i = 0; i < value.length(); i++) { + char c = value.charAt(i); + + if (LUCENE_SPECIAL_CHARS.indexOf(c) >= 0) { + sb.append('\\'); + } + + sb.append(c); + } + + return sb.toString(); + } + + public static class OpenSearchSearchResult { + private final long totalHits; + private final List> sources; + + public OpenSearchSearchResult(long totalHits, List> sources) { + this.totalHits = totalHits; + this.sources = sources; + } + + public long getTotalHits() { + return totalHits; + } + + public List> getSources() { + return sources; + } + } +} diff --git a/security-admin/src/main/resources/conf.dist/ranger-admin-default-site.xml b/security-admin/src/main/resources/conf.dist/ranger-admin-default-site.xml index 76ca23bd664..a69d5886dd5 100644 --- a/security-admin/src/main/resources/conf.dist/ranger-admin-default-site.xml +++ b/security-admin/src/main/resources/conf.dist/ranger-admin-default-site.xml @@ -516,6 +516,34 @@ ranger.audit.elasticsearch.bootstrap.enabled true + + ranger.audit.opensearch.urls + + + + ranger.audit.opensearch.protocol + http + + + ranger.audit.opensearch.port + 9200 + + + ranger.audit.opensearch.user + + + + ranger.audit.opensearch.password + + + + ranger.audit.opensearch.index + ranger_audits + + + ranger.audit.opensearch.bootstrap.enabled + true + ranger.audit.solr.max.retry 30 diff --git a/security-admin/src/main/resources/conf.dist/ranger-admin-site.xml b/security-admin/src/main/resources/conf.dist/ranger-admin-site.xml index a75cd05a345..a8c0a51d7c2 100644 --- a/security-admin/src/main/resources/conf.dist/ranger-admin-site.xml +++ b/security-admin/src/main/resources/conf.dist/ranger-admin-site.xml @@ -79,6 +79,34 @@ ranger.audit.elasticsearch.bootstrap.enabled true + + ranger.audit.opensearch.urls + + + + ranger.audit.opensearch.protocol + http + + + ranger.audit.opensearch.port + 9200 + + + ranger.audit.opensearch.user + + + + ranger.audit.opensearch.password + + + + ranger.audit.opensearch.index + ranger_audits + + + ranger.audit.opensearch.bootstrap.enabled + true + ranger.audit.amazon_cloudwatch.region us-east-2 diff --git a/security-admin/src/test/java/org/apache/ranger/opensearch/OpenSearchAccessAuditsServiceTest.java b/security-admin/src/test/java/org/apache/ranger/opensearch/OpenSearchAccessAuditsServiceTest.java new file mode 100644 index 00000000000..6229dffee89 --- /dev/null +++ b/security-admin/src/test/java/org/apache/ranger/opensearch/OpenSearchAccessAuditsServiceTest.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.opensearch; + +import org.apache.ranger.common.MessageEnums; +import org.apache.ranger.common.RESTErrorUtil; +import org.apache.ranger.common.SearchCriteria; +import org.apache.ranger.db.RangerDaoManager; +import org.apache.ranger.db.XXServiceDao; +import org.apache.ranger.db.XXServiceDefDao; +import org.apache.ranger.opensearch.OpenSearchUtil.OpenSearchSearchResult; +import org.apache.ranger.view.VXAccessAuditList; +import org.elasticsearch.client.RestClient; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import javax.ws.rs.WebApplicationException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class OpenSearchAccessAuditsServiceTest { + @InjectMocks + OpenSearchAccessAuditsService service; + + @Mock + OpenSearchMgr openSearchMgr; + + @Mock + OpenSearchUtil openSearchUtil; + + @Mock + RESTErrorUtil restErrorUtil; + + @Mock + RangerDaoManager daoManager; + + @Mock + XXServiceDao xxServiceDao; + + @Mock + XXServiceDefDao xxServiceDefDao; + + @Mock + RestClient restClient; + + @BeforeEach + void setUp() throws Exception { + service.setRestErrorUtil(restErrorUtil); + + java.lang.reflect.Field daoField = org.apache.ranger.AccessAuditsService.class.getDeclaredField("daoManager"); + daoField.setAccessible(true); + daoField.set(service, daoManager); + } + + @Test + void searchXAccessAudits_clientNull_throwsException() { + when(openSearchMgr.getClient()).thenReturn(null); + when(restErrorUtil.createRESTException(anyString(), any(MessageEnums.class))) + .thenReturn(new WebApplicationException(500)); + + SearchCriteria criteria = new SearchCriteria(); + + assertThrows(WebApplicationException.class, () -> service.searchXAccessAudits(criteria)); + } + + @Test + void searchXAccessAudits_ioException_throwsException() throws Exception { + when(openSearchMgr.getClient()).thenReturn(restClient); + openSearchMgr.index = "ranger_audits"; + when(openSearchUtil.searchResources(any(), any(), any(), eq(restClient), eq("ranger_audits"))) + .thenThrow(new IOException("Connection refused")); + when(restErrorUtil.createRESTException(anyString(), any(MessageEnums.class))) + .thenReturn(new WebApplicationException(500)); + + SearchCriteria criteria = new SearchCriteria(); + + assertThrows(WebApplicationException.class, () -> service.searchXAccessAudits(criteria)); + } + + @Test + void searchXAccessAudits_success() throws Exception { + when(openSearchMgr.getClient()).thenReturn(restClient); + openSearchMgr.index = "ranger_audits"; + + Map doc1 = new HashMap<>(); + doc1.put("id", "test-id-001"); + doc1.put("reqUser", "testuser1"); + doc1.put("resource", "/tmp/test"); + doc1.put("access", "read"); + doc1.put("result", 1); + doc1.put("repo", "dev_hdfs"); + doc1.put("repoType", 1); + doc1.put("action", "read"); + + List> sources = new ArrayList<>(); + sources.add(doc1); + + OpenSearchSearchResult mockResult = new OpenSearchSearchResult(1, sources); + + when(openSearchUtil.searchResources(any(), any(), any(), eq(restClient), anyString())) + .thenReturn(mockResult); + when(daoManager.getXXService()).thenReturn(xxServiceDao); + when(daoManager.getXXServiceDef()).thenReturn(xxServiceDefDao); + + SearchCriteria criteria = new SearchCriteria(); + criteria.setMaxRows(25); + criteria.setStartIndex(0); + + VXAccessAuditList result = service.searchXAccessAudits(criteria); + + assertNotNull(result); + assertEquals(1, result.getTotalCount()); + assertEquals(1, result.getResultSize()); + assertNotNull(result.getVXAccessAudits()); + assertEquals(1, result.getVXAccessAudits().size()); + assertEquals("testuser1", result.getVXAccessAudits().get(0).getRequestUser()); + assertEquals("/tmp/test", result.getVXAccessAudits().get(0).getResourcePath()); + assertEquals("read", result.getVXAccessAudits().get(0).getAccessType()); + } + + @Test + void searchXAccessAudits_emptyResults() throws Exception { + when(openSearchMgr.getClient()).thenReturn(restClient); + openSearchMgr.index = "ranger_audits"; + + OpenSearchSearchResult mockResult = new OpenSearchSearchResult(0, new ArrayList<>()); + + when(openSearchUtil.searchResources(any(), any(), any(), eq(restClient), anyString())) + .thenReturn(mockResult); + + SearchCriteria criteria = new SearchCriteria(); + criteria.setMaxRows(25); + criteria.setStartIndex(0); + + VXAccessAuditList result = service.searchXAccessAudits(criteria); + + assertNotNull(result); + assertEquals(0, result.getTotalCount()); + assertEquals(0, result.getResultSize()); + assertTrue(result.getVXAccessAudits().isEmpty()); + } + + private static void assertTrue(boolean condition) { + org.junit.jupiter.api.Assertions.assertTrue(condition); + } +} diff --git a/security-admin/src/test/java/org/apache/ranger/opensearch/OpenSearchMgrTest.java b/security-admin/src/test/java/org/apache/ranger/opensearch/OpenSearchMgrTest.java new file mode 100644 index 00000000000..0a4185bf447 --- /dev/null +++ b/security-admin/src/test/java/org/apache/ranger/opensearch/OpenSearchMgrTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.opensearch; + +import org.elasticsearch.client.RestClientBuilder; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertNotNull; + +class OpenSearchMgrTest { + @Test + void buildRestClientBuilder_basicAuth() { + RestClientBuilder builder = OpenSearchMgr.buildRestClientBuilder( + "localhost", "http", "admin", "password", 9200); + + assertNotNull(builder); + } + + @Test + void buildRestClientBuilder_noAuth() { + RestClientBuilder builder = OpenSearchMgr.buildRestClientBuilder( + "localhost", "http", "", "", 9200); + + assertNotNull(builder); + } + + @Test + void buildRestClientBuilder_noneCredentials() { + RestClientBuilder builder = OpenSearchMgr.buildRestClientBuilder( + "localhost", "https", "NONE", "NONE", 9200); + + assertNotNull(builder); + } + + @Test + void buildRestClientBuilder_multipleHosts() { + RestClientBuilder builder = OpenSearchMgr.buildRestClientBuilder( + "host1,host2,host3", "http", "user", "pass", 9200); + + assertNotNull(builder); + } +} diff --git a/security-admin/src/test/java/org/apache/ranger/opensearch/OpenSearchUtilTest.java b/security-admin/src/test/java/org/apache/ranger/opensearch/OpenSearchUtilTest.java new file mode 100644 index 00000000000..73b2680f3c6 --- /dev/null +++ b/security-admin/src/test/java/org/apache/ranger/opensearch/OpenSearchUtilTest.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.opensearch; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.ranger.common.SearchCriteria; +import org.apache.ranger.common.SearchField; +import org.apache.ranger.common.SortField; +import org.apache.ranger.common.StringUtil; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.when; + +class OpenSearchUtilTest { + private static final ObjectMapper MAPPER = new ObjectMapper(); + + @InjectMocks + OpenSearchUtil openSearchUtil; + + @Mock + StringUtil stringUtil; + + @BeforeEach + void setUp() { + MockitoAnnotations.openMocks(this); + when(stringUtil.isEmpty(anyString())).thenAnswer(inv -> { + String s = inv.getArgument(0); + return s == null || s.trim().isEmpty(); + }); + } + + @Test + void buildSearchBody_emptyParams() throws Exception { + SearchCriteria criteria = new SearchCriteria(); + criteria.setMaxRows(25); + criteria.setStartIndex(0); + + List searchFields = new ArrayList<>(); + List sortFields = List.of( + new SortField("eventTime", "evtTime", true, SortField.SORT_ORDER.DESC)); + + String body = openSearchUtil.buildSearchBody(criteria, searchFields, sortFields); + JsonNode root = MAPPER.readTree(body); + + assertNotNull(root.get("query")); + assertEquals(0, root.get("from").asInt()); + assertEquals(25, root.get("size").asInt()); + } + + @Test + void buildSearchBody_partialStringSearch() throws Exception { + SearchCriteria criteria = new SearchCriteria(); + criteria.setMaxRows(10); + criteria.setStartIndex(0); + criteria.addParam("requestUser", "testuser"); + + List searchFields = List.of( + new SearchField("requestUser", "reqUser", + SearchField.DATA_TYPE.STRING, SearchField.SEARCH_TYPE.PARTIAL)); + List sortFields = List.of( + new SortField("eventTime", "evtTime", true, SortField.SORT_ORDER.DESC)); + + String body = openSearchUtil.buildSearchBody(criteria, searchFields, sortFields); + JsonNode root = MAPPER.readTree(body); + + JsonNode mustClauses = root.at("/query/bool/must"); + assertTrue(mustClauses.isArray()); + assertTrue(mustClauses.size() > 0); + + String bodyStr = body.toLowerCase(); + assertTrue(bodyStr.contains("testuser")); + assertTrue(bodyStr.contains("query_string")); + } + + @Test + void buildSearchBody_fullStringMatch() throws Exception { + SearchCriteria criteria = new SearchCriteria(); + criteria.setMaxRows(10); + criteria.setStartIndex(0); + criteria.addParam("accessType", "read"); + + List searchFields = List.of( + new SearchField("accessType", "access", + SearchField.DATA_TYPE.STRING, SearchField.SEARCH_TYPE.FULL)); + List sortFields = List.of( + new SortField("eventTime", "evtTime", true, SortField.SORT_ORDER.DESC)); + + String body = openSearchUtil.buildSearchBody(criteria, searchFields, sortFields); + + assertTrue(body.contains("match_phrase")); + assertTrue(body.contains("access")); + assertTrue(body.contains("read")); + } + + @Test + void buildSearchBody_dateRange() throws Exception { + SearchCriteria criteria = new SearchCriteria(); + criteria.setMaxRows(10); + criteria.setStartIndex(0); + criteria.addParam("startDate", new Date(1700000000000L)); + criteria.addParam("endDate", new Date(1700100000000L)); + + List searchFields = List.of( + new SearchField("startDate", "evtTime", + SearchField.DATA_TYPE.DATE, SearchField.SEARCH_TYPE.GREATER_EQUAL_THAN), + new SearchField("endDate", "evtTime", + SearchField.DATA_TYPE.DATE, SearchField.SEARCH_TYPE.LESS_EQUAL_THAN)); + List sortFields = List.of( + new SortField("eventTime", "evtTime", true, SortField.SORT_ORDER.DESC)); + + String body = openSearchUtil.buildSearchBody(criteria, searchFields, sortFields); + + assertTrue(body.contains("range")); + assertTrue(body.contains("evtTime")); + assertTrue(body.contains("gte")); + assertTrue(body.contains("lte")); + } + + @Test + void buildSearchBody_collectionOrQuery() throws Exception { + SearchCriteria criteria = new SearchCriteria(); + criteria.setMaxRows(10); + criteria.setStartIndex(0); + criteria.addParam("requestUser", Arrays.asList("user1", "user2", "user3")); + + List searchFields = List.of( + new SearchField("requestUser", "reqUser", + SearchField.DATA_TYPE.STR_LIST, SearchField.SEARCH_TYPE.FULL)); + List sortFields = List.of( + new SortField("eventTime", "evtTime", true, SortField.SORT_ORDER.DESC)); + + String body = openSearchUtil.buildSearchBody(criteria, searchFields, sortFields); + + assertTrue(body.contains("query_string")); + assertTrue(body.contains("OR")); + assertTrue(body.contains("user1")); + assertTrue(body.contains("user2")); + assertTrue(body.contains("user3")); + } + + @Test + void buildSearchBody_negation() throws Exception { + SearchCriteria criteria = new SearchCriteria(); + criteria.setMaxRows(10); + criteria.setStartIndex(0); + criteria.addParam("excludeUser", "serviceuser"); + + List searchFields = List.of( + new SearchField("excludeUser", "-reqUser", + SearchField.DATA_TYPE.STRING, SearchField.SEARCH_TYPE.FULL)); + List sortFields = List.of( + new SortField("eventTime", "evtTime", true, SortField.SORT_ORDER.DESC)); + + String body = openSearchUtil.buildSearchBody(criteria, searchFields, sortFields); + + assertTrue(body.contains("must_not")); + assertTrue(body.contains("reqUser")); + } + + @Test + void buildSearchBody_sorting() throws Exception { + SearchCriteria criteria = new SearchCriteria(); + criteria.setMaxRows(10); + criteria.setStartIndex(5); + criteria.setSortBy("eventTime"); + criteria.setSortType("asc"); + + List searchFields = new ArrayList<>(); + List sortFields = List.of( + new SortField("eventTime", "evtTime", true, SortField.SORT_ORDER.DESC)); + + String body = openSearchUtil.buildSearchBody(criteria, searchFields, sortFields); + JsonNode root = MAPPER.readTree(body); + + assertEquals(5, root.get("from").asInt()); + assertEquals(10, root.get("size").asInt()); + + JsonNode sortNode = root.get("sort"); + assertNotNull(sortNode); + assertTrue(sortNode.isArray()); + assertTrue(sortNode.get(0).has("evtTime")); + assertEquals("asc", sortNode.get(0).at("/evtTime/order").asText()); + } + + @Test + void escapeLucene_specialCharacters() { + String input = "test+value:with*special?chars"; + String escaped = OpenSearchUtil.escapeLucene(input); + + assertTrue(escaped.contains("\\+")); + assertTrue(escaped.contains("\\:")); + assertTrue(escaped.contains("\\*")); + assertTrue(escaped.contains("\\?")); + } + + @Test + void escapeLucene_noSpecialChars() { + String input = "simplevalue"; + String escaped = OpenSearchUtil.escapeLucene(input); + + assertEquals("simplevalue", escaped); + } +}