Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for dynamic renaming of keys #5074

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,10 @@ public String toString() {
}

private String checkAndTrimKey(final String key) {
checkKey(key);
if(!supportedActions.equals(Collections.singleton(EventKeyFactory.EventAction.DELETE)))
{
checkKey(key);
}
return trimTrailingSlashInKey(key);
}

Expand Down Expand Up @@ -161,7 +164,8 @@ private static boolean isValidKey(final String key) {
|| c == '@'
|| c == '/'
|| c == '['
|| c == ']')) {
|| c == ']'
)) {

return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,6 @@ public void testKey_withNullKey_throwsNullPointerException() {
private <T extends Throwable> void assertThrowsForKeyCheck(final Class<T> expectedThrowable, final String key) {
assertThrows(expectedThrowable, () -> event.put(key, UUID.randomUUID()));
assertThrows(expectedThrowable, () -> event.get(key, String.class));
assertThrows(expectedThrowable, () -> event.delete(key));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Pattern;

@DataPrepperPlugin(name = "rename_keys", pluginType = Processor.class, pluginConfigurationType = RenameKeyProcessorConfig.class)
public class RenameKeyProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
Expand All @@ -44,6 +46,12 @@ public RenameKeyProcessor(final PluginMetrics pluginMetrics, final RenameKeyProc
String.format("rename_when %s is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax",
entry.getRenameWhen()));
}
if (entry.getFromKey() == null && entry.getFromKeyPattern() == null) {
throw new InvalidPluginConfigurationException("Either from_key or from_key_pattern must be specified. Both cannot be set together.");
}
if (entry.getFromKey() != null && entry.getFromKeyPattern() != null) {
throw new InvalidPluginConfigurationException("Only one of from_key or from_key_pattern should be specified.");
}
});
}

Expand All @@ -59,14 +67,29 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
continue;
}

if (entry.getFromKey().equals(entry.getToKey()) || !recordEvent.containsKey(entry.getFromKey())) {
if (Objects.nonNull(entry.getFromKey()) && (entry.getFromKey().equals(entry.getToKey()) || !recordEvent.containsKey(entry.getFromKey()))) {
continue;
}

if (!recordEvent.containsKey(entry.getToKey()) || entry.getOverwriteIfToKeyExists()) {
final Object source = recordEvent.get(entry.getFromKey(), Object.class);
recordEvent.put(entry.getToKey(), source);
recordEvent.delete(entry.getFromKey());
if(Objects.nonNull(entry.getFromKey())) {
final Object source = recordEvent.get(entry.getFromKey(), Object.class);
recordEvent.put(entry.getToKey(), source);
recordEvent.delete(entry.getFromKey());
}
if(Objects.nonNull(entry.getFromKeyCompiledPattern())) {
Map<String,Object> eventMap = recordEvent.toMap();
Pattern fromKeyCompiledPattern = entry.getFromKeyCompiledPattern();
for (Map.Entry<String, Object> eventEntry : eventMap.entrySet()) {
final String key = eventEntry.getKey();
final Object value = eventEntry.getValue();
if (fromKeyCompiledPattern.matcher(key).matches()) {
recordEvent.put(entry.getToKey(), value);
recordEvent.delete(key);
divbok marked this conversation as resolved.
Show resolved Hide resolved
if(!entry.getOverwriteIfToKeyExists()) break;

}
}
}
}
}
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@

package org.opensearch.dataprepper.plugins.processor.mutateevent;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonClassDescription;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
Expand All @@ -17,19 +18,22 @@
import org.opensearch.dataprepper.model.event.EventKeyFactory;

import java.util.List;
import java.util.regex.Pattern;

