diff --git a/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt b/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt index 3790c3c0..8f798c32 100644 --- a/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt +++ b/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt @@ -11,6 +11,7 @@ package org.opensearch.replication +import org.opensearch.commons.replication.action.ReplicationActions.STOP_REPLICATION_ACTION_TYPE import org.opensearch.replication.action.changes.GetChangesAction import org.opensearch.replication.action.changes.TransportGetChangesAction import org.opensearch.replication.action.index.ReplicateIndexAction @@ -41,8 +42,8 @@ import org.opensearch.replication.action.status.ReplicationStatusAction import org.opensearch.replication.action.status.ShardsInfoAction import org.opensearch.replication.action.status.TranportShardsInfoAction import org.opensearch.replication.action.status.TransportReplicationStatusAction -import org.opensearch.replication.action.stop.StopIndexReplicationAction import org.opensearch.replication.action.stop.TransportStopIndexReplicationAction +import org.opensearch.replication.action.stop.TransportUnfollowIndexReplicationAction import org.opensearch.replication.action.update.TransportUpdateIndexReplicationAction import org.opensearch.replication.action.update.UpdateIndexReplicationAction import org.opensearch.replication.metadata.ReplicationMetadataManager @@ -96,6 +97,7 @@ import org.opensearch.core.common.unit.ByteSizeUnit import org.opensearch.core.common.unit.ByteSizeValue import org.opensearch.common.unit.TimeValue import org.opensearch.common.util.concurrent.OpenSearchExecutors +import org.opensearch.commons.replication.action.ReplicationActions.UNFOLLOW_REPLICATION_ACTION_TYPE import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.core.xcontent.XContentParser import org.opensearch.commons.utils.OpenForTesting @@ -233,7 +235,8 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin, ActionHandler(GetFileChunkAction.INSTANCE, TransportGetFileChunkAction::class.java), ActionHandler(UpdateAutoFollowPatternAction.INSTANCE, TransportUpdateAutoFollowPatternAction::class.java), ActionHandler(AutoFollowClusterManagerNodeAction.INSTANCE, TransportAutoFollowClusterManagerNodeAction::class.java), - ActionHandler(StopIndexReplicationAction.INSTANCE, TransportStopIndexReplicationAction::class.java), + ActionHandler(STOP_REPLICATION_ACTION_TYPE, TransportStopIndexReplicationAction::class.java), + ActionHandler(UNFOLLOW_REPLICATION_ACTION_TYPE, TransportUnfollowIndexReplicationAction::class.java), ActionHandler(PauseIndexReplicationAction.INSTANCE, TransportPauseIndexReplicationAction::class.java), ActionHandler(ResumeIndexReplicationAction.INSTANCE, TransportResumeIndexReplicationAction::class.java), ActionHandler(UpdateIndexReplicationAction.INSTANCE, TransportUpdateIndexReplicationAction::class.java), diff --git a/src/main/kotlin/org/opensearch/replication/action/stop/StopIndexReplicationAction.kt b/src/main/kotlin/org/opensearch/replication/action/stop/StopIndexReplicationAction.kt deleted file mode 100644 index 412f139d..00000000 --- a/src/main/kotlin/org/opensearch/replication/action/stop/StopIndexReplicationAction.kt +++ /dev/null @@ -1,22 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - * - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - -package org.opensearch.replication.action.stop - -import org.opensearch.action.ActionType -import org.opensearch.action.support.master.AcknowledgedResponse - -class StopIndexReplicationAction private constructor(): ActionType(NAME, ::AcknowledgedResponse) { - companion object { - const val NAME = "indices:admin/plugins/replication/index/stop" - val INSTANCE: StopIndexReplicationAction = StopIndexReplicationAction() - } -} diff --git a/src/main/kotlin/org/opensearch/replication/action/stop/StopIndexReplicationRequest.kt b/src/main/kotlin/org/opensearch/replication/action/stop/StopIndexReplicationRequest.kt deleted file mode 100644 index 3ae5eff9..00000000 --- a/src/main/kotlin/org/opensearch/replication/action/stop/StopIndexReplicationRequest.kt +++ /dev/null @@ -1,78 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - * - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - -package org.opensearch.replication.action.stop - -import org.opensearch.action.ActionRequestValidationException -import org.opensearch.action.IndicesRequest -import org.opensearch.action.support.IndicesOptions -import org.opensearch.action.support.master.AcknowledgedRequest -import org.opensearch.core.ParseField -import org.opensearch.core.common.io.stream.StreamInput -import org.opensearch.core.common.io.stream.StreamOutput -import org.opensearch.core.xcontent.* - -class StopIndexReplicationRequest : AcknowledgedRequest, IndicesRequest.Replaceable, ToXContentObject { - - lateinit var indexName: String - - constructor(indexName: String) { - this.indexName = indexName - } - - private constructor() { - } - - constructor(inp: StreamInput): super(inp) { - indexName = inp.readString() - } - - companion object { - private val PARSER = ObjectParser("StopReplicationRequestParser") { - StopIndexReplicationRequest() - } - - fun fromXContent(parser: XContentParser, followerIndex: String): StopIndexReplicationRequest { - val stopIndexReplicationRequest = PARSER.parse(parser, null) - stopIndexReplicationRequest.indexName = followerIndex - return stopIndexReplicationRequest - } - } - - override fun validate(): ActionRequestValidationException? { - return null - } - - override fun indices(vararg indices: String?): IndicesRequest { - return this - } - - override fun indices(): Array { - return arrayOf(indexName) - } - - override fun indicesOptions(): IndicesOptions { - return IndicesOptions.strictSingleIndexNoExpandForbidClosed() - } - - override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { - builder.startObject() - builder.field("indexName", indexName) - builder.endObject() - return builder - } - - override fun writeTo(out: StreamOutput) { - super.writeTo(out) - out.writeString(indexName) - } - -} \ No newline at end of file diff --git a/src/main/kotlin/org/opensearch/replication/action/stop/TransportStopIndexReplicationAction.kt b/src/main/kotlin/org/opensearch/replication/action/stop/TransportStopIndexReplicationAction.kt index d9af050a..8de3be6d 100644 --- a/src/main/kotlin/org/opensearch/replication/action/stop/TransportStopIndexReplicationAction.kt +++ b/src/main/kotlin/org/opensearch/replication/action/stop/TransportStopIndexReplicationAction.kt @@ -11,6 +11,8 @@ package org.opensearch.replication.action.stop +import org.opensearch.commons.replication.action.ReplicationActions.STOP_REPLICATION_ACTION_NAME +import org.opensearch.commons.replication.action.StopIndexReplicationRequest import org.opensearch.replication.ReplicationPlugin.Companion.REPLICATED_INDEX_SETTING import org.opensearch.replication.action.index.block.IndexBlockUpdateType import org.opensearch.replication.action.index.block.UpdateIndexBlockAction @@ -68,7 +70,7 @@ class TransportStopIndexReplicationAction @Inject constructor(transportService: IndexNameExpressionResolver, val client: Client, val replicationMetadataManager: ReplicationMetadataManager) : - TransportMasterNodeAction (StopIndexReplicationAction.NAME, + TransportMasterNodeAction (STOP_REPLICATION_ACTION_NAME, transportService, clusterService, threadPool, actionFilters, ::StopIndexReplicationRequest, indexNameExpressionResolver), CoroutineScope by GlobalScope { diff --git a/src/main/kotlin/org/opensearch/replication/action/stop/TransportUnfollowIndexReplicationAction.kt b/src/main/kotlin/org/opensearch/replication/action/stop/TransportUnfollowIndexReplicationAction.kt new file mode 100755 index 00000000..2ce2f4f5 --- /dev/null +++ b/src/main/kotlin/org/opensearch/replication/action/stop/TransportUnfollowIndexReplicationAction.kt @@ -0,0 +1,68 @@ +package org.opensearch.replication.action.stop + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.launch +import org.apache.logging.log4j.LogManager +import org.opensearch.action.support.HandledTransportAction +import org.opensearch.action.ActionRequest +import org.opensearch.action.support.ActionFilters +import org.opensearch.action.support.master.AcknowledgedResponse +import org.opensearch.client.Client +import org.opensearch.common.inject.Inject +import org.opensearch.cluster.metadata.IndexNameExpressionResolver +import org.opensearch.cluster.service.ClusterService +import org.opensearch.commons.replication.action.ReplicationActions.STOP_REPLICATION_ACTION_TYPE +import org.opensearch.commons.replication.action.StopIndexReplicationRequest +import org.opensearch.commons.replication.action.ReplicationActions.UNFOLLOW_REPLICATION_ACTION_NAME +import org.opensearch.commons.utils.recreateObject +import org.opensearch.core.action.ActionListener +import org.opensearch.replication.metadata.ReplicationMetadataManager +import org.opensearch.replication.util.coroutineContext +import org.opensearch.replication.util.stackTraceToString +import org.opensearch.replication.util.suspendExecute +import org.opensearch.tasks.Task +import org.opensearch.threadpool.ThreadPool +import org.opensearch.transport.TransportService + + +/** + * This action transforms the request from ActionRequest type to StopIndexReplicationRequest + * and performs the TransportStopIndexReplicationAction on it. + * While TransportStopIndexReplicationAction is used directly by the _stop replication REST API, + * this action is used for inter-plugin communication by ism plugin to unfollow i.e. stop replication. + */ +class TransportUnfollowIndexReplicationAction @Inject constructor ( + val name: String, + val transportService: TransportService, + val clusterService: ClusterService, + val threadPool: ThreadPool, + val client: Client, + val actionFilters: ActionFilters, + val indexNameExpressionResolver: IndexNameExpressionResolver, + val replicationMetadataManager: ReplicationMetadataManager, +): HandledTransportAction (UNFOLLOW_REPLICATION_ACTION_NAME, transportService, actionFilters, ::StopIndexReplicationRequest), + CoroutineScope by GlobalScope { + companion object { + private val log = LogManager.getLogger(TransportUnfollowIndexReplicationAction::class.java) + } + + @Throws(Exception::class) + override fun doExecute(task: Task?, request: ActionRequest, listener: ActionListener?) { + launch(Dispatchers.Unconfined + threadPool.coroutineContext()) { + val transformedRequest = request as? StopIndexReplicationRequest + ?: request.let { recreateObject(it) { StopIndexReplicationRequest(it) } } + try { + + var response = client.suspendExecute(STOP_REPLICATION_ACTION_TYPE, transformedRequest, true) + log.info("Stop replication successful for index[${transformedRequest.indexName}] with response: " + response.isAcknowledged) + listener?.onResponse(AcknowledgedResponse(true)) + } catch (e: Exception) { + log.error("Stop replication failed for index[${transformedRequest.indexName}] with error ${e.stackTraceToString()}") + listener?.onFailure(e) + throw e + } + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/org/opensearch/replication/rest/StopIndexReplicationHandler.kt b/src/main/kotlin/org/opensearch/replication/rest/StopIndexReplicationHandler.kt index 25406217..0ad033b6 100644 --- a/src/main/kotlin/org/opensearch/replication/rest/StopIndexReplicationHandler.kt +++ b/src/main/kotlin/org/opensearch/replication/rest/StopIndexReplicationHandler.kt @@ -11,8 +11,8 @@ package org.opensearch.replication.rest -import org.opensearch.replication.action.stop.StopIndexReplicationAction -import org.opensearch.replication.action.stop.StopIndexReplicationRequest +import org.opensearch.commons.replication.action.ReplicationActions.STOP_REPLICATION_ACTION_TYPE +import org.opensearch.commons.replication.action.StopIndexReplicationRequest import org.opensearch.client.node.NodeClient import org.opensearch.rest.BaseRestHandler import org.opensearch.rest.RestChannel @@ -38,7 +38,7 @@ class StopIndexReplicationHandler : BaseRestHandler() { val stopReplicationRequest = StopIndexReplicationRequest.fromXContent(parser, followIndex) return RestChannelConsumer { channel: RestChannel? -> client.admin().cluster() - .execute(StopIndexReplicationAction.INSTANCE, stopReplicationRequest, RestToXContentListener(channel)) + .execute(STOP_REPLICATION_ACTION_TYPE, stopReplicationRequest, RestToXContentListener(channel)) } } } diff --git a/src/main/kotlin/org/opensearch/replication/util/SecurityContext.kt b/src/main/kotlin/org/opensearch/replication/util/SecurityContext.kt index f811324a..d4b54f5f 100644 --- a/src/main/kotlin/org/opensearch/replication/util/SecurityContext.kt +++ b/src/main/kotlin/org/opensearch/replication/util/SecurityContext.kt @@ -11,6 +11,8 @@ package org.opensearch.replication.util +import org.opensearch.commons.replication.action.ReplicationActions.STOP_REPLICATION_ACTION_NAME +import org.opensearch.commons.replication.action.ReplicationActions.UNFOLLOW_REPLICATION_ACTION_NAME import org.opensearch.replication.action.autofollow.UpdateAutoFollowPatternAction import org.opensearch.replication.action.changes.GetChangesAction import org.opensearch.replication.action.index.ReplicateIndexAction @@ -20,7 +22,6 @@ import org.opensearch.replication.action.repository.GetFileChunkAction import org.opensearch.replication.action.repository.GetStoreMetadataAction import org.opensearch.replication.action.resume.ResumeIndexReplicationAction import org.opensearch.replication.action.status.ReplicationStatusAction -import org.opensearch.replication.action.stop.StopIndexReplicationAction import org.opensearch.replication.action.update.UpdateIndexReplicationAction import org.opensearch.replication.metadata.ReplicationMetadataManager import org.opensearch.replication.metadata.store.ReplicationMetadata @@ -49,7 +50,7 @@ class SecurityContext { val LEADER_USER_ACTIONS = listOf(GetChangesAction.NAME, GetFileChunkAction.NAME) val FOLLOWER_USER_ACTIONS = listOf(ReplayChangesAction.NAME, ReplicateIndexAction.NAME, PauseIndexReplicationAction.NAME, - ResumeIndexReplicationAction.NAME, StopIndexReplicationAction.NAME, + ResumeIndexReplicationAction.NAME, STOP_REPLICATION_ACTION_NAME, UNFOLLOW_REPLICATION_ACTION_NAME, UpdateIndexReplicationAction.NAME, ReplicationStatusAction.NAME, UpdateAutoFollowPatternAction.NAME) diff --git a/src/test/kotlin/org/opensearch/replication/MultiClusterRestTestCase.kt b/src/test/kotlin/org/opensearch/replication/MultiClusterRestTestCase.kt index 185cadc4..57b54758 100644 --- a/src/test/kotlin/org/opensearch/replication/MultiClusterRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/replication/MultiClusterRestTestCase.kt @@ -11,6 +11,7 @@ package org.opensearch.replication +import org.opensearch.commons.replication.action.ReplicationActions.STOP_REPLICATION_ACTION_NAME import org.opensearch.replication.MultiClusterAnnotations.ClusterConfiguration import org.opensearch.replication.MultiClusterAnnotations.ClusterConfigurations import org.opensearch.replication.MultiClusterAnnotations.getAnnotationsFromClass @@ -329,7 +330,7 @@ abstract class MultiClusterRestTestCase : OpenSearchTestCase() { "indices:admin/plugins/replication/index/start", "indices:admin/plugins/replication/index/pause", "indices:admin/plugins/replication/index/resume", - "indices:admin/plugins/replication/index/stop", + "$STOP_REPLICATION_ACTION_NAME", "indices:admin/plugins/replication/index/update", "indices:admin/plugins/replication/index/status_check" ] diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/SecurityBase.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/SecurityBase.kt index 515893e2..b370cc43 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/SecurityBase.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/SecurityBase.kt @@ -17,6 +17,7 @@ import org.apache.hc.core5.http.ContentType import org.apache.hc.core5.http.io.entity.StringEntity import org.opensearch.client.Request import org.junit.BeforeClass +import org.opensearch.commons.replication.action.ReplicationActions.STOP_REPLICATION_ACTION_NAME const val INTEG_TEST_PASSWORD = "ccr-integ-test@123" @@ -74,7 +75,7 @@ abstract class SecurityBase : MultiClusterRestTestCase() { "indices:admin/plugins/replication/index/start", "indices:admin/plugins/replication/index/pause", "indices:admin/plugins/replication/index/resume", - "indices:admin/plugins/replication/index/stop", + "$STOP_REPLICATION_ACTION_NAME", "indices:admin/plugins/replication/index/update", "indices:admin/plugins/replication/index/status_check" ] @@ -106,7 +107,7 @@ abstract class SecurityBase : MultiClusterRestTestCase() { "indices:admin/plugins/replication/index/start", "indices:admin/plugins/replication/index/pause", "indices:admin/plugins/replication/index/resume", - "indices:admin/plugins/replication/index/stop", + "$STOP_REPLICATION_ACTION_NAME", "indices:admin/plugins/replication/index/update", "indices:admin/plugins/replication/index/status_check" ] @@ -119,7 +120,7 @@ abstract class SecurityBase : MultiClusterRestTestCase() { "indices:admin/plugins/replication/index/start", "indices:admin/plugins/replication/index/pause", "indices:admin/plugins/replication/index/resume", - "indices:admin/plugins/replication/index/stop", + "$STOP_REPLICATION_ACTION_NAME", "indices:admin/plugins/replication/index/update", "indices:admin/plugins/replication/index/status_check" ] @@ -151,7 +152,7 @@ abstract class SecurityBase : MultiClusterRestTestCase() { "indices:admin/plugins/replication/index/start", "indices:admin/plugins/replication/index/pause", "indices:admin/plugins/replication/index/resume", - "indices:admin/plugins/replication/index/stop", + "$STOP_REPLICATION_ACTION_NAME", "indices:admin/plugins/replication/index/update", "indices:admin/plugins/replication/index/status_check" ] @@ -183,7 +184,7 @@ abstract class SecurityBase : MultiClusterRestTestCase() { "indices:admin/plugins/replication/index/start", "indices:admin/plugins/replication/index/pause", "indices:admin/plugins/replication/index/resume", - "indices:admin/plugins/replication/index/stop", + "$STOP_REPLICATION_ACTION_NAME", "indices:admin/plugins/replication/index/update", "indices:admin/plugins/replication/index/status_check" ] @@ -257,7 +258,7 @@ abstract class SecurityBase : MultiClusterRestTestCase() { "indices:admin/plugins/replication/index/start", "indices:admin/plugins/replication/index/pause", "indices:admin/plugins/replication/index/resume", - "indices:admin/plugins/replication/index/stop", + "$STOP_REPLICATION_ACTION_NAME", "indices:admin/plugins/replication/index/update", "indices:admin/plugins/replication/index/status_check" ] diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/SecurityCustomRolesIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/SecurityCustomRolesIT.kt index bdb7e44d..db9ce5a1 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/SecurityCustomRolesIT.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/SecurityCustomRolesIT.kt @@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit import org.opensearch.replication.task.autofollow.AutoFollowExecutor import org.opensearch.tasks.TaskInfo import org.junit.Before +import org.opensearch.commons.replication.action.ReplicationActions.STOP_REPLICATION_ACTION_NAME @MultiClusterAnnotations.ClusterConfigurations( MultiClusterAnnotations.ClusterConfiguration(clusterName = LEADER), @@ -413,7 +414,7 @@ class SecurityCustomRolesIT: SecurityBase() { "indices:admin/plugins/replication/index/start", "indices:admin/plugins/replication/index/pause", "indices:admin/plugins/replication/index/resume", - "indices:admin/plugins/replication/index/stop", + "$STOP_REPLICATION_ACTION_NAME", "indices:admin/plugins/replication/index/update", "indices:admin/plugins/replication/index/status_check" ] diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/SecurityDlsFlsIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/SecurityDlsFlsIT.kt index b8ababe8..145c5893 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/SecurityDlsFlsIT.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/SecurityDlsFlsIT.kt @@ -1,4 +1,4 @@ -/* +/* * SPDX-License-Identifier: Apache-2.0 * * The OpenSearch Contributors require contributions made to