Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions chaoscenter/graphql/server/graph/chaos_infrastructure.resolvers.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

70 changes: 47 additions & 23 deletions chaoscenter/graphql/server/pkg/chaos_experiment/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1237,17 +1237,25 @@ func (c *ChaosExperimentHandler) GetLogs(reqID string, pod model.PodLogRequest,
ExternalData: &externalData,
},
}
if clusterChan, ok := r.ConnectedInfra[pod.InfraID]; ok {
r.Mutex.Lock()
clusterChan, connected := r.ConnectedInfra[pod.InfraID]
r.Mutex.Unlock()
if connected {
clusterChan <- &payload
} else if reqChan, ok := r.ExperimentLog[reqID]; ok {
resp := model.PodLogResponse{
PodName: pod.PodName,
ExperimentRunID: pod.ExperimentRunID,
PodType: pod.PodType,
Log: "INFRA ERROR : INFRA NOT CONNECTED",
} else {
r.Mutex.Lock()
reqChan, ok := r.ExperimentLog[reqID]
r.Mutex.Unlock()
if ok {
resp := model.PodLogResponse{
PodName: pod.PodName,
ExperimentRunID: pod.ExperimentRunID,
PodType: pod.PodType,
Log: "INFRA ERROR : INFRA NOT CONNECTED",
}
reqChan <- &resp
close(reqChan)
}
reqChan <- &resp
close(reqChan)
}
}

Expand All @@ -1265,15 +1273,23 @@ func (c *ChaosExperimentHandler) GetKubeObjData(reqID string, kubeObject model.K
ExternalData: &externalData,
},
}
if clusterChan, ok := r.ConnectedInfra[kubeObject.InfraID]; ok {
r.Mutex.Lock()
clusterChan, connected := r.ConnectedInfra[kubeObject.InfraID]
r.Mutex.Unlock()
if connected {
clusterChan <- &payload
} else if reqChan, ok := r.KubeObjectData[reqID]; ok {
resp := model.KubeObjectResponse{
InfraID: kubeObject.InfraID,
KubeObj: &model.KubeObject{},
} else {
r.Mutex.Lock()
reqChan, ok := r.KubeObjectData[reqID]
r.Mutex.Unlock()
if ok {
resp := model.KubeObjectResponse{
InfraID: kubeObject.InfraID,
KubeObj: &model.KubeObject{},
}
reqChan <- &resp
close(reqChan)
}
reqChan <- &resp
close(reqChan)
}
}

Expand All @@ -1291,15 +1307,23 @@ func (c *ChaosExperimentHandler) GetKubeNamespaceData(reqID string, kubeNamespac
ExternalData: &externalData,
},
}
if clusterChan, ok := r.ConnectedInfra[kubeNamespace.InfraID]; ok {
r.Mutex.Lock()
clusterChan, connected := r.ConnectedInfra[kubeNamespace.InfraID]
r.Mutex.Unlock()
if connected {
clusterChan <- &payload
} else if reqChan, ok := r.KubeNamespaceData[reqID]; ok {
resp := model.KubeNamespaceResponse{
InfraID: kubeNamespace.InfraID,
KubeNamespace: []*model.KubeNamespace{},
} else {
r.Mutex.Lock()
reqChan, ok := r.KubeNamespaceData[reqID]
r.Mutex.Unlock()
if ok {
resp := model.KubeNamespaceResponse{
InfraID: kubeNamespace.InfraID,
KubeNamespace: []*model.KubeNamespace{},
}
reqChan <- &resp
close(reqChan)
}
reqChan <- &resp
close(reqChan)
}
}

Expand Down
29 changes: 22 additions & 7 deletions chaoscenter/graphql/server/pkg/chaos_infrastructure/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -974,7 +974,10 @@ func (in *infraService) PodLog(request model.PodLog, r store.StateData) (string,
log.Print("ERROR", err)
return "", err
}
if reqChan, ok := r.ExperimentLog[request.RequestID]; ok {
r.Mutex.Lock()
reqChan, ok := r.ExperimentLog[request.RequestID]
r.Mutex.Unlock()
if ok {
resp := model.PodLogResponse{
PodName: request.PodName,
ExperimentRunID: request.ExperimentRunID,
Expand All @@ -995,7 +998,10 @@ func (in *infraService) KubeObj(request model.KubeObjectData, r store.StateData)
log.Print("Error", err)
return "", err
}
if reqChan, ok := r.KubeObjectData[request.RequestID]; ok {
r.Mutex.Lock()
reqChan, ok := r.KubeObjectData[request.RequestID]
r.Mutex.Unlock()
if ok {
var kubeObjData *model.KubeObject
err = json.Unmarshal([]byte(request.KubeObj), &kubeObjData)
if err != nil {
Expand All @@ -1020,7 +1026,10 @@ func (in *infraService) KubeNamespace(request model.KubeNamespaceData, r store.S
log.Print("Error", err)
return "", err
}
if reqChan, ok := r.KubeNamespaceData[request.RequestID]; ok {
r.Mutex.Lock()
reqChan, ok := r.KubeNamespaceData[request.RequestID]
r.Mutex.Unlock()
if ok {
var kubeNamespaceData []*model.KubeNamespace
err = json.Unmarshal([]byte(request.KubeNamespace), &kubeNamespaceData)
if err != nil {
Expand Down Expand Up @@ -1048,12 +1057,18 @@ func (in *infraService) SendInfraEvent(eventType, eventName, description string,
Infra: &infra,
}
r.Mutex.Lock()
if r.InfraEventPublish != nil {
for _, observer := range r.InfraEventPublish[infra.ProjectID] {
observer <- &newEvent
observers := make([]chan *model.InfraEventResponse, len(r.InfraEventPublish[infra.ProjectID]))
copy(observers, r.InfraEventPublish[infra.ProjectID])
r.Mutex.Unlock()

for _, observer := range observers {
// Use non-blocking send to prevent deadlock if channel buffer is full
select {
case observer <- &newEvent:
default:
// Channel full or no receiver, skip to prevent blocking
}
Comment on lines 1059 to 1070
}
r.Mutex.Unlock()
}

// ConfirmInfraRegistration takes the cluster_id and access_key from the subscriber and validates it, if validated generates and sends new access_key
Expand Down
Loading