From 443849bafef3ea4906c4672b782c580e8e758c13 Mon Sep 17 00:00:00 2001 From: changzhen Date: Tue, 27 Feb 2024 20:44:01 +0800 Subject: [PATCH] watch cluster status remedyActions field with mci controller Signed-off-by: changzhen --- .../multiclusteringress/eventhandlers.go | 74 +++++++++++++++++++ .../multiclusteringress/mci_controller.go | 67 +++++++++++++++++ 2 files changed, 141 insertions(+) diff --git a/pkg/controllers/multiclusteringress/eventhandlers.go b/pkg/controllers/multiclusteringress/eventhandlers.go index e5ebbd1..f6a3673 100644 --- a/pkg/controllers/multiclusteringress/eventhandlers.go +++ b/pkg/controllers/multiclusteringress/eventhandlers.go @@ -4,7 +4,9 @@ import ( "context" "strings" + clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" networkingv1alpha1 "github.com/karmada-io/karmada/pkg/apis/networking/v1alpha1" + remedyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/remedy/v1alpha1" corev1 "k8s.io/api/core/v1" discoveryv1 "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/api/equality" @@ -241,3 +243,75 @@ func (h *secretEventHandler) enqueueImpactedMCI(secretNamespace, secretName stri func (h *secretEventHandler) Generic(_ context.Context, _ event.GenericEvent, _ workqueue.RateLimitingInterface) { } + +func newClusterEventHandler(mciEventChan chan<- event.GenericEvent, client client.Client) handler.EventHandler { + return &clusterEventHandler{ + client: client, + mciEventChan: mciEventChan, + } +} + +var _ handler.EventHandler = (*clusterEventHandler)(nil) + +type clusterEventHandler struct { + client client.Client + mciEventChan chan<- event.GenericEvent +} + +func (h *clusterEventHandler) Create(_ context.Context, _ event.CreateEvent, _ workqueue.RateLimitingInterface) { +} + +func (h *clusterEventHandler) Update(_ context.Context, e event.UpdateEvent, _ workqueue.RateLimitingInterface) { + oldCluster := e.ObjectOld.(*clusterv1alpha1.Cluster) + newCluster := e.ObjectNew.(*clusterv1alpha1.Cluster) + oldExist, newExist := false, false + for _, action := range oldCluster.Status.RemedyActions { + if action == string(remedyv1alpha1.TrafficControl) { + oldExist = true + break + } + } + for _, action := range newCluster.Status.RemedyActions { + if action == string(remedyv1alpha1.TrafficControl) { + newExist = true + break + } + } + + if oldExist == newExist { + return + } + + mciList := &networkingv1alpha1.MultiClusterIngressList{} + if err := h.client.List(context.Background(), mciList); err != nil { + klog.Errorf("failed to fetch multiclusteringresses") + return + } + + for index := range mciList.Items { + mci := &mciList.Items[index] + if !mciSvcLocationsContainsCluster(mci, newCluster) { + continue + } + h.mciEventChan <- event.GenericEvent{ + Object: mci, + } + } +} + +func (h *clusterEventHandler) Delete(_ context.Context, _ event.DeleteEvent, _ workqueue.RateLimitingInterface) { +} + +func (h *clusterEventHandler) Generic(_ context.Context, _ event.GenericEvent, _ workqueue.RateLimitingInterface) { +} + +func mciSvcLocationsContainsCluster(mci *networkingv1alpha1.MultiClusterIngress, cluster *clusterv1alpha1.Cluster) bool { + for _, location := range mci.Status.ServiceLocations { + for _, clusterName := range location.Clusters { + if clusterName == cluster.Name { + return true + } + } + } + return false +} diff --git a/pkg/controllers/multiclusteringress/mci_controller.go b/pkg/controllers/multiclusteringress/mci_controller.go index da1cf51..90fd208 100644 --- a/pkg/controllers/multiclusteringress/mci_controller.go +++ b/pkg/controllers/multiclusteringress/mci_controller.go @@ -3,14 +3,19 @@ package multiclusteringress import ( "context" "reflect" + "sort" + clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" networkingv1alpha1 "github.com/karmada-io/karmada/pkg/apis/networking/v1alpha1" + remedyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/remedy/v1alpha1" "github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag" "github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager" corev1 "k8s.io/api/core/v1" discoveryv1 "k8s.io/api/discovery/v1" networkingv1 "k8s.io/api/networking/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/retry" "k8s.io/klog/v2" @@ -95,6 +100,10 @@ func (c *MCIController) handleMCICreateOrUpdate(ctx context.Context, mci *networ } } + if err := c.updateMCITrafficBlockClusters(ctx, mci); err != nil { + return controllerruntime.Result{}, err + } + _, exist, err := c.LoadBalancer.GetLoadBalancer(ctx, mci) if err != nil { klog.ErrorS(err, "failed to get loadBalancer with provider", "namespace", mci.Namespace, "name", mci.Name) @@ -106,6 +115,60 @@ func (c *MCIController) handleMCICreateOrUpdate(ctx context.Context, mci *networ return c.handleMCICreate(ctx, mci) } +func (c *MCIController) updateMCITrafficBlockClusters(ctx context.Context, mci *networkingv1alpha1.MultiClusterIngress) error { + locatedClusters := sets.NewString() + for _, location := range mci.Status.ServiceLocations { + locatedClusters.Insert(location.Clusters...) + } + + clusterList := &clusterv1alpha1.ClusterList{} + if err := c.Client.List(ctx, clusterList); err != nil { + klog.Errorf("Failed to list cluster: %v", err) + return err + } + + var trafficBlockClusters []string + for _, cluster := range clusterList.Items { + if !locatedClusters.Has(cluster.Name) { + continue + } + for _, action := range cluster.Status.RemedyActions { + if action == string(remedyv1alpha1.TrafficControl) { + trafficBlockClusters = append(trafficBlockClusters, cluster.Name) + break + } + } + } + sort.Strings(trafficBlockClusters) + + mciNamespacedName := types.NamespacedName{Namespace: mci.Namespace, Name: mci.Name} + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + if reflect.DeepEqual(trafficBlockClusters, mci.Status.TrafficBlockClusters) { + return nil + } + mci.Status.TrafficBlockClusters = trafficBlockClusters + updateErr := c.Client.Status().Update(ctx, mci) + if updateErr == nil { + return nil + } + + updatedMCI := &networkingv1alpha1.MultiClusterIngress{} + err := c.Client.Get(ctx, mciNamespacedName, updatedMCI) + if err == nil { + mci = updatedMCI.DeepCopy() + } else { + klog.Errorf("Failed to get updated multiClusterIngress(%s): %v", mciNamespacedName.String(), err) + } + return updateErr + }) + if err != nil { + klog.Errorf("Failed to sync multiClusterIngress(%s) trafficBlockClusters: %v", mciNamespacedName.String(), err) + return err + } + klog.V(4).Infof("Success to sync multiClusterIngress(%s) trafficBlockClusters", mciNamespacedName.String()) + return nil +} + func (c *MCIController) handleMCICreate(ctx context.Context, mci *networkingv1alpha1.MultiClusterIngress) (controllerruntime.Result, error) { klog.V(4).InfoS("Begin to handle multiClusterIngress create event", "namespace", mci.Namespace, "name", mci.Name) @@ -190,6 +253,7 @@ func (c *MCIController) setupWatches(ctx context.Context, mciController controll svcEventHandler := newServiceEventHandler(mciEventChan, c.Client) epsEventHandler := newEndpointSlicesEventHandler(svcEventChan) secEventHandler := newSecretEventHandler(mciEventChan, c.Client) + clusterHandler := newClusterEventHandler(mciEventChan, c.Client) if err := mciController.Watch(source.Kind(mgr.GetCache(), &networkingv1alpha1.MultiClusterIngress{}), mciEventHandler); err != nil { return err @@ -209,5 +273,8 @@ func (c *MCIController) setupWatches(ctx context.Context, mciController controll if err := mciController.Watch(source.Kind(mgr.GetCache(), &corev1.Secret{}), secEventHandler); err != nil { return err } + if err := mciController.Watch(source.Kind(mgr.GetCache(), &clusterv1alpha1.Cluster{}), clusterHandler); err != nil { + return err + } return nil }