Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/disaggregated/v1/unique_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ func (ddc *DorisDisaggregatedCluster) GetCGServiceName(cg *ComputeGroup) string
return svcName
}

func (ddc *DorisDisaggregatedCluster) GetCGExternalServiceName(cg *ComputeGroup) string {
return ddc.GetCGServiceName(cg) + "-external"
}

func (ddc *DorisDisaggregatedCluster) GetFEServiceName() string {
return ddc.Name + "-" + "fe"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,17 +146,28 @@ func (dcgs *DisaggregatedComputeGroupsController) computeGroupSync(ctx context.C
}
cvs := dcgs.GetConfigValuesFromConfigMaps(ddc.Namespace, resource.BE_RESOLVEKEY, cg.CommonSpec.ConfigMaps)
st := dcgs.NewStatefulset(ddc, cg, cvs)
svc := dcgs.newService(ddc, cg, cvs)
internalSvc := dcgs.newInternalService(ddc, cg, cvs)
externalSvc := dcgs.newExternalService(ddc, cg, cvs)
dcgs.initialCGStatus(ddc, cg)

dcgs.CheckSecretMountPath(ddc, cg.Secrets)
dcgs.CheckSecretExist(ctx, ddc, cg.Secrets)

event, err := dcgs.DefaultReconcileService(ctx, svc)
// Reconcile internal headless service.
// During upgrade from older versions, the existing service may not be headless (has a ClusterIP assigned).
// Since K8s does not allow changing spec.clusterIP on an existing service, we must delete and recreate it.
if err := dcgs.reconcileInternalService(ctx, internalSvc); err != nil {
klog.Errorf("disaggregatedComputeGroupsController reconcile internal service namespace %s name %s failed, err=%s", internalSvc.Namespace, internalSvc.Name, err.Error())
return &sc.Event{Type: sc.EventWarning, Reason: sc.ServiceApplyedFailed, Message: err.Error()}, err
}

// Reconcile external service for load-balanced access.
event, err := dcgs.DefaultReconcileService(ctx, externalSvc)
if err != nil {
klog.Errorf("disaggregatedComputeGroupsController reconcile service namespace %s name %s failed, err=%s", svc.Namespace, svc.Name, err.Error())
klog.Errorf("disaggregatedComputeGroupsController reconcile external service namespace %s name %s failed, err=%s", externalSvc.Namespace, externalSvc.Name, err.Error())
return event, err
}

event, err = dcgs.reconcileStatefulset(ctx, st, ddc, cg)
if err != nil {
klog.Errorf("disaggregatedComputeGroupsController reconcile statefulset namespace %s name %s failed, err=%s", st.Namespace, st.Name, err.Error())
Expand All @@ -171,6 +182,34 @@ func (dcgs *DisaggregatedComputeGroupsController) computeGroupSync(ctx context.C
return event, err
}

// reconcileInternalService reconciles the internal headless service for a compute group.
// If the existing service is not headless (upgrade scenario), it deletes and recreates it,
// because K8s does not allow mutating spec.clusterIP from a valid IP to "None".
func (dcgs *DisaggregatedComputeGroupsController) reconcileInternalService(ctx context.Context, svc *corev1.Service) error {
var existingSvc corev1.Service
err := dcgs.K8sclient.Get(ctx, types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name}, &existingSvc)
if err != nil {
if apierrors.IsNotFound(err) {
return k8s.CreateClientObject(ctx, dcgs.K8sclient, svc)
}
return err
}

// If the existing service is already headless, use normal reconcile path.
if existingSvc.Spec.ClusterIP == "None" {
_, err = dcgs.DefaultReconcileService(ctx, svc)
return err
}

// Existing service is not headless — delete and recreate.
klog.Infof("reconcileInternalService existing service %s/%s is not headless (clusterIP=%s), deleting and recreating as headless.",
existingSvc.Namespace, existingSvc.Name, existingSvc.Spec.ClusterIP)
if err = k8s.DeleteService(ctx, dcgs.K8sclient, svc.Namespace, svc.Name); err != nil {
return err
}
return k8s.CreateClientObject(ctx, dcgs.K8sclient, svc)
}

// reconcileStatefulset return bool means reconcile print error message.
func (dcgs *DisaggregatedComputeGroupsController) reconcileStatefulset(ctx context.Context, st *appv1.StatefulSet, cluster *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup) (*sc.Event, error) {
//use new default value before apply new statefulset, when creating and apply spec change.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,59 @@ import (
dv1 "github.com/apache/doris-operator/api/disaggregated/v1"
"github.com/apache/doris-operator/pkg/common/utils/resource"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
)

func (dcgs *DisaggregatedComputeGroupsController) newService(ddc *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup, cvs map[string]interface{}) *corev1.Service {
// newInternalService builds a headless service for internal cluster communication.
// This service is used as the StatefulSet's serviceName for DNS-based pod discovery.
func (dcgs *DisaggregatedComputeGroupsController) newInternalService(ddc *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup, cvs map[string]interface{}) *corev1.Service {
uniqueId := cg.UniqueId
labels := dcgs.newCG2LayerSchedulerLabels(ddc.Name, uniqueId)
labels[dv1.ServiceRoleForCluster] = string(dv1.Service_Role_Internal)
selector := dcgs.newCGPodsSelector(ddc.Name, uniqueId)

heartbeatPort := resource.GetPort(cvs, resource.HEARTBEAT_SERVICE_PORT)

return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: ddc.GetCGServiceName(cg),
Namespace: ddc.Namespace,
Labels: labels,
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: ddc.APIVersion,
Kind: ddc.Kind,
Name: ddc.Name,
UID: ddc.UID,
},
},
},
Spec: corev1.ServiceSpec{
ClusterIP: "None",
Ports: []corev1.ServicePort{
{
Name: resource.GetPortKey(resource.HEARTBEAT_SERVICE_PORT),
Port: heartbeatPort,
TargetPort: intstr.FromInt32(heartbeatPort),
},
},
Selector: selector,
PublishNotReadyAddresses: true,
},
}
}

