Skip to content

Commit

Permalink
fix bug
Browse files Browse the repository at this point in the history
Signed-off-by: Ruirui Zhang <[email protected]>
  • Loading branch information
ruai0511 committed Oct 21, 2024
1 parent 456ca97 commit c0bc9bd
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest;
import org.opensearch.cluster.metadata.QueryGroup;
import org.opensearch.common.UUIDs;
import org.opensearch.core.common.io.stream.StreamInput;
Expand All @@ -33,7 +34,7 @@
*
* @opensearch.experimental
*/
public class CreateQueryGroupRequest extends ActionRequest {
public class CreateQueryGroupRequest extends ClusterManagerNodeRequest<CreateQueryGroupRequest> {
private final QueryGroup queryGroup;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,42 +10,83 @@

import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.plugin.wlm.service.QueryGroupPersistenceService;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;

/**
* Transport action to create QueryGroup
*
* @opensearch.experimental
*/
public class TransportCreateQueryGroupAction extends HandledTransportAction<CreateQueryGroupRequest, CreateQueryGroupResponse> {
public class TransportCreateQueryGroupAction extends TransportClusterManagerNodeAction<CreateQueryGroupRequest, CreateQueryGroupResponse> {

private final QueryGroupPersistenceService queryGroupPersistenceService;

/**
* Constructor for TransportCreateQueryGroupAction
*
* @param actionName - action name
* @param clusterService - a {@link ClusterService} object
* @param transportService - a {@link TransportService} object
* @param actionFilters - a {@link ActionFilters} object
* @param threadPool - a {@link ThreadPool} object
* @param indexNameExpressionResolver - a {@link IndexNameExpressionResolver} object
* @param queryGroupPersistenceService - a {@link QueryGroupPersistenceService} object
*/
@Inject
public TransportCreateQueryGroupAction(
String actionName,
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters,
ThreadPool threadPool,
IndexNameExpressionResolver indexNameExpressionResolver,
QueryGroupPersistenceService queryGroupPersistenceService
) {
super(CreateQueryGroupAction.NAME, transportService, actionFilters, CreateQueryGroupRequest::new);
super(
CreateQueryGroupAction.NAME,
transportService,
clusterService,
threadPool,
actionFilters,
CreateQueryGroupRequest::new,
indexNameExpressionResolver
);
this.queryGroupPersistenceService = queryGroupPersistenceService;
}

@Override
protected void doExecute(Task task, CreateQueryGroupRequest request, ActionListener<CreateQueryGroupResponse> listener) {
protected void clusterManagerOperation(
CreateQueryGroupRequest request,
ClusterState state,
ActionListener<CreateQueryGroupResponse> listener
) throws Exception {
queryGroupPersistenceService.persistInClusterStateMetadata(request.getQueryGroup(), listener);
}

@Override
protected String executor() {
return ThreadPool.Names.SAME;
}

@Override
protected CreateQueryGroupResponse read(StreamInput in) throws IOException {
return new CreateQueryGroupResponse(in);
}

@Override
protected ClusterBlockException checkBlock(CreateQueryGroupRequest request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,42 +10,84 @@

import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.plugin.wlm.service.QueryGroupPersistenceService;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;

/**
* Transport action to update QueryGroup
*
* @opensearch.experimental
*/
public class TransportUpdateQueryGroupAction extends HandledTransportAction<UpdateQueryGroupRequest, UpdateQueryGroupResponse> {
public class TransportUpdateQueryGroupAction extends TransportClusterManagerNodeAction<UpdateQueryGroupRequest, UpdateQueryGroupResponse> {

private final QueryGroupPersistenceService queryGroupPersistenceService;

/**
* Constructor for TransportUpdateQueryGroupAction
*
* @param actionName - action name
* @param clusterService - a {@link ClusterService} object
* @param transportService - a {@link TransportService} object
* @param actionFilters - a {@link ActionFilters} object
* @param threadPool - a {@link ThreadPool} object
* @param indexNameExpressionResolver - a {@link IndexNameExpressionResolver} object
* @param queryGroupPersistenceService - a {@link QueryGroupPersistenceService} object
*/
@Inject
public TransportUpdateQueryGroupAction(
String actionName,
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters,
ThreadPool threadPool,
IndexNameExpressionResolver indexNameExpressionResolver,
QueryGroupPersistenceService queryGroupPersistenceService
) {
super(UpdateQueryGroupAction.NAME, transportService, actionFilters, UpdateQueryGroupRequest::new);
super(
UpdateQueryGroupAction.NAME,
transportService,
clusterService,
threadPool,
actionFilters,
UpdateQueryGroupRequest::new,
indexNameExpressionResolver
);
this.queryGroupPersistenceService = queryGroupPersistenceService;
}

@Override
protected void doExecute(Task task, UpdateQueryGroupRequest request, ActionListener<UpdateQueryGroupResponse> listener) {
protected void clusterManagerOperation(
UpdateQueryGroupRequest request,
ClusterState state,
ActionListener<UpdateQueryGroupResponse> listener
) throws Exception {
queryGroupPersistenceService.updateInClusterStateMetadata(request, listener);
}

@Override
protected String executor() {
return ThreadPool.Names.SAME;
}

@Override
protected UpdateQueryGroupResponse read(StreamInput in) throws IOException {
return new UpdateQueryGroupResponse(in);
}

@Override
protected ClusterBlockException checkBlock(UpdateQueryGroupRequest request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest;
import org.opensearch.cluster.metadata.QueryGroup;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
Expand All @@ -23,7 +24,7 @@
*
* @opensearch.experimental
*/
public class UpdateQueryGroupRequest extends ActionRequest {
public class UpdateQueryGroupRequest extends ClusterManagerNodeRequest<UpdateQueryGroupRequest> {
private final String name;
private final MutableQueryGroupFragment mutableQueryGroupFragment;

Expand Down

0 comments on commit c0bc9bd

Please sign in to comment.