From 17dd143344dc27c8d3258366048450859bf50f98 Mon Sep 17 00:00:00 2001 From: catpineapple Date: Wed, 25 Mar 2026 11:35:40 +0800 Subject: [PATCH] feat(ddc): split CG service into internal headless and external service For DDC ComputeGroups, separate the single service into two: - Internal headless service (original name) for cluster communication and StatefulSet DNS-based pod discovery - External service (with -external suffix) for load-balanced access This enables smooth operator upgrades: users pre-create the external service, switch upstream traffic, then upgrade the operator. The reconcile logic handles the ClusterIP-to-headless migration by detecting and recreating the service when needed. Co-Authored-By: catpineapple --- api/disaggregated/v1/unique_id.go | 4 + .../computegroups/controller.go | 45 +++- .../computegroups/service.go | 46 +++- .../computegroups/service_test.go | 208 ++++++++++++++++++ 4 files changed, 298 insertions(+), 5 deletions(-) create mode 100644 pkg/controller/sub_controller/disaggregated_cluster/computegroups/service_test.go diff --git a/api/disaggregated/v1/unique_id.go b/api/disaggregated/v1/unique_id.go index 06d49d9f..d6e53732 100644 --- a/api/disaggregated/v1/unique_id.go +++ b/api/disaggregated/v1/unique_id.go @@ -64,6 +64,10 @@ func (ddc *DorisDisaggregatedCluster) GetCGServiceName(cg *ComputeGroup) string return svcName } +func (ddc *DorisDisaggregatedCluster) GetCGExternalServiceName(cg *ComputeGroup) string { + return ddc.GetCGServiceName(cg) + "-external" +} + func (ddc *DorisDisaggregatedCluster) GetFEServiceName() string { return ddc.Name + "-" + "fe" } diff --git a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go index a2222c4c..e59538cb 100644 --- a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go +++ b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go @@ -146,17 +146,28 @@ func (dcgs *DisaggregatedComputeGroupsController) computeGroupSync(ctx context.C } cvs := dcgs.GetConfigValuesFromConfigMaps(ddc.Namespace, resource.BE_RESOLVEKEY, cg.CommonSpec.ConfigMaps) st := dcgs.NewStatefulset(ddc, cg, cvs) - svc := dcgs.newService(ddc, cg, cvs) + internalSvc := dcgs.newInternalService(ddc, cg, cvs) + externalSvc := dcgs.newExternalService(ddc, cg, cvs) dcgs.initialCGStatus(ddc, cg) dcgs.CheckSecretMountPath(ddc, cg.Secrets) dcgs.CheckSecretExist(ctx, ddc, cg.Secrets) - event, err := dcgs.DefaultReconcileService(ctx, svc) + // Reconcile internal headless service. + // During upgrade from older versions, the existing service may not be headless (has a ClusterIP assigned). + // Since K8s does not allow changing spec.clusterIP on an existing service, we must delete and recreate it. + if err := dcgs.reconcileInternalService(ctx, internalSvc); err != nil { + klog.Errorf("disaggregatedComputeGroupsController reconcile internal service namespace %s name %s failed, err=%s", internalSvc.Namespace, internalSvc.Name, err.Error()) + return &sc.Event{Type: sc.EventWarning, Reason: sc.ServiceApplyedFailed, Message: err.Error()}, err + } + + // Reconcile external service for load-balanced access. + event, err := dcgs.DefaultReconcileService(ctx, externalSvc) if err != nil { - klog.Errorf("disaggregatedComputeGroupsController reconcile service namespace %s name %s failed, err=%s", svc.Namespace, svc.Name, err.Error()) + klog.Errorf("disaggregatedComputeGroupsController reconcile external service namespace %s name %s failed, err=%s", externalSvc.Namespace, externalSvc.Name, err.Error()) return event, err } + event, err = dcgs.reconcileStatefulset(ctx, st, ddc, cg) if err != nil { klog.Errorf("disaggregatedComputeGroupsController reconcile statefulset namespace %s name %s failed, err=%s", st.Namespace, st.Name, err.Error()) @@ -171,6 +182,34 @@ func (dcgs *DisaggregatedComputeGroupsController) computeGroupSync(ctx context.C return event, err } +// reconcileInternalService reconciles the internal headless service for a compute group. +// If the existing service is not headless (upgrade scenario), it deletes and recreates it, +// because K8s does not allow mutating spec.clusterIP from a valid IP to "None". +func (dcgs *DisaggregatedComputeGroupsController) reconcileInternalService(ctx context.Context, svc *corev1.Service) error { + var existingSvc corev1.Service + err := dcgs.K8sclient.Get(ctx, types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name}, &existingSvc) + if err != nil { + if apierrors.IsNotFound(err) { + return k8s.CreateClientObject(ctx, dcgs.K8sclient, svc) + } + return err + } + + // If the existing service is already headless, use normal reconcile path. + if existingSvc.Spec.ClusterIP == "None" { + _, err = dcgs.DefaultReconcileService(ctx, svc) + return err + } + + // Existing service is not headless — delete and recreate. + klog.Infof("reconcileInternalService existing service %s/%s is not headless (clusterIP=%s), deleting and recreating as headless.", + existingSvc.Namespace, existingSvc.Name, existingSvc.Spec.ClusterIP) + if err = k8s.DeleteService(ctx, dcgs.K8sclient, svc.Namespace, svc.Name); err != nil { + return err + } + return k8s.CreateClientObject(ctx, dcgs.K8sclient, svc) +} + // reconcileStatefulset return bool means reconcile print error message. func (dcgs *DisaggregatedComputeGroupsController) reconcileStatefulset(ctx context.Context, st *appv1.StatefulSet, cluster *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup) (*sc.Event, error) { //use new default value before apply new statefulset, when creating and apply spec change. diff --git a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/service.go b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/service.go index 984eb0b7..dfe9964e 100644 --- a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/service.go +++ b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/service.go @@ -21,17 +21,59 @@ import ( dv1 "github.com/apache/doris-operator/api/disaggregated/v1" "github.com/apache/doris-operator/pkg/common/utils/resource" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" ) -func (dcgs *DisaggregatedComputeGroupsController) newService(ddc *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup, cvs map[string]interface{}) *corev1.Service { +// newInternalService builds a headless service for internal cluster communication. +// This service is used as the StatefulSet's serviceName for DNS-based pod discovery. +func (dcgs *DisaggregatedComputeGroupsController) newInternalService(ddc *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup, cvs map[string]interface{}) *corev1.Service { + uniqueId := cg.UniqueId + labels := dcgs.newCG2LayerSchedulerLabels(ddc.Name, uniqueId) + labels[dv1.ServiceRoleForCluster] = string(dv1.Service_Role_Internal) + selector := dcgs.newCGPodsSelector(ddc.Name, uniqueId) + + heartbeatPort := resource.GetPort(cvs, resource.HEARTBEAT_SERVICE_PORT) + + return &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: ddc.GetCGServiceName(cg), + Namespace: ddc.Namespace, + Labels: labels, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: ddc.APIVersion, + Kind: ddc.Kind, + Name: ddc.Name, + UID: ddc.UID, + }, + }, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "None", + Ports: []corev1.ServicePort{ + { + Name: resource.GetPortKey(resource.HEARTBEAT_SERVICE_PORT), + Port: heartbeatPort, + TargetPort: intstr.FromInt32(heartbeatPort), + }, + }, + Selector: selector, + PublishNotReadyAddresses: true, + }, + } +} + +// newExternalService builds the external service for load-balanced access. +// User ExportService configuration (Type, Annotations, PortMaps) is applied to this service. +func (dcgs *DisaggregatedComputeGroupsController) newExternalService(ddc *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup, cvs map[string]interface{}) *corev1.Service { uniqueId := cg.UniqueId svcConf := cg.CommonSpec.Service sps := newComputeServicePorts(cvs, svcConf) svc := dcgs.NewDefaultService(ddc) ob := &svc.ObjectMeta - ob.Name = ddc.GetCGServiceName(cg) + ob.Name = ddc.GetCGExternalServiceName(cg) ob.Labels = dcgs.newCG2LayerSchedulerLabels(ddc.Name, uniqueId) spec := &svc.Spec diff --git a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/service_test.go b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/service_test.go new file mode 100644 index 00000000..3205bf5a --- /dev/null +++ b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/service_test.go @@ -0,0 +1,208 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package computegroups + +import ( + "testing" + + dv1 "github.com/apache/doris-operator/api/disaggregated/v1" + "github.com/apache/doris-operator/pkg/common/utils/resource" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func newTestDDC() *dv1.DorisDisaggregatedCluster { + return &dv1.DorisDisaggregatedCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-ddc", + Namespace: "default", + UID: "test-uid", + }, + TypeMeta: metav1.TypeMeta{ + APIVersion: "disaggregated.cluster.doris.com/v1", + Kind: "DorisDisaggregatedCluster", + }, + } +} + +func newTestCG(uniqueId string) *dv1.ComputeGroup { + return &dv1.ComputeGroup{ + UniqueId: uniqueId, + } +} + +func newTestController() *DisaggregatedComputeGroupsController { + return &DisaggregatedComputeGroupsController{} +} + +func Test_GetCGExternalServiceName(t *testing.T) { + ddc := newTestDDC() + cg := newTestCG("cg1") + expected := "test-ddc-cg1-external" + actual := ddc.GetCGExternalServiceName(cg) + if actual != expected { + t.Errorf("GetCGExternalServiceName() = %s, want %s", actual, expected) + } +} + +func Test_GetCGExternalServiceName_WithUnderscore(t *testing.T) { + ddc := newTestDDC() + cg := newTestCG("cg_group_1") + expected := "test-ddc-cg-group-1-external" + actual := ddc.GetCGExternalServiceName(cg) + if actual != expected { + t.Errorf("GetCGExternalServiceName() = %s, want %s", actual, expected) + } +} + +func Test_newInternalService(t *testing.T) { + dcgs := newTestController() + ddc := newTestDDC() + cg := newTestCG("cg1") + cvs := map[string]interface{}{} + + svc := dcgs.newInternalService(ddc, cg, cvs) + + // Verify service name + expectedName := "test-ddc-cg1" + if svc.Name != expectedName { + t.Errorf("internal service name = %s, want %s", svc.Name, expectedName) + } + + // Verify headless (ClusterIP: None) + if svc.Spec.ClusterIP != "None" { + t.Errorf("internal service ClusterIP = %s, want None", svc.Spec.ClusterIP) + } + + // Verify publishNotReadyAddresses + if !svc.Spec.PublishNotReadyAddresses { + t.Error("internal service PublishNotReadyAddresses should be true") + } + + // Verify only heartbeat port is exposed + if len(svc.Spec.Ports) != 1 { + t.Fatalf("internal service should have 1 port, got %d", len(svc.Spec.Ports)) + } + if svc.Spec.Ports[0].Name != resource.GetPortKey(resource.HEARTBEAT_SERVICE_PORT) { + t.Errorf("internal service port name = %s, want %s", svc.Spec.Ports[0].Name, resource.GetPortKey(resource.HEARTBEAT_SERVICE_PORT)) + } + + // Verify internal service role label + if svc.Labels[dv1.ServiceRoleForCluster] != string(dv1.Service_Role_Internal) { + t.Errorf("internal service role label = %s, want %s", svc.Labels[dv1.ServiceRoleForCluster], string(dv1.Service_Role_Internal)) + } + + // Verify OwnerReference + if len(svc.OwnerReferences) != 1 || svc.OwnerReferences[0].Name != "test-ddc" { + t.Error("internal service should have correct OwnerReference") + } +} + +func Test_newExternalService(t *testing.T) { + dcgs := newTestController() + ddc := newTestDDC() + cg := newTestCG("cg1") + cvs := map[string]interface{}{} + + svc := dcgs.newExternalService(ddc, cg, cvs) + + // Verify service name + expectedName := "test-ddc-cg1-external" + if svc.Name != expectedName { + t.Errorf("external service name = %s, want %s", svc.Name, expectedName) + } + + // Verify NOT headless + if svc.Spec.ClusterIP == "None" { + t.Error("external service should not be headless") + } + + // Verify has all ports (be_port, webserver, heartbeat, brpc) + if len(svc.Spec.Ports) < 4 { + t.Errorf("external service should have at least 4 ports, got %d", len(svc.Spec.Ports)) + } + + // Verify OwnerReference + if len(svc.OwnerReferences) != 1 || svc.OwnerReferences[0].Name != "test-ddc" { + t.Error("external service should have correct OwnerReference") + } +} + +func Test_newExternalService_WithExportServiceConfig(t *testing.T) { + dcgs := newTestController() + ddc := newTestDDC() + cg := newTestCG("cg1") + cg.CommonSpec.Service = &dv1.ExportService{ + Type: corev1.ServiceTypeNodePort, + Annotations: map[string]string{"cloud.provider/lb": "true"}, + } + cvs := map[string]interface{}{} + + svc := dcgs.newExternalService(ddc, cg, cvs) + + // Verify service type from ExportService config + if svc.Spec.Type != corev1.ServiceTypeNodePort { + t.Errorf("external service type = %s, want NodePort", svc.Spec.Type) + } + + // Verify annotations from ExportService config + if svc.Annotations["cloud.provider/lb"] != "true" { + t.Error("external service should have annotations from ExportService config") + } +} + +func Test_newExternalService_LoadBalancer(t *testing.T) { + dcgs := newTestController() + ddc := newTestDDC() + cg := newTestCG("cg1") + cg.CommonSpec.Service = &dv1.ExportService{ + Type: corev1.ServiceTypeLoadBalancer, + } + cvs := map[string]interface{}{} + + svc := dcgs.newExternalService(ddc, cg, cvs) + + if svc.Spec.Type != corev1.ServiceTypeLoadBalancer { + t.Errorf("external service type = %s, want LoadBalancer", svc.Spec.Type) + } + + // Verify SessionAffinity is None for LoadBalancer + if svc.Spec.SessionAffinity != corev1.ServiceAffinityNone { + t.Errorf("external service SessionAffinity = %s, want None for LoadBalancer", svc.Spec.SessionAffinity) + } +} + +func Test_InternalAndExternalServiceSelectors(t *testing.T) { + dcgs := newTestController() + ddc := newTestDDC() + cg := newTestCG("cg1") + cvs := map[string]interface{}{} + + internalSvc := dcgs.newInternalService(ddc, cg, cvs) + externalSvc := dcgs.newExternalService(ddc, cg, cvs) + + // Both services should have the same selector so they route to the same pods + if len(internalSvc.Spec.Selector) != len(externalSvc.Spec.Selector) { + t.Fatal("internal and external services should have the same number of selector labels") + } + for k, v := range internalSvc.Spec.Selector { + if externalSvc.Spec.Selector[k] != v { + t.Errorf("selector mismatch for key %s: internal=%s, external=%s", k, v, externalSvc.Spec.Selector[k]) + } + } +}