// newExternalService builds the external service for load-balanced access.
// User ExportService configuration (Type, Annotations, PortMaps) is applied to this service.
func (dcgs *DisaggregatedComputeGroupsController) newExternalService(ddc *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup, cvs map[string]interface{}) *corev1.Service {
uniqueId := cg.UniqueId
svcConf := cg.CommonSpec.Service
sps := newComputeServicePorts(cvs, svcConf)
svc := dcgs.NewDefaultService(ddc)

ob := &svc.ObjectMeta
ob.Name = ddc.GetCGServiceName(cg)
ob.Name = ddc.GetCGExternalServiceName(cg)
ob.Labels = dcgs.newCG2LayerSchedulerLabels(ddc.Name, uniqueId)

spec := &svc.Spec
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package computegroups

import (
"testing"

dv1 "github.com/apache/doris-operator/api/disaggregated/v1"
"github.com/apache/doris-operator/pkg/common/utils/resource"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func newTestDDC() *dv1.DorisDisaggregatedCluster {
return &dv1.DorisDisaggregatedCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "test-ddc",
Namespace: "default",
UID: "test-uid",
},
TypeMeta: metav1.TypeMeta{
APIVersion: "disaggregated.cluster.doris.com/v1",
Kind: "DorisDisaggregatedCluster",
},
}
}

func newTestCG(uniqueId string) *dv1.ComputeGroup {
return &dv1.ComputeGroup{
UniqueId: uniqueId,
}
}

func newTestController() *DisaggregatedComputeGroupsController {
return &DisaggregatedComputeGroupsController{}
}

func Test_GetCGExternalServiceName(t *testing.T) {
ddc := newTestDDC()
cg := newTestCG("cg1")
expected := "test-ddc-cg1-external"
actual := ddc.GetCGExternalServiceName(cg)
if actual != expected {
t.Errorf("GetCGExternalServiceName() = %s, want %s", actual, expected)
}
}

func Test_GetCGExternalServiceName_WithUnderscore(t *testing.T) {
ddc := newTestDDC()
cg := newTestCG("cg_group_1")
expected := "test-ddc-cg-group-1-external"
actual := ddc.GetCGExternalServiceName(cg)
if actual != expected {
t.Errorf("GetCGExternalServiceName() = %s, want %s", actual, expected)
}
}