@JsonPropertyOrder
@JsonClassDescription("The <code>rename_keys</code> processor renames keys in an event.")
public class RenameKeyProcessorConfig {
@JsonPropertyOrder
public static class Entry {
@NotEmpty
@NotNull
@JsonProperty("from_key")
@JsonPropertyDescription("The key of the entry to be renamed.")
@EventKeyConfiguration({EventKeyFactory.EventAction.GET, EventKeyFactory.EventAction.DELETE})
private EventKey fromKey;

@JsonProperty("from_key_regex")
@JsonPropertyDescription("The regex pattern of the key of the entry to be renamed.")
private String fromKeyRegex;

@NotEmpty
@NotNull
@JsonProperty("to_key")
Expand All @@ -47,10 +51,16 @@ public static class Entry {
"run on the event. By default, all events will be processed unless otherwise stated.")
private String renameWhen;

private Pattern fromKeyCompiledPattern;

public EventKey getFromKey() {
return fromKey;
}

public String getFromKeyPattern() {
return fromKeyRegex;
}

public EventKey getToKey() {
return toKey;
}
Expand All @@ -59,10 +69,21 @@ public boolean getOverwriteIfToKeyExists() {
return overwriteIfToKeyExists;
}

public String getRenameWhen() { return renameWhen; }
public String getRenameWhen() {
return renameWhen;
}

@JsonIgnore
public Pattern getFromKeyCompiledPattern() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you need to add @JsonIgnore to this method.

if (fromKeyRegex != null && fromKeyCompiledPattern == null) {
fromKeyCompiledPattern = Pattern.compile(fromKeyRegex);
}
return fromKeyCompiledPattern;
}

public Entry(final EventKey fromKey, final EventKey toKey, final boolean overwriteIfKeyExists, final String renameWhen) {
public Entry(final EventKey fromKey, final String fromKeyPattern, final EventKey toKey, final boolean overwriteIfKeyExists, final String renameWhen) {
this.fromKey = fromKey;
this.fromKeyRegex = fromKeyPattern;
this.toKey = toKey;
this.overwriteIfToKeyExists = overwriteIfKeyExists;
this.renameWhen = renameWhen;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.LinkedList;
import java.util.Arrays;
import java.util.Map;
import java.util.ArrayList;
import java.util.HashMap;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
Expand All @@ -49,17 +51,28 @@ public class RenameKeyProcessorTests {
@Test
void invalid_rename_when_throws_InvalidPluginConfigurationException() {
final String renameWhen = UUID.randomUUID().toString();
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", "newMessage", true, renameWhen)));
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", null,"newMessage", true, renameWhen)));


when(expressionEvaluator.isValidExpressionStatement(renameWhen)).thenReturn(false);

assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest);
}

@Test
void invalid_config_when_both_from_key_empty_throws_InvalidPluginConfigurationException() {
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry(null,null, "newMessage", true, null)));
assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest);
}

@Test
void invalid_config_when_both_from_key_set_throws_InvalidPluginConfigurationException() {
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message","m.*", "newMessage", true, null)));
assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest);
}
@Test
public void testSingleOverwriteRenameProcessorTests() {
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", "newMessage", true, null)));
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", null,"newMessage", true, null)));

final RenameKeyProcessor processor = createObjectUnderTest();
final Record<Event> record = getEvent("thisisamessage");
Expand All @@ -73,7 +86,7 @@ public void testSingleOverwriteRenameProcessorTests() {

@Test
public void testSingleNoOverwriteRenameProcessorTests() {
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", "newMessage", false, null)));
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", null,"newMessage", false, null)));

final RenameKeyProcessor processor = createObjectUnderTest();
final Record<Event> record = getEvent("thisisamessage");
Expand All @@ -85,9 +98,55 @@ public void testSingleNoOverwriteRenameProcessorTests() {
assertThat(editedRecords.get(0).getData().get("newMessage", Object.class), equalTo("test2"));
}

@Test
public void testFromKeyPatternNoOverwriteRenameProcessorTests() {
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry(null, "(detailed_timestamp|detail_timestamp).*","detailed_timestamp", false, null)));

final RenameKeyProcessor processor = createObjectUnderTest();
final Record<Event> record = getEvent("thisisamessage");
record.getData().put("detailed_timestamp_1004", "test2");
final List<Record<Event>> editedRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));
assertThat(editedRecords.get(0).getData().containsKey("detailed_timestamp"), is(true));
assertThat(editedRecords.get(0).getData().containsKey("detailed_timestamp_1004"), is(false));
assertThat(editedRecords.get(0).getData().get("detailed_timestamp", Object.class), equalTo("test2"));
}
@Test
public void testFromKeyPatternGroupingPatternRenameProcessorTests() {
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry(null, "(detailed_timestamp|detail_timestamp).*","detailed_timestamp", false, null)));
final RenameKeyProcessor processor = createObjectUnderTest();
final Record<Event> record = getEvent("thisisamessage");
record.getData().put("detailed_timestamp_1004", "test2");
divbok marked this conversation as resolved.
Show resolved Hide resolved
record.getData().put("test_key","test_value");
final Record<Event> second_record = getEvent("thisisanewmessage");
second_record.getData().put("detail_timestamp-123", "test3");
Collection<Record<Event>> records = new ArrayList<>();
records.add(record);
records.add(second_record);
final List<Record<Event>> editedRecords = (List<Record<Event>>) processor.doExecute(records);
assertThat(editedRecords.get(0).getData().containsKey("detailed_timestamp"), is(true));
assertThat(editedRecords.get(0).getData().containsKey("test_key"), is(true));
assertThat(editedRecords.get(1).getData().containsKey("detailed_timestamp"), is(true));
assertThat(editedRecords.get(0).getData().containsKey("detailed_timestamp_1004"), is(false));
assertThat(editedRecords.get(1).getData().containsKey("detail_timestamp-123"), is(false));
assertThat(editedRecords.get(0).getData().get("detailed_timestamp", Object.class), equalTo("test2"));
assertThat(editedRecords.get(1).getData().get("detailed_timestamp", Object.class), equalTo("test3"));
}
@Test
public void testFromKeyPatternOverwriteRenameProcessorTests() {
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry(null, "me.*","newMessage", true, null)));

