Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ import (
"context"
"encoding/json"
"strings"
"sync"

"github.com/litmuschaos/litmus/chaoscenter/event-tracker/pkg/utils"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/util/retry"

"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -62,48 +62,70 @@ type apiResponse struct {
func (r *EventTrackerPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
_ = log.FromContext(ctx)

var mutex = &sync.Mutex{}
mutex.Lock()
// Collect experiment IDs that need to be triggered after successful status update
var experimentsToTrigger []string

var etp eventtrackerv1.EventTrackerPolicy
err := r.Client.Get(context.Background(), req.NamespacedName, &etp)
if errors.IsNotFound(err) {
logrus.Infof("namespace: %s not found", req.NamespacedName)
return ctrl.Result{}, nil
} else if err != nil {
return ctrl.Result{}, err
}

for index, status := range etp.Statuses {
if status.Result == utils.ConditionPassed && strings.ToLower(status.IsTriggered) == "false" {
logrus.Print("ResourceName: " + status.ResourceName + ", ExperimentID: " + status.ExperimentID)
response, err := utils.SendRequest(status.ExperimentID)
if err != nil {
return ctrl.Result{}, err
}
// Phase 1: Atomically claim the trigger status using optimistic concurrency
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
// Reset on each retry - we need fresh state
experimentsToTrigger = nil

logrus.Print(response)

var res apiResponse
err = json.Unmarshal([]byte(response), &res)
if err != nil {
return ctrl.Result{}, err
var etp eventtrackerv1.EventTrackerPolicy
if err := r.Client.Get(ctx, req.NamespacedName, &etp); err != nil {
if errors.IsNotFound(err) {
logrus.Infof("EventTrackerPolicy %s not found", req.NamespacedName)
return nil
}
return err
}

if res.Data.GitopsNotifer == "Gitops Disabled" {
etp.Statuses[index].IsTriggered = "false"
} else {
modified := false
for index, status := range etp.Statuses {
if status.Result == utils.ConditionPassed && strings.ToLower(status.IsTriggered) == "false" {
// Mark as triggered BEFORE sending request to prevent duplicate triggers
etp.Statuses[index].IsTriggered = "true"
experimentsToTrigger = append(experimentsToTrigger, status.ExperimentID)
modified = true
logrus.Printf("Claiming trigger for ResourceName: %s, ExperimentID: %s",
status.ResourceName, status.ExperimentID)
}
}
}

err = r.Client.Update(context.Background(), &etp)
if modified {
// Atomically update - if this fails due to conflict, retry will re-read
// and see IsTriggered="true" set by the winning reconcile
return r.Client.Update(ctx, &etp)
}
return nil
})

if err != nil {
return ctrl.Result{}, err
}

defer mutex.Unlock()
// Phase 2: Trigger experiments only after successfully claiming the status
// This runs outside the retry loop - experiments are only triggered once
for _, experimentID := range experimentsToTrigger {
logrus.Printf("Triggering ExperimentID: %s", experimentID)
response, err := utils.SendRequest(experimentID)
if err != nil {
logrus.WithError(err).Errorf("Failed to trigger experiment %s", experimentID)
// Continue with other experiments - don't fail the whole reconcile
continue
}

var res apiResponse
if err := json.Unmarshal([]byte(response), &res); err != nil {
logrus.WithError(err).Errorf("Failed to parse response for experiment %s", experimentID)
continue
}

if res.Data.GitopsNotifer == "Gitops Disabled" {
logrus.Infof("GitOps disabled for experiment %s", experimentID)
} else {
logrus.Infof("Successfully triggered experiment %s", experimentID)
}
}

return ctrl.Result{}, nil
}
Expand Down
Loading