func Test_newInternalService(t *testing.T) {
dcgs := newTestController()
ddc := newTestDDC()
cg := newTestCG("cg1")
cvs := map[string]interface{}{}

svc := dcgs.newInternalService(ddc, cg, cvs)

// Verify service name
expectedName := "test-ddc-cg1"
if svc.Name != expectedName {
t.Errorf("internal service name = %s, want %s", svc.Name, expectedName)
}

// Verify headless (ClusterIP: None)
if svc.Spec.ClusterIP != "None" {
t.Errorf("internal service ClusterIP = %s, want None", svc.Spec.ClusterIP)
}

// Verify publishNotReadyAddresses
if !svc.Spec.PublishNotReadyAddresses {
t.Error("internal service PublishNotReadyAddresses should be true")
}

// Verify only heartbeat port is exposed
if len(svc.Spec.Ports) != 1 {
t.Fatalf("internal service should have 1 port, got %d", len(svc.Spec.Ports))
}
if svc.Spec.Ports[0].Name != resource.GetPortKey(resource.HEARTBEAT_SERVICE_PORT) {
t.Errorf("internal service port name = %s, want %s", svc.Spec.Ports[0].Name, resource.GetPortKey(resource.HEARTBEAT_SERVICE_PORT))
}

// Verify internal service role label
if svc.Labels[dv1.ServiceRoleForCluster] != string(dv1.Service_Role_Internal) {
t.Errorf("internal service role label = %s, want %s", svc.Labels[dv1.ServiceRoleForCluster], string(dv1.Service_Role_Internal))
}

// Verify OwnerReference
if len(svc.OwnerReferences) != 1 || svc.OwnerReferences[0].Name != "test-ddc" {
t.Error("internal service should have correct OwnerReference")
}
}

func Test_newExternalService(t *testing.T) {
dcgs := newTestController()
ddc := newTestDDC()
cg := newTestCG("cg1")
cvs := map[string]interface{}{}

svc := dcgs.newExternalService(ddc, cg, cvs)

// Verify service name
expectedName := "test-ddc-cg1-external"
if svc.Name != expectedName {
t.Errorf("external service name = %s, want %s", svc.Name, expectedName)
}

// Verify NOT headless
if svc.Spec.ClusterIP == "None" {
t.Error("external service should not be headless")
}

// Verify has all ports (be_port, webserver, heartbeat, brpc)
if len(svc.Spec.Ports) < 4 {
t.Errorf("external service should have at least 4 ports, got %d", len(svc.Spec.Ports))
}

// Verify OwnerReference
if len(svc.OwnerReferences) != 1 || svc.OwnerReferences[0].Name != "test-ddc" {
t.Error("external service should have correct OwnerReference")
}
}

func Test_newExternalService_WithExportServiceConfig(t *testing.T) {
dcgs := newTestController()
ddc := newTestDDC()
cg := newTestCG("cg1")
cg.CommonSpec.Service = &dv1.ExportService{
Type: corev1.ServiceTypeNodePort,
Annotations: map[string]string{"cloud.provider/lb": "true"},
}
cvs := map[string]interface{}{}

svc := dcgs.newExternalService(ddc, cg, cvs)

// Verify service type from ExportService config
if svc.Spec.Type != corev1.ServiceTypeNodePort {
t.Errorf("external service type = %s, want NodePort", svc.Spec.Type)
}

// Verify annotations from ExportService config
if svc.Annotations["cloud.provider/lb"] != "true" {
t.Error("external service should have annotations from ExportService config")
}
}

func Test_newExternalService_LoadBalancer(t *testing.T) {
dcgs := newTestController()
ddc := newTestDDC()
cg := newTestCG("cg1")
cg.CommonSpec.Service = &dv1.ExportService{
Type: corev1.ServiceTypeLoadBalancer,
}
cvs := map[string]interface{}{}

svc := dcgs.newExternalService(ddc, cg, cvs)

if svc.Spec.Type != corev1.ServiceTypeLoadBalancer {
t.Errorf("external service type = %s, want LoadBalancer", svc.Spec.Type)
}

// Verify SessionAffinity is None for LoadBalancer
if svc.Spec.SessionAffinity != corev1.ServiceAffinityNone {
t.Errorf("external service SessionAffinity = %s, want None for LoadBalancer", svc.Spec.SessionAffinity)
}
}

func Test_InternalAndExternalServiceSelectors(t *testing.T) {
dcgs := newTestController()
ddc := newTestDDC()
cg := newTestCG("cg1")
cvs := map[string]interface{}{}

internalSvc := dcgs.newInternalService(ddc, cg, cvs)
externalSvc := dcgs.newExternalService(ddc, cg, cvs)

// Both services should have the same selector so they route to the same pods
if len(internalSvc.Spec.Selector) != len(externalSvc.Spec.Selector) {
t.Fatal("internal and external services should have the same number of selector labels")
}
for k, v := range internalSvc.Spec.Selector {
if externalSvc.Spec.Selector[k] != v {
t.Errorf("selector mismatch for key %s: internal=%s, external=%s", k, v, externalSvc.Spec.Selector[k])
}
}
}
Loading