final RenameKeyProcessor processor = createObjectUnderTest();
final Record<Event> record = getEvent("thisisamessage");
record.getData().put("newMessage", "test2");
final List<Record<Event>> editedRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));
assertThat(editedRecords.get(0).getData().containsKey("newMessage"), is(true));
assertThat(editedRecords.get(0).getData().containsKey("message"), is(false));
assertThat(editedRecords.get(0).getData().get("newMessage", Object.class), equalTo("thisisamessage"));
}

@Test
public void testFromKeyDneRenameProcessorTests() {
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message2", "newMessage", false, null)));
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message2",null, "newMessage", false, null)));

final RenameKeyProcessor processor = createObjectUnderTest();
final Record<Event> record = getEvent("thisisamessage");
Expand All @@ -100,8 +159,8 @@ public void testFromKeyDneRenameProcessorTests() {

@Test
public void testMultiMixedOverwriteRenameProcessorTests() {
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", "newMessage", true, null),
createEntry("message2", "existingMessage", false, null)));
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message",null, "newMessage", true, null),
createEntry("message2",null, "existingMessage", false, null)));

final RenameKeyProcessor processor = createObjectUnderTest();
final Record<Event> record = getEvent("thisisamessage");
Expand All @@ -118,8 +177,23 @@ public void testMultiMixedOverwriteRenameProcessorTests() {

@Test
public void testChainRenamingRenameProcessorTests() {
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", "newMessage", true, null),
createEntry("newMessage", "message3", true, null)));
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message",null, "newMessage", true, null),
createEntry("newMessage", null,"message3", true, null)));

final RenameKeyProcessor processor = createObjectUnderTest();
final Record<Event> record = getEvent("thisisamessage");
final List<Record<Event>> editedRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));

assertThat(editedRecords.get(0).getData().containsKey("message3"), is(true));
assertThat(editedRecords.get(0).getData().containsKey("newMessage"), is(false));
assertThat(editedRecords.get(0).getData().containsKey("message"), is(false));
assertThat(editedRecords.get(0).getData().get("message3", Object.class), equalTo("thisisamessage"));
}

@Test
public void testChainRenamingFromKeyPatternRenameProcessorTests() {
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry(null,"me.*", "newMessage", true, null),
createEntry(null, "new.*","message3", true, null)));

final RenameKeyProcessor processor = createObjectUnderTest();
final Record<Event> record = getEvent("thisisamessage");
Expand All @@ -135,7 +209,7 @@ public void testChainRenamingRenameProcessorTests() {
public void testNoRename_when_RenameWhen_returns_false() {
final String renameWhen = UUID.randomUUID().toString();

when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", "newMessage", false, renameWhen)));
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message",null, "newMessage", false, renameWhen)));
when(expressionEvaluator.isValidExpressionStatement(renameWhen)).thenReturn(true);

final RenameKeyProcessor processor = createObjectUnderTest();
Expand All @@ -150,14 +224,16 @@ public void testNoRename_when_RenameWhen_returns_false() {
assertThat(editedRecords.get(0).getData().get("message", Object.class), equalTo("thisisamessage"));
}



private RenameKeyProcessor createObjectUnderTest() {
return new RenameKeyProcessor(pluginMetrics, mockConfig, expressionEvaluator);
}

private RenameKeyProcessorConfig.Entry createEntry(final String fromKey, final String toKey, final boolean overwriteIfToKeyExists, final String renameWhen) {
final EventKey fromEventKey = eventKeyFactory.createEventKey(fromKey);
private RenameKeyProcessorConfig.Entry createEntry(final String fromKey, final String fromKeyPattern, final String toKey, final boolean overwriteIfToKeyExists, final String renameWhen) {
final EventKey fromEventKey = (fromKey == null) ? null : eventKeyFactory.createEventKey(fromKey);
final EventKey toEventKey = eventKeyFactory.createEventKey(toKey);
return new RenameKeyProcessorConfig.Entry(fromEventKey, toEventKey, overwriteIfToKeyExists, renameWhen);
return new RenameKeyProcessorConfig.Entry(fromEventKey,fromKeyPattern, toEventKey, overwriteIfToKeyExists, renameWhen);
}

private List<RenameKeyProcessorConfig.Entry> createListOfEntries(final RenameKeyProcessorConfig.Entry... entries) {
Expand Down
Loading